Source code for pi_portal.modules.tasks.task.metaclasses.meta_task
"""A task to archive a folder of log files."""
from typing import Any, Dict, Optional, Tuple, Type, cast
from pi_portal.modules.tasks.config import ROUTING_MATRIX
from pi_portal.modules.tasks.enums import RoutingLabel, TaskType
from pi_portal.modules.tasks.task.bases.task_args_base import TaskArgsBase
[docs]class MetaTask(type):
"""Typed task classes.
:params task_type: The task type for this task class.
"""
__slots__ = ()
def __new__(
mcs: Type[type],
name: str,
bases: Tuple[type, ...],
class_dict: Dict[str, Any],
task_type: TaskType,
) -> "MetaTask":
new_cls = super(
).__new__( # type: ignore[misc]
mcs,
name,
bases,
class_dict,
)
new_cls.type = task_type
return cast(
MetaTask,
new_cls,
)
def __call__(
cls: "MetaTask",
args: "TaskArgsBase",
retry_after: int = 0,
routing_label: "Optional[RoutingLabel]" = None
) -> "MetaTask":
if routing_label is None:
routing_label = ROUTING_MATRIX[getattr(cls, "type")]
return cast(
MetaTask,
type.__call__(cls, args, retry_after, routing_label),
)