From 848c87e5bc7349448cc2c80ab02129a65a5afc12 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 12 Dec 2023 13:21:58 +0700 Subject: [PATCH 1/4] driver class --- src/espmega_lightshow/drivers.py | 200 +++++++++++++++++++++++++++++++ test_driver.py | 23 ++++ 2 files changed, 223 insertions(+) create mode 100644 src/espmega_lightshow/drivers.py create mode 100644 test_driver.py diff --git a/src/espmega_lightshow/drivers.py b/src/espmega_lightshow/drivers.py new file mode 100644 index 0000000..ee84b5c --- /dev/null +++ b/src/espmega_lightshow/drivers.py @@ -0,0 +1,200 @@ +from abc import ABC +from espmega.espmega_r3 import ESPMega_standalone as ESPMega +import json +# This is the base class for all physical light drivers + + +class LightDriver(ABC): + # The init function should take in any parameters needed to initialize the driver + # This function should not raise any exceptions if the driver is not able to be initialized + # Instead, it should set the driver to a state where it is not able to be controlled + conntected: bool = False + state: bool = False + color: tuple = (0, 0, 0) + brightness: int = 0 + + def __init__(self, **kwargs): + pass + + def set_light_state(self, state: bool) -> None: + pass + + def get_light_state(self) -> int: + # Returns 0 if the light is off, 1 if the light is on + # Return 2 if the light is on but is not able to be controlled + # Return 3 if the light is off but is not able to be controlled + pass + + def is_connected(self) -> bool: + return self.conntected + + def get_exception(self) -> str: + if self.conntected: + return None + return self.exception + + @staticmethod + def get_driver_properties() -> dict: + # Standard properties: + # name: The name of the driver + # support_brightness: Whether the driver supports brightness control + # support_color: Whether the driver supports color control + pass + + def set_brightness(self, brightness: float) -> None: + pass + + def get_brightness(self) -> float: + pass + + def set_color(self, color: tuple) -> None: + pass + + def get_color(self) -> tuple: + pass + + +class ESPMegaLightDriver(LightDriver): + rapid_mode: bool = False + + def __init__(self, controller: ESPMega, pwm_channel: int) -> int: + self.controller = controller + self.pwm_channel = pwm_channel + if controller is None: + self.conntected = False + self.exception = "Controller is not connected." + self.conntected = True + + def set_light_state(self, state: bool) -> None: + if not self.conntected: + self.state = state + else: + self.controller.digital_write(self.pwm_channel, state) + + def get_light_state(self) -> bool: + if self.conntected: + self.state = self.controller.get_pwm_state(self.pwm_channel) + return self.state + 2 * (not self.conntected) + + @staticmethod + def get_driver_properties() -> dict: + return { + "name": "ESPMega", + "support_brightness": False, + "support_color": False + } + + +class ESPMegaStandaloneLightDriver(ESPMegaLightDriver): + def __init__(self, base_topic: str,pwm_channel: int, light_server: str, light_server_port: int) -> dict: + self.base_topic = base_topic + self.light_server = light_server + self.light_server_port = light_server_port + self.pwm_channel = pwm_channel + self.state = False + try: + self.controller = ESPMega( + base_topic, light_server, light_server_port) + except Exception as e: + self.controller = None + self.conntected = False + self.exception = e + self.conntected = True + + @staticmethod + def get_driver_properties() -> dict: + return { + "name": "ESPMega Standalone", + "support_brightness": False, + "support_color": False + } + + +class ESPMegaLightGrid: + def __init__(self, light_server: str, light_server_port: int, rows: int = 0, columns: int = 0, rapid_mode: bool = False, design_mode: bool = False): + self.rows = rows + self.columns = columns + self.lights: list = [None] * rows * columns + self.drivers = {} + self.light_server = light_server + self.light_server_port = light_server_port + self.design_mode = design_mode + + def assign_physical_light(self, row: int, column: int, physical_light: LightDriver): + self.lights[row * self.columns + column] = physical_light + + def get_physical_light(self, row, column): + return self.lights[row * self.columns + column] + + def set_light_state(self, row: int, column: int, state: bool) -> None: + physical_light = self.get_physical_light(row, column) + if not self.design_mode: + physical_light.set_light_state(state) + + def get_light_state(self, row: int, column: int): + physical_light = self.get_physical_light(row, column) + return physical_light.get_light_state() + + def read_light_map(self, light_map: list) -> list: + self.light_map = light_map + self.rows = len(light_map) + self.columns = len(light_map[0]) + self.lights = [None] * self.rows * self.columns + self.controllers = {} # Dictionary to store existing controllers + self.failed_controllers = {} # Dictionary to store failed controllers + self.connected_controllers = {} # Dictionary to store connected controllers + for row_index, row in enumerate(light_map): + for column_index, light in enumerate(row): + if self.design_mode: + self.connected_controllers[light["base_topic"]] = None + self.assign_physical_light(row_index, column_index, None) + continue + if light is None: + self.assign_physical_light(row_index, column_index, None) + else: + base_topic = light["base_topic"] + pwm_id = light["pwm_id"] + # Create a mapping of base_topic to controller + if base_topic not in self.drivers: + if not self.design_mode: + driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port) + if driver.is_connected(): + self.drivers[base_topic] = driver + else: + self.failed_drivers[base_topic] = driver.get_exception() + else: + controller = self.drivers[base_topic].controller + driver = ESPMegaLightDriver(controller, pwm_id) + self.assign_physical_light(row_index, column_index, driver) + # Return a list of connected drivers list and failed drivers list + return [self.connected_drivers, self.failed_drivers] + def read_light_map_from_file(self, filename: str): + try: + with open(filename, "r") as file: + light_map = json.load(file) + # Check if the light map is valid + if len(light_map) == 0: + raise Exception("Light map cannot be empty.") + if len(light_map[0]) == 0: + raise Exception("Light map cannot be empty.") + for row in light_map: + if len(row) != len(light_map[0]): + raise Exception( + "All rows in the light map must have the same length.") + for column in row: + if column != None: + if "base_topic" not in column: + raise Exception( + "The base_topic field is missing from a light.") + if "pwm_id" not in column: + raise Exception( + "The pwm_id field is missing from a light.") + if type(column["base_topic"]) != str: + raise Exception( + "The base_topic field must be a string.") + if type(column["pwm_id"]) != int: + raise Exception( + "The pwm_id field must be an integer.") + self.read_light_map(light_map) + except FileNotFoundError: + raise Exception("The light map file does not exist.") \ No newline at end of file diff --git a/test_driver.py b/test_driver.py new file mode 100644 index 0000000..5b14404 --- /dev/null +++ b/test_driver.py @@ -0,0 +1,23 @@ +from src.espmega_lightshow.drivers import ESPMegaLightDriver, ESPMegaStandaloneLightDriver, ESPMegaLightGrid +from time import sleep + +# Define and instantiate the driver +driver = ESPMegaStandaloneLightDriver("/espmega/ProR3", 0, "192.168.0.26", 1883) +driver.set_light_state(True) + +# Define and instantiate a slave driver +slave_driver = ESPMegaLightDriver(driver.controller, 1) +slave_driver.set_light_state(True) + +# Define and instantiate a light grid +light_grid = ESPMegaLightGrid("192.168.0.26",1883, 2, 2, False, False) +light_grid.assign_physical_light(0, 0, driver) +light_grid.assign_physical_light(0, 1, slave_driver) + +while True: + light_grid.set_light_state(0, 0, True) + light_grid.set_light_state(0, 1, False) + sleep(1) + light_grid.set_light_state(0, 0, False) + light_grid.set_light_state(0, 1, True) + sleep(1) \ No newline at end of file From 15a0606910a18dfd521c085c10050a8aaf27fc1a Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 12 Dec 2023 13:33:54 +0700 Subject: [PATCH 2/4] rapid driver --- src/espmega_lightshow/drivers.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/espmega_lightshow/drivers.py b/src/espmega_lightshow/drivers.py index ee84b5c..db66e25 100644 --- a/src/espmega_lightshow/drivers.py +++ b/src/espmega_lightshow/drivers.py @@ -86,21 +86,27 @@ def get_driver_properties() -> dict: class ESPMegaStandaloneLightDriver(ESPMegaLightDriver): - def __init__(self, base_topic: str,pwm_channel: int, light_server: str, light_server_port: int) -> dict: + def __init__(self, base_topic: str,pwm_channel: int, light_server: str, light_server_port: int, rapid_mode: bool = False) -> dict: self.base_topic = base_topic self.light_server = light_server self.light_server_port = light_server_port self.pwm_channel = pwm_channel + self.rapid_mode = rapid_mode self.state = False try: self.controller = ESPMega( base_topic, light_server, light_server_port) + if rapid_mode: + self.controller.set_rapid_mode() except Exception as e: + print(e) self.controller = None self.conntected = False self.exception = e self.conntected = True - + def close(self): + if self.conntected and rapid_mode: + self.controller.disable_rapid_response_mode() @staticmethod def get_driver_properties() -> dict: return { From fc5536dd2233329b4566804fa38220df4877e9e1 Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Tue, 12 Dec 2023 13:39:47 +0700 Subject: [PATCH 3/4] Update drivers.py --- src/espmega_lightshow/drivers.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/espmega_lightshow/drivers.py b/src/espmega_lightshow/drivers.py index db66e25..8501bef 100644 --- a/src/espmega_lightshow/drivers.py +++ b/src/espmega_lightshow/drivers.py @@ -93,19 +93,21 @@ def __init__(self, base_topic: str,pwm_channel: int, light_server: str, light_se self.pwm_channel = pwm_channel self.rapid_mode = rapid_mode self.state = False + self.connected = False try: self.controller = ESPMega( base_topic, light_server, light_server_port) if rapid_mode: self.controller.set_rapid_mode() + print("Connected to controller.") + self.connected = True except Exception as e: print(e) self.controller = None - self.conntected = False self.exception = e - self.conntected = True + self.connected = False def close(self): - if self.conntected and rapid_mode: + if self.conntected and self.rapid_mode: self.controller.disable_rapid_response_mode() @staticmethod def get_driver_properties() -> dict: From a0f7ca89eb07eb77ee70d2838e5d6993f063030f Mon Sep 17 00:00:00 2001 From: Siwat Sirichai Date: Wed, 13 Dec 2023 10:54:51 +0700 Subject: [PATCH 4/4] Update drivers.py --- src/espmega_lightshow/drivers.py | 151 ++++++++++++++++++++----------- 1 file changed, 96 insertions(+), 55 deletions(-) diff --git a/src/espmega_lightshow/drivers.py b/src/espmega_lightshow/drivers.py index 8501bef..30cf553 100644 --- a/src/espmega_lightshow/drivers.py +++ b/src/espmega_lightshow/drivers.py @@ -1,34 +1,39 @@ from abc import ABC from espmega.espmega_r3 import ESPMega_standalone as ESPMega import json +from typing import Optional # This is the base class for all physical light drivers class LightDriver(ABC): - # The init function should take in any parameters needed to initialize the driver - # This function should not raise any exceptions if the driver is not able to be initialized - # Instead, it should set the driver to a state where it is not able to be controlled conntected: bool = False state: bool = False color: tuple = (0, 0, 0) brightness: int = 0 def __init__(self, **kwargs): + # The init function should take in any parameters needed to initialize the driver + # This function should not raise any exceptions if the driver is not able to be initialized + # Instead, it should set the driver to a state where it is not able to be controlled pass def set_light_state(self, state: bool) -> None: + # This function should set the light to the given state pass def get_light_state(self) -> int: + # This function should return the current state of the light # Returns 0 if the light is off, 1 if the light is on # Return 2 if the light is on but is not able to be controlled # Return 3 if the light is off but is not able to be controlled pass def is_connected(self) -> bool: + # This function should return whether the driver is connected to the light return self.conntected - def get_exception(self) -> str: + def get_exception(self) -> Optional[str]: + # This function should return the exception that caused the driver to be disconnected if self.conntected: return None return self.exception @@ -42,15 +47,23 @@ def get_driver_properties() -> dict: pass def set_brightness(self, brightness: float) -> None: + # This function should set the brightness of the light + # brightness is a float between 0 and 4095 pass def get_brightness(self) -> float: + # This function should return the current brightness of the light + # brightness is a float between 0 and 4095 pass def set_color(self, color: tuple) -> None: + # This function should set the color of the light + # color is a tuple of 3 integers between 0 and 4095 pass def get_color(self) -> tuple: + # This function should return the current color of the light + # color is a tuple of 3 integers between 0 and 4095 pass @@ -128,9 +141,12 @@ def __init__(self, light_server: str, light_server_port: int, rows: int = 0, col self.light_server_port = light_server_port self.design_mode = design_mode - def assign_physical_light(self, row: int, column: int, physical_light: LightDriver): + def assign_physical_light(self, row: int, column: int, physical_light: Optional[LightDriver]): self.lights[row * self.columns + column] = physical_light + def mark_light_disappeared(self, row: int, column: int): + self.lights[row * self.columns + column] = None + def get_physical_light(self, row, column): return self.lights[row * self.columns + column] @@ -144,6 +160,13 @@ def get_light_state(self, row: int, column: int): return physical_light.get_light_state() def read_light_map(self, light_map: list) -> list: + self.initialize_light_map(light_map) + for row_index, row in enumerate(light_map): + for column_index, light in enumerate(row): + self.__assign_light(row_index, column_index, light) + return [self.connected_drivers, self.failed_drivers] + + def initialize_light_map(self, light_map): self.light_map = light_map self.rows = len(light_map) self.columns = len(light_map[0]) @@ -151,58 +174,76 @@ def read_light_map(self, light_map: list) -> list: self.controllers = {} # Dictionary to store existing controllers self.failed_controllers = {} # Dictionary to store failed controllers self.connected_controllers = {} # Dictionary to store connected controllers - for row_index, row in enumerate(light_map): - for column_index, light in enumerate(row): - if self.design_mode: - self.connected_controllers[light["base_topic"]] = None - self.assign_physical_light(row_index, column_index, None) - continue - if light is None: - self.assign_physical_light(row_index, column_index, None) - else: - base_topic = light["base_topic"] - pwm_id = light["pwm_id"] - # Create a mapping of base_topic to controller - if base_topic not in self.drivers: - if not self.design_mode: - driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port) - if driver.is_connected(): - self.drivers[base_topic] = driver - else: - self.failed_drivers[base_topic] = driver.get_exception() - else: - controller = self.drivers[base_topic].controller - driver = ESPMegaLightDriver(controller, pwm_id) - self.assign_physical_light(row_index, column_index, driver) - # Return a list of connected drivers list and failed drivers list - return [self.connected_drivers, self.failed_drivers] + + def _assign_light(self, row_index, column_index, light): + if self.design_mode: + self.connected_controllers[light["base_topic"]] = None + self.assign_physical_light(row_index, column_index, None) + return + if light is None: + self.assign_physical_light(row_index, column_index, None) + else: + self._assign_light_with_driver(row_index, column_index, light) + + def _assign_light_with_driver(self, row_index, column_index, light): + base_topic = light["base_topic"] + pwm_id = light["pwm_id"] + if base_topic not in self.drivers: + self._create_new_driver(base_topic) + else: + controller = self.drivers[base_topic].controller + driver = ESPMegaLightDriver(controller, pwm_id) + self.assign_physical_light(row_index, column_index, driver) + + def _create_new_driver(self, base_topic): + if not self.design_mode: + driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port) + if driver.is_connected(): + self.drivers[base_topic] = driver + else: + self.failed_drivers[base_topic] = driver.get_exception() + def read_light_map_from_file(self, filename: str): try: with open(filename, "r") as file: light_map = json.load(file) - # Check if the light map is valid - if len(light_map) == 0: - raise Exception("Light map cannot be empty.") - if len(light_map[0]) == 0: - raise Exception("Light map cannot be empty.") - for row in light_map: - if len(row) != len(light_map[0]): - raise Exception( - "All rows in the light map must have the same length.") - for column in row: - if column != None: - if "base_topic" not in column: - raise Exception( - "The base_topic field is missing from a light.") - if "pwm_id" not in column: - raise Exception( - "The pwm_id field is missing from a light.") - if type(column["base_topic"]) != str: - raise Exception( - "The base_topic field must be a string.") - if type(column["pwm_id"]) != int: - raise Exception( - "The pwm_id field must be an integer.") - self.read_light_map(light_map) + + ESPMegaLightGrid._validate_light_map(light_map) + ESPMegaLightGrid.read_light_map(light_map) + except FileNotFoundError: - raise Exception("The light map file does not exist.") \ No newline at end of file + raise FileNotFoundError("The light map file does not exist.") + + @staticmethod + def _validate_light_map(light_map): + if len(light_map) == 0: + raise ValueError("Light map cannot be empty.") + + if len(light_map[0]) == 0: + raise ValueError("Light map cannot be empty.") + + for row in light_map: + ESPMegaLightGrid._validate_row(row, light_map[0]) + + @staticmethod + def _validate_row(row, reference_row): + if len(row) != len(reference_row): + raise ValueError("All rows in the light map must have the same length.") + + for column in row: + ESPMegaLightGrid._validate_column(column) + + @staticmethod + def _validate_column(column): + if column is not None: + if "base_topic" not in column: + raise ValueError("The base_topic field is missing from a light.") + + if "pwm_id" not in column: + raise ValueError("The pwm_id field is missing from a light.") + + if not isinstance(column["base_topic"], str): + raise ValueError("The base_topic field must be a string.") + + if not isinstance(column["pwm_id"], int): + raise ValueError("The pwm_id field must be an integer.")