"""QueueWorker class."""
from typing import TYPE_CHECKING
from pi_portal.modules.tasks.enums import TaskManifests, TaskType
from .bases import worker_base
if TYPE_CHECKING: # pragma: no cover
from pi_portal.modules.tasks.enums import RoutingLabel
from pi_portal.modules.tasks.scheduler import TaskScheduler
from pi_portal.modules.tasks.task.bases.task_base import TypeGenericTask
class QueueWorker(worker_base.WorkerBase):
    """Worker that consumes tasks for a single routing label.

    The worker pulls tasks from the scheduler's router, hands them to the
    processor registered for the task's type, acknowledges them, and then
    enqueues any follow-up tasks attached to the outcome.

    :param scheduler: A task scheduler instance; its log, registry, router
      and failed task manifest are reused by this worker.
    :param routing_label: The routing label used by this queue worker.
    """

    __slots__ = (
        "_is_running",
        "failed_task_manifest",
        "log",
        "registry",
        "router",
        "routing_label",
    )

    # Task types that are acknowledged without being handed to a processor.
    do_not_process = [TaskType.NON_SCHEDULED]

    def __init__(
        self,
        scheduler: "TaskScheduler",
        routing_label: "RoutingLabel",
    ) -> None:
        # The loop in start() keeps running until halt() clears this flag.
        self._is_running = True
        self.failed_task_manifest = scheduler.manifests[
            TaskManifests.FAILED_TASKS
        ]
        self.log = scheduler.log
        self.registry = scheduler.registry
        self.router = scheduler.router
        self.routing_label = routing_label

    def start(self) -> None:
        """Consume tasks repeatedly until the worker has been halted.

        A log record is emitted before the loop begins and again after it
        ends. The running flag is only re-checked between calls to
        :meth:`consumer`.
        """
        self.log.warning(
            "Worker thread has started ...",
            extra={"queue": self.routing_label.value},
        )

        while self._is_running:
            self.consumer()

        self.log.warning(
            "Worker thread has exited!",
            extra={"queue": self.routing_label.value},
        )

    def consumer(self) -> None:
        """Take one task from the router and handle it.

        Tasks whose type is listed in ``do_not_process`` are acknowledged
        only. Every other task is processed, acknowledged, and then has its
        success or failure follow-ups dispatched.
        """
        task = self.router.get(routing_label=self.routing_label)

        if not self._should_task_be_processed(task):
            self._do_task_ack(task)
            return

        self._do_task_processing(task)
        self._do_task_ack(task)
        self._do_task_success(task)
        self._do_task_failure(task)

    def _should_task_be_processed(self, task: "TypeGenericTask") -> bool:
        """Return False when the task's type is in ``do_not_process``."""
        is_excluded = task.type in self.do_not_process
        return not is_excluded

    def _do_task_processing(self, task: "TypeGenericTask") -> None:
        """Build the registered processor for the task's type and run it."""
        processor = self.registry.processors[task.type].ProcessorClass(
            self.log,
            self.router,
        )
        processor.process(task)

    def _do_task_ack(self, task: "TypeGenericTask") -> None:
        """Acknowledge the task with the router."""
        self.router.ack(task)

    def _do_task_success(self, task: "TypeGenericTask") -> None:
        """Enqueue the task's ``on_success`` follow-ups when it is ok."""
        if not task.ok:
            return

        for follow_up in task.on_success:
            # Give the follow-up access to the result that triggered it.
            follow_up.result.cause = task.result
            self.router.put(follow_up)

    def _do_task_failure(self, task: "TypeGenericTask") -> None:
        """Enqueue ``on_failure`` follow-ups and record retryable failures.

        Nothing happens when the task is ok. Otherwise each failure
        follow-up is enqueued, and a task with a positive ``retry_after``
        is added to the failed task manifest.
        """
        if task.ok:
            return

        for follow_up in task.on_failure:
            # Give the follow-up access to the result that triggered it.
            follow_up.result.cause = task.result
            self.router.put(follow_up)

        if task.retry_after > 0:
            self.log.debug(
                "Failed task '%s' will be rescheduled in %s second(s).",
                task,
                task.retry_after,
                extra={
                    "queue": self.routing_label.value,
                    "task_id": task.id,
                    "task_type": task.type,
                },
            )
            self.failed_task_manifest.add(task)

    def halt(self) -> None:
        """Clear the running flag so that :meth:`start` leaves its loop."""
        self._is_running = False