diff --git a/src/espmega_lightshow/drivers.py b/src/espmega_lightshow/drivers.py new file mode 100644 index 0000000..30cf553 --- /dev/null +++ b/src/espmega_lightshow/drivers.py @@ -0,0 +1,249 @@ +from abc import ABC +from espmega.espmega_r3 import ESPMega_standalone as ESPMega +import json +from typing import Optional +# This is the base class for all physical light drivers + + +class LightDriver(ABC): + conntected: bool = False + state: bool = False + color: tuple = (0, 0, 0) + brightness: int = 0 + + def __init__(self, **kwargs): + # The init function should take in any parameters needed to initialize the driver + # This function should not raise any exceptions if the driver is not able to be initialized + # Instead, it should set the driver to a state where it is not able to be controlled + pass + + def set_light_state(self, state: bool) -> None: + # This function should set the light to the given state + pass + + def get_light_state(self) -> int: + # This function should return the current state of the light + # Returns 0 if the light is off, 1 if the light is on + # Return 2 if the light is on but is not able to be controlled + # Return 3 if the light is off but is not able to be controlled + pass + + def is_connected(self) -> bool: + # This function should return whether the driver is connected to the light + return self.conntected + + def get_exception(self) -> Optional[str]: + # This function should return the exception that caused the driver to be disconnected + if self.conntected: + return None + return self.exception + + @staticmethod + def get_driver_properties() -> dict: + # Standard properties: + # name: The name of the driver + # support_brightness: Whether the driver supports brightness control + # support_color: Whether the driver supports color control + pass + + def set_brightness(self, brightness: float) -> None: + # This function should set the brightness of the light + # brightness is a float between 0 and 4095 + pass + + def get_brightness(self) -> float: + # This function should return the current brightness of the light + # brightness is a float between 0 and 4095 + pass + + def set_color(self, color: tuple) -> None: + # This function should set the color of the light + # color is a tuple of 3 integers between 0 and 4095 + pass + + def get_color(self) -> tuple: + # This function should return the current color of the light + # color is a tuple of 3 integers between 0 and 4095 + pass + + +class ESPMegaLightDriver(LightDriver): + rapid_mode: bool = False + + def __init__(self, controller: ESPMega, pwm_channel: int) -> int: + self.controller = controller + self.pwm_channel = pwm_channel + if controller is None: + self.conntected = False + self.exception = "Controller is not connected." + self.conntected = True + + def set_light_state(self, state: bool) -> None: + if not self.conntected: + self.state = state + else: + self.controller.digital_write(self.pwm_channel, state) + + def get_light_state(self) -> bool: + if self.conntected: + self.state = self.controller.get_pwm_state(self.pwm_channel) + return self.state + 2 * (not self.conntected) + + @staticmethod + def get_driver_properties() -> dict: + return { + "name": "ESPMega", + "support_brightness": False, + "support_color": False + } + + +class ESPMegaStandaloneLightDriver(ESPMegaLightDriver): + def __init__(self, base_topic: str,pwm_channel: int, light_server: str, light_server_port: int, rapid_mode: bool = False) -> dict: + self.base_topic = base_topic + self.light_server = light_server + self.light_server_port = light_server_port + self.pwm_channel = pwm_channel + self.rapid_mode = rapid_mode + self.state = False + self.connected = False + try: + self.controller = ESPMega( + base_topic, light_server, light_server_port) + if rapid_mode: + self.controller.set_rapid_mode() + print("Connected to controller.") + self.connected = True + except Exception as e: + print(e) + self.controller = None + self.exception = e + self.connected = False + def close(self): + if self.conntected and self.rapid_mode: + self.controller.disable_rapid_response_mode() + @staticmethod + def get_driver_properties() -> dict: + return { + "name": "ESPMega Standalone", + "support_brightness": False, + "support_color": False + } + + +class ESPMegaLightGrid: + def __init__(self, light_server: str, light_server_port: int, rows: int = 0, columns: int = 0, rapid_mode: bool = False, design_mode: bool = False): + self.rows = rows + self.columns = columns + self.lights: list = [None] * rows * columns + self.drivers = {} + self.light_server = light_server + self.light_server_port = light_server_port + self.design_mode = design_mode + + def assign_physical_light(self, row: int, column: int, physical_light: Optional[LightDriver]): + self.lights[row * self.columns + column] = physical_light + + def mark_light_disappeared(self, row: int, column: int): + self.lights[row * self.columns + column] = None + + def get_physical_light(self, row, column): + return self.lights[row * self.columns + column] + + def set_light_state(self, row: int, column: int, state: bool) -> None: + physical_light = self.get_physical_light(row, column) + if not self.design_mode: + physical_light.set_light_state(state) + + def get_light_state(self, row: int, column: int): + physical_light = self.get_physical_light(row, column) + return physical_light.get_light_state() + + def read_light_map(self, light_map: list) -> list: + self.initialize_light_map(light_map) + for row_index, row in enumerate(light_map): + for column_index, light in enumerate(row): + self.__assign_light(row_index, column_index, light) + return [self.connected_drivers, self.failed_drivers] + + def initialize_light_map(self, light_map): + self.light_map = light_map + self.rows = len(light_map) + self.columns = len(light_map[0]) + self.lights = [None] * self.rows * self.columns + self.controllers = {} # Dictionary to store existing controllers + self.failed_controllers = {} # Dictionary to store failed controllers + self.connected_controllers = {} # Dictionary to store connected controllers + + def _assign_light(self, row_index, column_index, light): + if self.design_mode: + self.connected_controllers[light["base_topic"]] = None + self.assign_physical_light(row_index, column_index, None) + return + if light is None: + self.assign_physical_light(row_index, column_index, None) + else: + self._assign_light_with_driver(row_index, column_index, light) + + def _assign_light_with_driver(self, row_index, column_index, light): + base_topic = light["base_topic"] + pwm_id = light["pwm_id"] + if base_topic not in self.drivers: + self._create_new_driver(base_topic) + else: + controller = self.drivers[base_topic].controller + driver = ESPMegaLightDriver(controller, pwm_id) + self.assign_physical_light(row_index, column_index, driver) + + def _create_new_driver(self, base_topic): + if not self.design_mode: + driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port) + if driver.is_connected(): + self.drivers[base_topic] = driver + else: + self.failed_drivers[base_topic] = driver.get_exception() + + def read_light_map_from_file(self, filename: str): + try: + with open(filename, "r") as file: + light_map = json.load(file) + + ESPMegaLightGrid._validate_light_map(light_map) + ESPMegaLightGrid.read_light_map(light_map) + + except FileNotFoundError: + raise FileNotFoundError("The light map file does not exist.") + + @staticmethod + def _validate_light_map(light_map): + if len(light_map) == 0: + raise ValueError("Light map cannot be empty.") + + if len(light_map[0]) == 0: + raise ValueError("Light map cannot be empty.") + + for row in light_map: + ESPMegaLightGrid._validate_row(row, light_map[0]) + + @staticmethod + def _validate_row(row, reference_row): + if len(row) != len(reference_row): + raise ValueError("All rows in the light map must have the same length.") + + for column in row: + ESPMegaLightGrid._validate_column(column) + + @staticmethod + def _validate_column(column): + if column is not None: + if "base_topic" not in column: + raise ValueError("The base_topic field is missing from a light.") + + if "pwm_id" not in column: + raise ValueError("The pwm_id field is missing from a light.") + + if not isinstance(column["base_topic"], str): + raise ValueError("The base_topic field must be a string.") + + if not isinstance(column["pwm_id"], int): + raise ValueError("The pwm_id field must be an integer.") diff --git a/test_driver.py b/test_driver.py new file mode 100644 index 0000000..5b14404 --- /dev/null +++ b/test_driver.py @@ -0,0 +1,23 @@ +from src.espmega_lightshow.drivers import ESPMegaLightDriver, ESPMegaStandaloneLightDriver, ESPMegaLightGrid +from time import sleep + +# Define and instantiate the driver +driver = ESPMegaStandaloneLightDriver("/espmega/ProR3", 0, "192.168.0.26", 1883) +driver.set_light_state(True) + +# Define and instantiate a slave driver +slave_driver = ESPMegaLightDriver(driver.controller, 1) +slave_driver.set_light_state(True) + +# Define and instantiate a light grid +light_grid = ESPMegaLightGrid("192.168.0.26",1883, 2, 2, False, False) +light_grid.assign_physical_light(0, 0, driver) +light_grid.assign_physical_light(0, 1, slave_driver) + +while True: + light_grid.set_light_state(0, 0, True) + light_grid.set_light_state(0, 1, False) + sleep(1) + light_grid.set_light_state(0, 0, False) + light_grid.set_light_state(0, 1, True) + sleep(1) \ No newline at end of file