Skip to content

Commit

Permalink
Update drivers.py
Browse files Browse the repository at this point in the history
  • Loading branch information
SiwatS committed Dec 13, 2023
1 parent fc5536d commit a0f7ca8
Showing 1 changed file with 96 additions and 55 deletions.
151 changes: 96 additions & 55 deletions src/espmega_lightshow/drivers.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
from abc import ABC
from espmega.espmega_r3 import ESPMega_standalone as ESPMega
import json
from typing import Optional
# This is the base class for all physical light drivers


class LightDriver(ABC):
# The init function should take in any parameters needed to initialize the driver
# This function should not raise any exceptions if the driver is not able to be initialized
# Instead, it should set the driver to a state where it is not able to be controlled
conntected: bool = False
state: bool = False
color: tuple = (0, 0, 0)
brightness: int = 0

def __init__(self, **kwargs):
# The init function should take in any parameters needed to initialize the driver
# This function should not raise any exceptions if the driver is not able to be initialized
# Instead, it should set the driver to a state where it is not able to be controlled
pass

def set_light_state(self, state: bool) -> None:
# This function should set the light to the given state
pass

def get_light_state(self) -> int:
# This function should return the current state of the light
# Returns 0 if the light is off, 1 if the light is on
# Return 2 if the light is on but is not able to be controlled
# Return 3 if the light is off but is not able to be controlled
pass

def is_connected(self) -> bool:
# This function should return whether the driver is connected to the light
return self.conntected

def get_exception(self) -> str:
def get_exception(self) -> Optional[str]:
# This function should return the exception that caused the driver to be disconnected
if self.conntected:
return None
return self.exception
Expand All @@ -42,15 +47,23 @@ def get_driver_properties() -> dict:
pass

def set_brightness(self, brightness: float) -> None:
# This function should set the brightness of the light
# brightness is a float between 0 and 4095
pass

def get_brightness(self) -> float:
# This function should return the current brightness of the light
# brightness is a float between 0 and 4095
pass

def set_color(self, color: tuple) -> None:
# This function should set the color of the light
# color is a tuple of 3 integers between 0 and 4095
pass

def get_color(self) -> tuple:
# This function should return the current color of the light
# color is a tuple of 3 integers between 0 and 4095
pass


Expand Down Expand Up @@ -128,9 +141,12 @@ def __init__(self, light_server: str, light_server_port: int, rows: int = 0, col
self.light_server_port = light_server_port
self.design_mode = design_mode

def assign_physical_light(self, row: int, column: int, physical_light: LightDriver):
def assign_physical_light(self, row: int, column: int, physical_light: Optional[LightDriver]):
self.lights[row * self.columns + column] = physical_light

def mark_light_disappeared(self, row: int, column: int):
self.lights[row * self.columns + column] = None

def get_physical_light(self, row, column):
return self.lights[row * self.columns + column]

Expand All @@ -144,65 +160,90 @@ def get_light_state(self, row: int, column: int):
return physical_light.get_light_state()

def read_light_map(self, light_map: list) -> list:
self.initialize_light_map(light_map)
for row_index, row in enumerate(light_map):
for column_index, light in enumerate(row):
self.__assign_light(row_index, column_index, light)
return [self.connected_drivers, self.failed_drivers]

def initialize_light_map(self, light_map):
self.light_map = light_map
self.rows = len(light_map)
self.columns = len(light_map[0])
self.lights = [None] * self.rows * self.columns
self.controllers = {} # Dictionary to store existing controllers
self.failed_controllers = {} # Dictionary to store failed controllers
self.connected_controllers = {} # Dictionary to store connected controllers
for row_index, row in enumerate(light_map):
for column_index, light in enumerate(row):
if self.design_mode:
self.connected_controllers[light["base_topic"]] = None
self.assign_physical_light(row_index, column_index, None)
continue
if light is None:
self.assign_physical_light(row_index, column_index, None)
else:
base_topic = light["base_topic"]
pwm_id = light["pwm_id"]
# Create a mapping of base_topic to controller
if base_topic not in self.drivers:
if not self.design_mode:
driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port)
if driver.is_connected():
self.drivers[base_topic] = driver
else:
self.failed_drivers[base_topic] = driver.get_exception()
else:
controller = self.drivers[base_topic].controller
driver = ESPMegaLightDriver(controller, pwm_id)
self.assign_physical_light(row_index, column_index, driver)
# Return a list of connected drivers list and failed drivers list
return [self.connected_drivers, self.failed_drivers]

def _assign_light(self, row_index, column_index, light):
if self.design_mode:
self.connected_controllers[light["base_topic"]] = None
self.assign_physical_light(row_index, column_index, None)
return
if light is None:
self.assign_physical_light(row_index, column_index, None)
else:
self._assign_light_with_driver(row_index, column_index, light)

def _assign_light_with_driver(self, row_index, column_index, light):
base_topic = light["base_topic"]
pwm_id = light["pwm_id"]
if base_topic not in self.drivers:
self._create_new_driver(base_topic)
else:
controller = self.drivers[base_topic].controller
driver = ESPMegaLightDriver(controller, pwm_id)
self.assign_physical_light(row_index, column_index, driver)

def _create_new_driver(self, base_topic):
if not self.design_mode:
driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port)
if driver.is_connected():
self.drivers[base_topic] = driver
else:
self.failed_drivers[base_topic] = driver.get_exception()

def read_light_map_from_file(self, filename: str):
try:
with open(filename, "r") as file:
light_map = json.load(file)
# Check if the light map is valid
if len(light_map) == 0:
raise Exception("Light map cannot be empty.")
if len(light_map[0]) == 0:
raise Exception("Light map cannot be empty.")
for row in light_map:
if len(row) != len(light_map[0]):
raise Exception(
"All rows in the light map must have the same length.")
for column in row:
if column != None:
if "base_topic" not in column:
raise Exception(
"The base_topic field is missing from a light.")
if "pwm_id" not in column:
raise Exception(
"The pwm_id field is missing from a light.")
if type(column["base_topic"]) != str:
raise Exception(
"The base_topic field must be a string.")
if type(column["pwm_id"]) != int:
raise Exception(
"The pwm_id field must be an integer.")
self.read_light_map(light_map)

ESPMegaLightGrid._validate_light_map(light_map)
ESPMegaLightGrid.read_light_map(light_map)

except FileNotFoundError:
raise Exception("The light map file does not exist.")
raise FileNotFoundError("The light map file does not exist.")

@staticmethod
def _validate_light_map(light_map):
if len(light_map) == 0:
raise ValueError("Light map cannot be empty.")

if len(light_map[0]) == 0:
raise ValueError("Light map cannot be empty.")

for row in light_map:
ESPMegaLightGrid._validate_row(row, light_map[0])

@staticmethod
def _validate_row(row, reference_row):
if len(row) != len(reference_row):
raise ValueError("All rows in the light map must have the same length.")

for column in row:
ESPMegaLightGrid._validate_column(column)

@staticmethod
def _validate_column(column):
if column is not None:
if "base_topic" not in column:
raise ValueError("The base_topic field is missing from a light.")

if "pwm_id" not in column:
raise ValueError("The pwm_id field is missing from a light.")

if not isinstance(column["base_topic"], str):
raise ValueError("The base_topic field must be a string.")

if not isinstance(column["pwm_id"], int):
raise ValueError("The pwm_id field must be an integer.")

0 comments on commit a0f7ca8

Please sign in to comment.