Source code for pi_portal.modules.tasks.schema

"""Schedulable task schemas."""

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Protocol, Type, runtime_checkable

if TYPE_CHECKING:  # pragma: no cover
  from pi_portal.modules.tasks.workers.cron_jobs.bases.cron_job_base import (
      CronJobBase,
  )
  from .enums import RoutingLabel, TaskType
  from .processor.bases.processor_base import TaskProcessorBase
  from .task.bases.task_args_base import TaskArgsBase
  from .task.bases.task_base import TypeGenericTask


@dataclass
class CronModule:
  """A typed representation of a cron job module.

  Attributes:
    cron_job_class: The cron job class (a ``CronJobBase`` subclass
      parameterized by ``TaskArgsBase``) carried by this module.
  """

  cron_job_class: "Type[CronJobBase[TaskArgsBase]]"
@runtime_checkable
class ProcessorModule(Protocol):
  """A typed representation of a task processor module.

  This is a structural (duck-typed) interface: any object exposing a
  ``ProcessorClass`` attribute satisfies it.  Because the protocol is
  runtime checkable, ``isinstance`` can be used to verify that the
  attribute is present (its type is not validated at runtime).

  Attributes:
    ProcessorClass: The task processor class (a ``TaskProcessorBase``
      subclass) exposed by the module.
  """

  ProcessorClass: "Type[TaskProcessorBase[TaskArgsBase, Any]]"
@dataclass
class RegisteredCronJob:
  """A registered cron job.

  Attributes:
    CronJobClass: The cron job class (a ``CronJobBase`` subclass
      parameterized by ``TaskArgsBase``) that has been registered.
  """

  # The PascalCase field name is deliberate (it holds a class), so the
  # naming check is silenced for the remainder of this class body.
  # pylint: disable=invalid-name

  CronJobClass: "Type[CronJobBase[TaskArgsBase]]"
@dataclass
class RegisteredTask:
  """A registered task.

  Attributes:
    ApiEnabled: Boolean flag stored alongside the task; presumably
      indicates whether the task is exposed through the API (confirm
      against the registration code).
    ArgClass: The ``TaskArgsBase`` subclass describing the task's
      arguments.
    TaskClass: The task class itself (a ``TypeGenericTask``).
  """

  # The PascalCase field names are deliberate, so the naming check is
  # silenced for the remainder of this class body.
  # pylint: disable=invalid-name

  ApiEnabled: "bool"
  ArgClass: "Type[TaskArgsBase]"
  TaskClass: "Type[TypeGenericTask]"
@dataclass
class RegisteredProcessor:
  """A typed representation of a registered task processor.

  Attributes:
    ProcessorClass: The task processor class (a ``TaskProcessorBase``
      subclass) that has been registered.
  """

  # The PascalCase field name is deliberate (it holds a class), so the
  # naming check is silenced for the remainder of this class body.
  # pylint: disable=invalid-name

  ProcessorClass: "Type[TaskProcessorBase[TaskArgsBase, Any]]"
@runtime_checkable
class TaskModule(Protocol):
  """A typed representation of a task module.

  This is a structural (duck-typed) interface: any object exposing all of
  the attributes below satisfies it.  Because the protocol is runtime
  checkable, ``isinstance`` can be used to verify that the attributes are
  present (their types are not validated at runtime).

  Attributes:
    ApiEnabled: Boolean flag exposed by the module; presumably indicates
      whether the task is exposed through the API (confirm against the
      registration code).
    Args: The ``TaskArgsBase`` subclass describing the task's arguments.
    ReturnType: The task's return type; intentionally unconstrained.
    TaskType: The ``TaskType`` enum member associated with the module.
    Task: The task class itself (a ``TypeGenericTask``).
  """

  ApiEnabled: bool
  Args: "Type[TaskArgsBase]"
  ReturnType: Any
  TaskType: "TaskType"
  Task: "Type[TypeGenericTask]"