Skip to content

Commit

Permalink
add alarm payload v2 and logic for alarm color management
Browse files Browse the repository at this point in the history
  • Loading branch information
v00g100skr committed Mar 27, 2024
1 parent adbae10 commit f75d229
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 131 deletions.
70 changes: 43 additions & 27 deletions deploy/updater/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,28 @@ async def update_data(mc):
try:
await asyncio.sleep(loop_time)
tcp_cached = await mc.get(b"tcp")
alerts_cached = await mc.get(b"alerts_websocket_v1")
weather_cached = await mc.get(b"weather_websocket_v1")
alerts_cached_v1 = await mc.get(b"alerts_websocket_v1")
alerts_cached_v2 = await mc.get(b"alerts_websocket_v2")
weather_cached_v1 = await mc.get(b"weather_websocket_v1")
alerts_data = await mc.get(b'alerts')
weather_data = await mc.get(b'weather')

if tcp_cached:
tcp_cached_data = json.loads(tcp_cached.decode('utf-8'))
else:
tcp_cached_data = {}
if alerts_cached:
alerts_cached_data = json.loads(alerts_cached.decode('utf-8'))
if alerts_cached_v1:
alerts_cached_data_v1 = json.loads(alerts_cached_v1.decode('utf-8'))
else:
alerts_cached_data = []
if weather_cached:
weather_cached_data = json.loads(weather_cached.decode('utf-8'))
alerts_cached_data_v1 = []
if alerts_cached_v2:
alerts_cached_data_v2 = json.loads(alerts_cached_v2.decode('utf-8'))
else:
weather_cached_data = []
alerts_cached_data_v2 = []
if weather_cached_v1:
weather_cached_data_v1 = json.loads(weather_cached_v1.decode('utf-8'))
else:
weather_cached_data_v1 = []

current_datetime = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

Expand All @@ -88,35 +93,39 @@ async def update_data(mc):
local_time = datetime.now(timezone.utc)
formatted_local_time = local_time.strftime("%Y-%m-%dT%H:%M:%SZ")

alerts = []
weather = []
alerts_v1 = []
alerts_v2 = []
weather_v1 = []

try:
for region_name in regions:
time_diff = await calculate_time_difference(alerts_data['states'][region_name]['changed'], formatted_local_time)
if alerts_data['states'][region_name]['alertnow']:
if time_diff > 300:
alert_mode = 1
alert_mode_v1 = 1
else:
alert_mode = 3
alert_mode_v1 = 3
alert_mode_v2 = 1
else:
if time_diff > 300:
alert_mode = 0
alert_mode_v1 = 0
else:
alert_mode = 2
alert_mode_v1 = 2
alert_mode_v2 = 0

alerts.append(str(alert_mode))
alerts_v1.append(str(alert_mode_v1))
alerts_v2.append([str(alert_mode_v2), alerts_data['states'][region_name]['changed']])
except Exception as e:
logger.error(f"Alert error: {e}")

try:
for region in regions:
weather_temp = float(weather_data['states'][region]['temp'])
weather.append(str(weather_temp))
weather_v1.append(str(weather_temp))
except Exception as e:
logger.error(f"Weather error: {e}")

tcp_data = "%s:%s" % (",".join(alerts), ",".join(weather))
tcp_data = "%s:%s" % (",".join(alerts_v1), ",".join(weather_v1))

if tcp_cached_data != tcp_data:
logging.debug("store tcp data: %s" % current_datetime)
Expand All @@ -125,19 +134,26 @@ async def update_data(mc):
else:
logging.debug("tcp data not changed")

if alerts_cached_data != alerts:
logging.debug("store alerts: %s" % current_datetime)
await mc.set(b"alerts_websocket_v1", json.dumps(alerts).encode('utf-8'))
logging.debug("alerts stored")
if alerts_cached_data_v1 != alerts_v1:
logging.debug("store alerts_v1: %s" % current_datetime)
await mc.set(b"alerts_websocket_v1", json.dumps(alerts_v1).encode('utf-8'))
logging.debug("alerts_v1 stored")
else:
logging.debug("alerts_v1 not changed")

if alerts_cached_data_v2 != alerts_v2:
logging.debug("store alerts_v2: %s" % current_datetime)
await mc.set(b"alerts_websocket_v2", json.dumps(alerts_v2).encode('utf-8'))
logging.debug("alerts_v2 stored")
else:
logging.debug("alerts not changed")
logging.debug("alerts_v2 not changed")

if weather_cached_data != weather:
logging.debug("store weather: %s" % current_datetime)
await mc.set(b"weather_websocket_v1", json.dumps(weather).encode('utf-8'))
logging.debug("weather stored")
if weather_cached_data_v1 != weather_v1:
logging.debug("store weather_v1: %s" % current_datetime)
await mc.set(b"weather_websocket_v1", json.dumps(weather_v1).encode('utf-8'))
logging.debug("weather_v1 stored")
else:
logging.debug("weather not changed")
logging.debug("weather_v1 not changed")
except Exception as e:
logging.error(f"Error fetching data: {str(e)}")
raise
Expand Down
192 changes: 116 additions & 76 deletions deploy/websocket_server/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
mc = Client(memcached_host, 11211)
geo = database.Reader('GeoLite2-City.mmdb')


class SharedData:
def __init__(self):
self.alerts = '[]'
self.alerts_v1 = '[]'
self.alerts_v2 = '[]'
self.alerts_full = {}
self.weather = '[]'
self.weather_v1 = '[]'
self.weather_full = {}
self.bins = '[]'
self.test_bins = '[]'
Expand All @@ -38,6 +40,12 @@ def __init__(self):

shared_data = SharedData()


class AlertVersion:
v1 = 1
v2 = 2


regions = {
"Закарпатська область": {"id": 0},
"Івано-Франківська область": {"id": 1},
Expand Down Expand Up @@ -68,7 +76,7 @@ def __init__(self):
}


async def alerts_data(websocket, client, shared_data):
async def alerts_data(websocket, client, shared_data, alert_version):
client_ip, client_port = websocket.remote_address
while True:
match client['firmware']:
Expand All @@ -78,18 +86,32 @@ async def alerts_data(websocket, client, shared_data):
client_id = client['firmware']
try:
logger.debug(f"{client_ip}:{client_id}: check")
if client['alerts'] != shared_data.alerts:
alerts = json.dumps([int(alert) for alert in json.loads(shared_data.alerts)])
payload = '{"payload":"alerts","alerts":%s}' % alerts
await websocket.send(payload)
logger.info(f"{client_ip}:{client_id} <<< new alerts")
client['alerts'] = shared_data.alerts
if client['weather'] != shared_data.weather:
weather = json.dumps([float(weather) for weather in json.loads(shared_data.weather)])
match alert_version:
case AlertVersion.v1:
if client['alerts'] != shared_data.alerts_v1:
alerts = json.dumps([int(alert) for alert in json.loads(shared_data.alerts_v1)])
payload = '{"payload":"alerts","alerts":%s}' % alerts
await websocket.send(payload)
logger.info(f"{client_ip}:{client_id} <<< new alerts")
client['alerts'] = shared_data.alerts_v1
case AlertVersion.v2:
if client['alerts'] != shared_data.alerts_v2:
alerts = []
for alert in json.loads(shared_data.alerts_v2):
datetime_obj = datetime.fromisoformat(alert[1].replace("Z", "+00:00"))
datetime_obj_utc = datetime_obj.replace(tzinfo=timezone.utc)
alerts.append([int(alert[0]),int(datetime_obj_utc.timestamp())])
alerts = json.dumps(alerts)
payload = '{"payload":"alerts","alerts":%s}' % alerts
await websocket.send(payload)
logger.info(f"{client_ip}:{client_id} <<< new alerts")
client['alerts'] = shared_data.alerts_v2
if client['weather'] != shared_data.weather_v1:
weather = json.dumps([float(weather) for weather in json.loads(shared_data.weather_v1)])
payload = '{"payload":"weather","weather":%s}' % weather
await websocket.send(payload)
logger.info(f"{client_ip}:{client_id} <<< new weather")
client['weather'] = shared_data.weather
client['weather'] = shared_data.weather_v1
if client['bins'] != shared_data.bins:
payload = '{"payload": "bins", "bins": %s}' % shared_data.bins
await websocket.send(payload)
Expand Down Expand Up @@ -142,53 +164,56 @@ async def echo(websocket, path):

match path:
case "/data_v1":
data_task = asyncio.create_task(alerts_data(websocket, client, shared_data))
try:
while True:
async for message in websocket:
match client['firmware']:
case 'unknown':
client_id = client_port
case _:
client_id = client['firmware']
logger.info(f"{client_ip}:{client_id} >>> {message}")

def split_message(message):
parts = message.split(':', 1) # Split at most into 2 parts
header = parts[0]
data = parts[1] if len(parts) > 1 else ''
return header, data

header, data = split_message(message)
match header:
case 'district':
district_data = await district_data_v1(int(data))
payload = json.dumps(district_data).encode('utf-8')
await websocket.send(payload)
logger.info(f"{client_ip}:{client_id} <<< district {payload} ")
case 'firmware':
client['firmware'] = data
logger.warning(f"{client_ip}:{client_id} >>> firmware saved")
case 'chip_id':
client['chip_id'] = data
logger.info(f"{client_ip}:{client_id} >>> chip_id saved")
case _:
logger.info(f"{client_ip}:{client_id} !!! unknown data request")
except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed with error - {e}")
except Exception as e:
pass
finally:
data_task.cancel()
del shared_data.clients[f'{client_ip}_{client_port}']
try:
await data_task
except asyncio.CancelledError:
logger.info(f"{client_ip}:{client_id} !!! tasks cancelled")
logger.info(f"{client_ip}:{client_id} !!! end")
data_task = asyncio.create_task(alerts_data(websocket, client, shared_data, AlertVersion.v1))

case "/data_v2":
data_task = asyncio.create_task(alerts_data(websocket, client, shared_data, AlertVersion.v2))

case _:
return

try:
while True:
async for message in websocket:
match client['firmware']:
case 'unknown':
client_id = client_port
case _:
client_id = client['firmware']
logger.info(f"{client_ip}:{client_id} >>> {message}")

def split_message(message):
parts = message.split(':', 1) # Split at most into 2 parts
header = parts[0]
data = parts[1] if len(parts) > 1 else ''
return header, data

header, data = split_message(message)
match header:
case 'district':
district_data = await district_data_v1(int(data))
payload = json.dumps(district_data).encode('utf-8')
await websocket.send(payload)
logger.info(f"{client_ip}:{client_id} <<< district {payload} ")
case 'firmware':
client['firmware'] = data
logger.warning(f"{client_ip}:{client_id} >>> firmware saved")
case 'chip_id':
client['chip_id'] = data
logger.info(f"{client_ip}:{client_id} >>> chip_id saved")
case _:
logger.info(f"{client_ip}:{client_id} !!! unknown data request")
except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed with error - {e}")
except Exception as e:
pass
finally:
data_task.cancel()
del shared_data.clients[f'{client_ip}_{client_port}']
try:
await data_task
except asyncio.CancelledError:
logger.info(f"{client_ip}:{client_id} !!! tasks cancelled")
logger.info(f"{client_ip}:{client_id} !!! end")

async def district_data_v1(district_id):
alerts_cached_data = shared_data.alerts_full
Expand All @@ -212,19 +237,28 @@ async def district_data_v1(district_id):
async def update_shared_data(shared_data, mc):
while True:
logger.debug("memcache check")
alerts, weather, bins, test_bins, alerts_full, weather_full = await get_data_from_memcached(mc)
alerts_v1, alerts_v2, weather_v1, bins, test_bins, alerts_full, weather_full = await get_data_from_memcached(mc)

try:
if alerts_v1 != shared_data.alerts_v1:
shared_data.alerts_v1 = alerts_v1
logger.info(f"alerts_v1 updated: {alerts_v1}")
except Exception as e:
logger.error(f"error in alerts_v1: {e}")

try:
if alerts != shared_data.alerts:
shared_data.alerts = alerts
logger.info(f"alerts updated: {alerts}")
if alerts_v2 != shared_data.alerts_v2:
shared_data.alerts_v2 = alerts_v2
logger.info(f"alerts_v2 updated: {alerts_v2}")
except Exception as e:
logger.error(f"error in alerts: {e}")
logger.error(f"error in alerts_v2: {e}")

try:
if weather != shared_data.weather:
shared_data.weather = weather
logger.info(f"weather updated: {weather}")
if weather_v1 != shared_data.weather_v1:
shared_data.weather_v1 = weather_v1
logger.info(f"weather updated: {weather_v1}")
except Exception as e:
logger.error(f"error in weather: {e}")
logger.error(f"error in weather_v1: {e}")

try:
if bins != shared_data.bins:
Expand Down Expand Up @@ -269,8 +303,9 @@ async def print_clients(shared_data, mc):


async def get_data_from_memcached(mc):
alerts_cached = await mc.get(b"alerts_websocket_v1")
weather_cached = await mc.get(b"weather_websocket_v1")
alerts_cached_v1 = await mc.get(b"alerts_websocket_v1")
alerts_cached_v2 = await mc.get(b"alerts_websocket_v2")
weather_cached_v1 = await mc.get(b"weather_websocket_v1")
bins_cached = await mc.get(b"bins")
test_bins_cached = await mc.get(b"test_bins")
alerts_full_cached = await mc.get(b"alerts")
Expand All @@ -280,17 +315,22 @@ async def get_data_from_memcached(mc):
values = [0] * 25
position = random.randint(0, 25)
values.insert(position, 1)
alerts_cached_data = json.dumps(values[:26])
alerts_cached_data_v1 = json.dumps(values[:26])
alerts_cached_data_v2 = json.dumps(values[:26])
else:
if alerts_cached:
alerts_cached_data = alerts_cached.decode('utf-8')
if alerts_cached_v1:
alerts_cached_data_v1 = alerts_cached_v1.decode('utf-8')
else:
alerts_cached_data_v1 = '[]'
if alerts_cached_v2:
alerts_cached_data_v2 = alerts_cached_v2.decode('utf-8')
else:
alerts_cached_data = '[]'
alerts_cached_data_v2 = '[]'

if weather_cached:
weather_cached_data = weather_cached.decode('utf-8')
if weather_cached_v1:
weather_cached_data_v1 = weather_cached_v1.decode('utf-8')
else:
weather_cached_data = '[]'
weather_cached_data_v1 = '[]'

if bins_cached:
bins_cached_data = bins_cached.decode('utf-8')
Expand All @@ -312,7 +352,7 @@ async def get_data_from_memcached(mc):
else:
weather_full_cached_data = {}

return alerts_cached_data, weather_cached_data, bins_cached_data, test_bins_cached_data, alerts_full_cached_data, weather_full_cached_data
return alerts_cached_data_v1, alerts_cached_data_v2, weather_cached_data_v1, bins_cached_data, test_bins_cached_data, alerts_full_cached_data, weather_full_cached_data


start_server = websockets.serve(echo, "0.0.0.0", websocket_port, ping_interval=ping_interval)
Expand Down
Loading

0 comments on commit f75d229

Please sign in to comment.