Source code for pi_portal.modules.tasks.processor.supervisor_process

"""Processes requests to manage a supervisor process."""
from typing import Callable, Dict

from pi_portal.modules.system.supervisor_config import ProcessStatus
from pi_portal.modules.system.supervisor_process import (
    SupervisorProcess,
    SupervisorProcessException,
)
from pi_portal.modules.tasks.enums import TaskType
from pi_portal.modules.tasks.processor.bases import processor_base
from pi_portal.modules.tasks.task import supervisor_process


[docs]class ProcessorClass( processor_base.TaskProcessorBase[ supervisor_process.Args, supervisor_process.ReturnType, ], ): """Processes requests to manage a supervisor process.""" __slots__ = () type = TaskType.SUPERVISOR_PROCESS def _process( self, task: processor_base.TaskBase[ supervisor_process.Args, supervisor_process.ReturnType, ], ) -> supervisor_process.ReturnType: managed_process = SupervisorProcess(task.args.process) self.log.info( "Supervisord process management request: '%s' -> '%s' ...", task.args.process.value, task.args.requested_state.value, extra={ "task_id": task.id, "task_type": task.type, }, ) method: Dict[ProcessStatus, Callable[[], None]] = { ProcessStatus.STOPPED: managed_process.stop, ProcessStatus.RUNNING: managed_process.start, } try: method[task.args.requested_state]() except SupervisorProcessException: self.log.info( "Supervisord process already in requested state: '%s' -> '%s' !", task.args.process.value, task.args.requested_state.value, extra={ "task_id": task.id, "task_type": task.type, }, )