Source code for pi_portal.cli_commands.cli_machine.task_scheduler

"""Machine CLI command to start the task scheduler service."""

import uvicorn
from pi_portal import config
from pi_portal.cli_commands.bases import command
from pi_portal.cli_commands.mixins import state


[docs]class TaskSchedulerCommand( command.CommandBase, state.CommandManagedStateMixin, ): """CLI command to start the task scheduler service."""
[docs] def invoke(self) -> None: """Invoke the command.""" uvicorn.run( "pi_portal.modules.tasks:create_service", factory=True, uds=config.PI_PORTAL_TASK_MANAGER_SOCKET, reload=False, workers=1, limit_concurrency=config.PI_PORTAL_TASK_MANAGER_CONCURRENCY_LIMIT, )