Source code for pi_portal.modules.tasks.processor.chat_upload_video

"""Processes requests to upload a video to chat."""
import os

from pi_portal import config
from pi_portal.modules.tasks.enums import TaskType
from pi_portal.modules.tasks.processor.bases import processor_base
from pi_portal.modules.tasks.processor.mixins import chat_client
from pi_portal.modules.tasks.task import chat_upload_video, file_system_move


[docs]class ProcessorClass( chat_client.ChatClientMixin, processor_base.TaskProcessorBase[ chat_upload_video.Args, chat_upload_video.ReturnType, ], ): """Processes requests to upload a video to chat.""" __slots__ = () recovery_archival_suffix = "-RECOVERED" type = TaskType.CHAT_UPLOAD_VIDEO def _process( self, task: processor_base.TaskBase[ chat_upload_video.Args, chat_upload_video.ReturnType, ], ) -> chat_upload_video.ReturnType: archival_path = os.path.join( config.PATH_ARCHIVAL_QUEUE_VIDEO_UPLOAD, os.path.basename(task.args.path), ) exists_source = os.path.exists(task.args.path) exists_destination = os.path.exists(archival_path) if not exists_source: # Partial failure: has already been moved self.recover(task) return if exists_source and not exists_destination: # pylint: disable=duplicate-code self.log.debug( "Uploading: '%s' -> 'CHAT' ...", task.args.path, extra={ "task_id": task.id, "task_type": task.type }, ) self.client.send_file(task.args.path, task.args.description) if exists_destination: archival_path = self._recovery_archival_path(archival_path) self.recover(task) next_args = file_system_move.Args( source=task.args.path, destination=archival_path, ) task.on_success.append(file_system_move.Task(args=next_args)) def _recovery_archival_path(self, path: str) -> str: name, ext = os.path.splitext(os.path.basename(path)) recovery_archival_path = os.path.join( config.PATH_ARCHIVAL_QUEUE_VIDEO_UPLOAD, name + self.recovery_archival_suffix + ext ) return recovery_archival_path