Source code for pi_portal.modules.tasks.queue.bases.queue_base

"""The task queue for Pi Portal."""

import abc
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, NamedTuple

if TYPE_CHECKING:  # pragma: no cover
  from pi_portal.modules.tasks.enums import RoutingLabel
  from pi_portal.modules.tasks.task.bases.task_base import TypeGenericTask


[docs]class QueueBase(abc.ABC): """An abstract queue base class to wrap around implementations.""" __slots__ = ("log", "routing_label")
[docs] def __init__( self, log: logging.Logger, routing_label: "RoutingLabel", ) -> None: """:param log: A logger instance. :param routing_label: The routing label of this queue. """ self.log = log self.routing_label = routing_label
@abc.abstractmethod def _ack(self, task: "TypeGenericTask") -> None: """Override with vendor implementation.""" @abc.abstractmethod def _get(self) -> "TypeGenericTask": """Override with vendor implementation.""" @abc.abstractmethod def _maintenance(self) -> None: """Override with vendor implementation.""" @abc.abstractmethod def _metrics(self) -> "QueueMetrics": """Override with vendor implementation.""" @abc.abstractmethod def _put(self, task: "TypeGenericTask") -> None: """Override with vendor implementation.""" @abc.abstractmethod def _retry(self, task: "TypeGenericTask") -> None: """Override with vendor implementation."""
[docs] def ack(self, task: "TypeGenericTask") -> None: """Ack a task from the queue. :param task: the task to ack. """ self._ack(task) self.log.debug( "Ack: '%s'!", task, extra={ "queue": self.routing_label.value, "task_id": task.id, "task_type": task.type, }, )
[docs] def get(self) -> "TypeGenericTask": """Return the typed task object. :returns: The dequeued task object. """ task = self._get() self.log.debug( "Dequeued: '%s'!", task, extra={ "queue": self.routing_label.value, "task_id": task.id, "task_type": task.type, }, ) return task
[docs] def maintenance(self) -> None: """Perform queue maintenance tasks.""" self._maintenance()
[docs] def metrics(self) -> "QueueMetrics": """Extract queue metrics. :returns: A collection of metrics for the task queue. """ return self._metrics()
[docs] def put(self, task: "TypeGenericTask") -> None: """Enqueue a task. :param task: the task to schedule. """ task.scheduled = datetime.now(tz=timezone.utc) self._put(task) self.log.debug( "Enqueued: '%s'!", task, extra={ "queue": self.routing_label.value, "task_id": task.id, "task_type": task.type, }, )
[docs] def retry(self, task: "TypeGenericTask") -> None: """Retry a failed task. :param task: the task to retry. """ task.completed = None task.ok = None task.result.value = None self._retry(task) self.log.debug( "Retried: '%s'!", task, extra={ "queue": self.routing_label.value, "task_id": task.id, "task_type": task.type, } )
[docs]class QueueMetrics(NamedTuple): """A collection of metrics for the task queue.""" length: int acked_length: int unacked_length: int storage_mb: float