"""GPIOMonitorBase class."""
import abc
import time
from typing import Generic, Sequence, TypeVar
from pi_portal.modules.integrations.gpio.components.bases import \
input as gpio_input
from pi_portal.modules.integrations.slack import client
from pi_portal.modules.mixins import write_log_file
from pi_portal.modules.python.rpi import RPi
TypeGenericGpio = TypeVar('TypeGenericGpio', bound=gpio_input.GPIOInputBase)
[docs]class GPIOMonitorBase(
abc.ABC, write_log_file.LogFileWriter, Generic[TypeGenericGpio]
):
"""GPIO input monitor class.
:param gpio_pins: A list of GPIO Inputs to monitor.
"""
gpio_poll_interval = 0.5
gpio_log_changes_only: bool
gpio_pins: Sequence[TypeGenericGpio]
logger_name: str
log_file_path: str
def __init__(self, gpio_pins: Sequence[TypeGenericGpio]) -> None:
self.configure_logger()
self.slack_client = client.SlackClient()
self.gpio_pins = gpio_pins
self.hook_setup_gpio()
[docs] def start(self) -> None:
"""Begin the monitoring loop."""
while self.is_running():
self.update_state()
self.check_state_for_changes()
time.sleep(self.gpio_poll_interval)
[docs] def is_running(self) -> bool:
"""Indicate if the polling loop is active. Override for testing."""
return True
[docs] def update_state(self) -> None:
"""Poll GPIO and update the known state for the doors."""
for gpio_pin in self.gpio_pins:
gpio_pin.poll()
[docs] def check_state_for_changes(self) -> None:
"""Log any detected differences in state."""
for gpio_pin in self.gpio_pins:
if not self.gpio_log_changes_only or gpio_pin.has_changed():
self.hook_log_state(gpio_pin)
[docs] def hook_setup_gpio(self) -> None:
"""Initialize the GPIO mode, specific to the RPi library."""
RPi.GPIO.setmode(RPi.GPIO.BCM)
[docs] @abc.abstractmethod
def hook_log_state(self, gpio_pin: TypeGenericGpio) -> None:
"""Override to log customized messages about a GPIO input state.
:param gpio_pin: A GPIO pin to log an event for.
"""