Source code for pi_portal.modules.tasks.service_client

"""Client for the task scheduler service."""

import os.path
from datetime import datetime
from typing import Tuple

from pi_portal import config
from pi_portal.modules.system.socket.unix_stream_http_client import (
    UnixStreamHttpClient,
    UnixStreamHttpResponse,
)
from pi_portal.modules.tasks.config import DEFERRED_MESSAGE_PREFIX
from pi_portal.modules.tasks.enums import TaskType


[docs]class TaskSchedulerServiceClient: """Client for the task scheduler service.""" http_client: UnixStreamHttpClient camera_snapshot_failure_message = ( "An error occurred while requesting a snapshot!" )
[docs] def __init__(self) -> None: self.http_client = UnixStreamHttpClient( config.PI_PORTAL_TASK_MANAGER_SOCKET )
[docs] def camera_snapshot( self, camera: int, ) -> UnixStreamHttpResponse: """Schedule a camera snapshot from the specified camera. :param camera: The camera identifier to request the snapshot from. :returns: A response from the task scheduler API. """ payload = { "type": TaskType.CAMERA_SNAPSHOT.value, "args": { "camera": camera, }, "on_failure": [ { "type": TaskType.CHAT_SEND_MESSAGE.value, "args": { "message": self.camera_snapshot_failure_message, }, "retry_after": 300, } ] } return self.http_client.post("/schedule/", payload)
[docs] def chat_send_message( self, message: str, ) -> UnixStreamHttpResponse: """Send a message to the chat client via the API. :param message: The message you wish to send. :returns: A response from the task scheduler API. """ payload = { "type": TaskType.CHAT_SEND_MESSAGE.value, "args": { "message": message, }, "on_failure": [ { "type": TaskType.CHAT_SEND_MESSAGE.value, "args": { "message": DEFERRED_MESSAGE_PREFIX + message, }, "retry_after": 300, } ] } return self.http_client.post("/schedule/", payload)
[docs] def chat_send_temperature_reading(self,) -> UnixStreamHttpResponse: """Send the latest temperature reading to the chat client via the API. :returns: A response from the task scheduler API. """ header = "Latest temperature readings:" payload = { "type": TaskType.CHAT_SEND_TEMPERATURE_READING.value, "args": { "header": header }, "on_failure": [ { "type": TaskType.CHAT_SEND_TEMPERATURE_READING.value, "args": { "header": DEFERRED_MESSAGE_PREFIX + header, }, "retry_after": 300, } ] } return self.http_client.post("/schedule/", payload)
[docs] def chat_upload_snapshot( self, path: str, ) -> UnixStreamHttpResponse: """Upload camera snapshots to the chat client via the API. :param path: The path to the motion snapshot to upload. :returns: A response from the task scheduler API. """ camera, video_time = self._parse_motion_file_path(path) description = f"Camera: {camera}, Time: {video_time}" payload = { "type": TaskType.CHAT_UPLOAD_SNAPSHOT.value, "args": { "description": description, "path": path }, "on_failure": [ { "type": TaskType.CHAT_UPLOAD_SNAPSHOT.value, "args": { "description": DEFERRED_MESSAGE_PREFIX + description, "path": path }, "retry_after": 300, } ] } return self.http_client.post("/schedule/", payload)
[docs] def chat_upload_video( self, path: str, ) -> UnixStreamHttpResponse: """Upload camera videos to the chat client via the API. :param path: The path to the motion video to upload. :returns: A response from the task scheduler API. """ camera, video_time = self._parse_motion_file_path(path) description = f"Motion detected! Camera: {camera}, Time: {video_time}" payload = { "type": TaskType.CHAT_UPLOAD_VIDEO.value, "args": { "description": description, "path": path, }, "on_failure": [ { "type": TaskType.CHAT_UPLOAD_VIDEO.value, "args": { "description": DEFERRED_MESSAGE_PREFIX + description, "path": path, }, "retry_after": 300, } ] } return self.http_client.post("/schedule/", payload)
def _parse_motion_file_path(self, path: str) -> Tuple[str, str]: camera = "Unknown" parsed_datetime = "Unknown" raw_filename = os.path.basename(path).split(".")[0] try: camera, raw_datetime = raw_filename.split("-")[0:2] parsed_datetime = ( datetime.strptime(raw_datetime, "%Y%m%d%H%M%S").isoformat() ) except (TypeError, ValueError): pass return camera, parsed_datetime
[docs] def file_system_copy( self, source: str, destination: str, ) -> UnixStreamHttpResponse: """Schedule copying a file via the API. :param source: The path to the source file. :param destination: The path to the destination file. :returns: A response from the task scheduler API. """ payload = { "type": TaskType.FILE_SYSTEM_COPY.value, "args": { "destination": destination, "source": source }, } return self.http_client.post("/schedule/", payload)
[docs] def set_flag( self, flag_name: str, value: bool, ) -> UnixStreamHttpResponse: """Schedule copying a file via the API. :param flag_name: The flag name to set. :param value: The value to assign to this flag. :returns: A response from the task scheduler API. """ payload = { "type": TaskType.FLAG_SET_VALUE.value, "args": { "flag_name": flag_name, "value": value }, } return self.http_client.post("/schedule/", payload)