Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multidriver #1

Merged
merged 4 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 249 additions & 0 deletions src/espmega_lightshow/drivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
from abc import ABC
from espmega.espmega_r3 import ESPMega_standalone as ESPMega
import json
from typing import Optional
# This is the base class for all physical light drivers


class LightDriver(ABC):
conntected: bool = False
state: bool = False
color: tuple = (0, 0, 0)
brightness: int = 0

def __init__(self, **kwargs):
# The init function should take in any parameters needed to initialize the driver
# This function should not raise any exceptions if the driver is not able to be initialized
# Instead, it should set the driver to a state where it is not able to be controlled
pass

def set_light_state(self, state: bool) -> None:
# This function should set the light to the given state
pass

def get_light_state(self) -> int:
# This function should return the current state of the light
# Returns 0 if the light is off, 1 if the light is on
# Return 2 if the light is on but is not able to be controlled
# Return 3 if the light is off but is not able to be controlled
pass

def is_connected(self) -> bool:
# This function should return whether the driver is connected to the light
return self.conntected

def get_exception(self) -> Optional[str]:
# This function should return the exception that caused the driver to be disconnected
if self.conntected:
return None
return self.exception

@staticmethod
def get_driver_properties() -> dict:
# Standard properties:
# name: The name of the driver
# support_brightness: Whether the driver supports brightness control
# support_color: Whether the driver supports color control
pass

def set_brightness(self, brightness: float) -> None:
# This function should set the brightness of the light
# brightness is a float between 0 and 4095
pass

def get_brightness(self) -> float:
# This function should return the current brightness of the light
# brightness is a float between 0 and 4095
pass

def set_color(self, color: tuple) -> None:
# This function should set the color of the light
# color is a tuple of 3 integers between 0 and 4095
pass

def get_color(self) -> tuple:
# This function should return the current color of the light
# color is a tuple of 3 integers between 0 and 4095
pass


class ESPMegaLightDriver(LightDriver):
rapid_mode: bool = False

def __init__(self, controller: ESPMega, pwm_channel: int) -> int:
self.controller = controller
self.pwm_channel = pwm_channel
if controller is None:
self.conntected = False
self.exception = "Controller is not connected."
self.conntected = True

def set_light_state(self, state: bool) -> None:
if not self.conntected:
self.state = state
else:
self.controller.digital_write(self.pwm_channel, state)

def get_light_state(self) -> bool:
if self.conntected:
self.state = self.controller.get_pwm_state(self.pwm_channel)
return self.state + 2 * (not self.conntected)

@staticmethod
def get_driver_properties() -> dict:
return {
"name": "ESPMega",
"support_brightness": False,
"support_color": False
}


class ESPMegaStandaloneLightDriver(ESPMegaLightDriver):
def __init__(self, base_topic: str,pwm_channel: int, light_server: str, light_server_port: int, rapid_mode: bool = False) -> dict:
self.base_topic = base_topic
self.light_server = light_server
self.light_server_port = light_server_port
self.pwm_channel = pwm_channel
self.rapid_mode = rapid_mode
self.state = False
self.connected = False
try:
self.controller = ESPMega(
base_topic, light_server, light_server_port)
if rapid_mode:
self.controller.set_rapid_mode()
print("Connected to controller.")
self.connected = True
except Exception as e:
print(e)
self.controller = None
self.exception = e
self.connected = False
def close(self):
if self.conntected and self.rapid_mode:
self.controller.disable_rapid_response_mode()
@staticmethod
def get_driver_properties() -> dict:
return {
"name": "ESPMega Standalone",
"support_brightness": False,
"support_color": False
}


class ESPMegaLightGrid:
def __init__(self, light_server: str, light_server_port: int, rows: int = 0, columns: int = 0, rapid_mode: bool = False, design_mode: bool = False):
self.rows = rows
self.columns = columns
self.lights: list = [None] * rows * columns
self.drivers = {}
self.light_server = light_server
self.light_server_port = light_server_port
self.design_mode = design_mode

def assign_physical_light(self, row: int, column: int, physical_light: Optional[LightDriver]):
self.lights[row * self.columns + column] = physical_light

def mark_light_disappeared(self, row: int, column: int):
self.lights[row * self.columns + column] = None

def get_physical_light(self, row, column):
return self.lights[row * self.columns + column]

def set_light_state(self, row: int, column: int, state: bool) -> None:
physical_light = self.get_physical_light(row, column)
if not self.design_mode:
physical_light.set_light_state(state)

def get_light_state(self, row: int, column: int):
physical_light = self.get_physical_light(row, column)
return physical_light.get_light_state()

def read_light_map(self, light_map: list) -> list:
self.initialize_light_map(light_map)
for row_index, row in enumerate(light_map):
for column_index, light in enumerate(row):
self.__assign_light(row_index, column_index, light)
return [self.connected_drivers, self.failed_drivers]

def initialize_light_map(self, light_map):
self.light_map = light_map
self.rows = len(light_map)
self.columns = len(light_map[0])
self.lights = [None] * self.rows * self.columns
self.controllers = {} # Dictionary to store existing controllers
self.failed_controllers = {} # Dictionary to store failed controllers
self.connected_controllers = {} # Dictionary to store connected controllers

def _assign_light(self, row_index, column_index, light):
if self.design_mode:
self.connected_controllers[light["base_topic"]] = None
self.assign_physical_light(row_index, column_index, None)
return
if light is None:
self.assign_physical_light(row_index, column_index, None)
else:
self._assign_light_with_driver(row_index, column_index, light)

def _assign_light_with_driver(self, row_index, column_index, light):
base_topic = light["base_topic"]
pwm_id = light["pwm_id"]
if base_topic not in self.drivers:
self._create_new_driver(base_topic)
else:
controller = self.drivers[base_topic].controller
driver = ESPMegaLightDriver(controller, pwm_id)
self.assign_physical_light(row_index, column_index, driver)

def _create_new_driver(self, base_topic):
if not self.design_mode:
driver = ESPMegaStandaloneLightDriver(base_topic, self.light_server, self.light_server_port)
if driver.is_connected():
self.drivers[base_topic] = driver
else:
self.failed_drivers[base_topic] = driver.get_exception()

def read_light_map_from_file(self, filename: str):
try:
with open(filename, "r") as file:
light_map = json.load(file)

ESPMegaLightGrid._validate_light_map(light_map)
ESPMegaLightGrid.read_light_map(light_map)

except FileNotFoundError:
raise FileNotFoundError("The light map file does not exist.")

@staticmethod
def _validate_light_map(light_map):
if len(light_map) == 0:
raise ValueError("Light map cannot be empty.")

if len(light_map[0]) == 0:
raise ValueError("Light map cannot be empty.")

for row in light_map:
ESPMegaLightGrid._validate_row(row, light_map[0])

@staticmethod
def _validate_row(row, reference_row):
if len(row) != len(reference_row):
raise ValueError("All rows in the light map must have the same length.")

for column in row:
ESPMegaLightGrid._validate_column(column)

@staticmethod
def _validate_column(column):
if column is not None:
if "base_topic" not in column:
raise ValueError("The base_topic field is missing from a light.")

if "pwm_id" not in column:
raise ValueError("The pwm_id field is missing from a light.")

if not isinstance(column["base_topic"], str):
raise ValueError("The base_topic field must be a string.")

if not isinstance(column["pwm_id"], int):
raise ValueError("The pwm_id field must be an integer.")
23 changes: 23 additions & 0 deletions test_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from src.espmega_lightshow.drivers import ESPMegaLightDriver, ESPMegaStandaloneLightDriver, ESPMegaLightGrid
from time import sleep

# Define and instantiate the driver
driver = ESPMegaStandaloneLightDriver("/espmega/ProR3", 0, "192.168.0.26", 1883)
driver.set_light_state(True)

# Define and instantiate a slave driver
slave_driver = ESPMegaLightDriver(driver.controller, 1)
slave_driver.set_light_state(True)

# Define and instantiate a light grid
light_grid = ESPMegaLightGrid("192.168.0.26",1883, 2, 2, False, False)
light_grid.assign_physical_light(0, 0, driver)
light_grid.assign_physical_light(0, 1, slave_driver)

while True:
light_grid.set_light_state(0, 0, True)
light_grid.set_light_state(0, 1, False)
sleep(1)
light_grid.set_light_state(0, 0, False)
light_grid.set_light_state(0, 1, True)
sleep(1)
Loading