Source code for pi_portal.modules.tasks.queue.persist_queue

"""The persist-queue library's implementation of a queue."""
import logging
import os
import shutil
from typing import TYPE_CHECKING, Optional, cast

from persistqueue import Empty as VendorException
from persistqueue import SQLiteAckQueue as VendorQueue
from pi_portal import config
from typing_extensions import TypedDict
from .bases.queue_base import QueueBase, QueueMetrics

if TYPE_CHECKING:  # pragma: no cover
  from pi_portal.modules.tasks.enums import RoutingLabel
  from pi_portal.modules.tasks.task.bases.task_base import TypeGenericTask


[docs]class TypeRawTask(TypedDict): """Typed representation of a 'raw' deserialized persist-queue task.""" pqid: int data: "TypeGenericTask"
[docs]class Queue(QueueBase): """The persist-queue library's implementation of a queue. :param log: A logger instance. """ __slots__ = ("_path", "_queue") _active_size = 0 _db_path = config.PATH_TASKS_SERVICE_DATABASES _queue: VendorQueue timeout = 2
[docs] def __init__( self, log: logging.Logger, routing_label: "RoutingLabel", ) -> None: super().__init__(log, routing_label) self._path = os.path.join(self._db_path, routing_label.value) self._initialize()
def _initialize(self) -> None: os.makedirs(self._path, exist_ok=True) self._queue = VendorQueue( path=self._path, auto_resume=True, multithreading=True, timeout=self.timeout, )
[docs] def raw(self) -> VendorQueue: """Access the underlying queue directly.""" return self._queue
def _ack(self, task: "TypeGenericTask") -> None: self._queue.ack(id=task.id) def _get(self) -> "TypeGenericTask": try: raw_data = self._responsive_get() item = raw_data['data'] item.id = raw_data['pqid'] return item except AttributeError: self.log.error( "Fatal error during deserialization!", extra={'queue': self.routing_label.value}, ) self.log.error( "To restore service the queue is being cleared. " "Tasks have been lost!", extra={'queue': self.routing_label.value}, ) shutil.rmtree(self._path) self._initialize() return self._get() def _responsive_get(self) -> "TypeRawTask": raw_data: "Optional[TypeRawTask]" = None while not raw_data: try: raw_data = cast( TypeRawTask, self._queue.get(block=True, timeout=1, raw=True), ) except VendorException: pass return raw_data def _maintenance(self) -> None: self._queue.clear_acked_data() self._queue.shrink_disk_usage() def _metrics(self) -> "QueueMetrics": return QueueMetrics( length=self._queue.size, acked_length=self._queue.acked_count(), unacked_length=self._queue.unack_count(), storage_mb=self._size_on_disk_in_mb() ) def _size_on_disk_in_mb(self) -> float: return sum( os.path.getsize(f) for f in os.scandir(self._path) if f.is_file() ) / 1024 / 1024 def _put(self, task: "TypeGenericTask") -> None: task.id = self._queue.put(task) def _retry(self, task: "TypeGenericTask") -> None: self._queue.nack(id=task.id)