
Commit

WIP: Added limited retries to Ubidots, added context uuid to Ubidots client API log messages.
dajtxx committed Jul 23, 2024
1 parent 8e79942 commit d8ebcf2
Showing 4 changed files with 30 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/python/api/client/DAO.py
@@ -1363,7 +1363,8 @@ def get_delivery_msg_count(name: str) -> int:
def get_delivery_msg_batch(name: str, from_uid: int = 0, batch_size: int = 10) -> List[Tuple[int, list[dict[Any]]]]:
try:
with _get_connection() as conn, conn.cursor() as cursor:
- cursor.execute(f'select uid, json_msg, retry_count from {_get_delivery_table_id(name)} where uid > %s limit %s', (from_uid, batch_size))
+ # Using order by asc in case time series databases need values inserted in timestamp order.
+ cursor.execute(f'select uid, json_msg, retry_count from {_get_delivery_table_id(name)} where uid > %s order by uid asc limit %s', (from_uid, batch_size))
if cursor.rowcount < 1:
return 0, []

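Side note on the DAO change: the added order by uid asc means each batch comes back in insertion order, which matters when the caller pages through the delivery table by remembering the highest uid it has seen. Below is a minimal sketch of that paging pattern; it uses sqlite3 so it runs standalone, and the table name ubidots_delivery_q is invented — the real code builds the table name via _get_delivery_table_id(name) and talks to PostgreSQL.

# Sketch of paging a delivery table in uid order, assuming the
# (uid, json_msg, retry_count) columns queried by get_delivery_msg_batch.
import json
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('create table ubidots_delivery_q (uid integer primary key, json_msg text, retry_count integer)')
for i in range(25):
    conn.execute('insert into ubidots_delivery_q (json_msg, retry_count) values (?, 0)', (json.dumps({'seq': i}),))

def get_batch(from_uid: int = 0, batch_size: int = 10):
    # order by uid asc keeps rows in insertion (and therefore timestamp) order.
    cur = conn.execute(
        'select uid, json_msg, retry_count from ubidots_delivery_q where uid > ? order by uid asc limit ?',
        (from_uid, batch_size))
    return [(uid, json.loads(msg), retries) for uid, msg, retries in cur.fetchall()]

last_uid = 0
while True:
    batch = get_batch(last_uid)
    if not batch:
        break
    for uid, msg, retries in batch:
        print(uid, msg, retries)   # deliver the message here
    last_uid = batch[-1][0]        # resume after the highest uid seen
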
15 changes: 8 additions & 7 deletions src/python/api/client/Ubidots.py
@@ -4,6 +4,8 @@

from pdmodels.Models import Location, LogicalDevice

+ import util.LoggingUtil as lu
+
BASE_1_6 = "https://industrial.api.ubidots.com.au/api/v1.6"
BASE_2_0 = "https://industrial.api.ubidots.com.au/api/v2.0"

@@ -106,19 +108,19 @@ def get_all_devices() -> List[LogicalDevice]:
return devices


- def get_device(label: str) -> LogicalDevice:
+ def get_device(label: str, logging_ctx: dict) -> LogicalDevice:
url = f'{BASE_2_0}/devices/~{label}'
time.sleep(0.3)
r = requests.get(url, headers=headers)
if r.status_code != 200:
- logging.warn(f'devices/~{label} received response: {r.status_code}: {r.reason}')
+ lu.cid_logger.error(f'devices/~{label} received response: {r.status_code}: {r.reason}', extra=logging_ctx)
return None

response_obj = json.loads(r.content)
return _dict_to_logical_device(response_obj)


- def post_device_data(label: str, body) -> bool:
+ def post_device_data(label: str, body: dict, logging_ctx: dict) -> bool:
"""
Post timeseries data to an Ubidots device.
@@ -137,19 +139,18 @@ def post_device_data(label: str, body) -> bool:
body_str = json.dumps(body)
r = requests.post(url, headers=hdrs, data=body_str)
if r.status_code != 200:
- logging.info(f'POST {url}: {r.status_code}: {r.reason}')
- logging.info(body_str)
+ lu.cid_logger.info(f'POST {url}: {r.status_code}: {r.reason}', extra=logging_ctx)
return False

return True


- def update_device(label: str, patch_obj) -> None:
+ def update_device(label: str, patch_obj: dict, logging_ctx: dict) -> None:
url = f'{BASE_2_0}/devices/~{label}'
time.sleep(0.3)
response = requests.patch(url, headers=headers, json=patch_obj)
if response.status_code != 200:
- logging.warning(f'PATCH response: {response.status_code}: {response.reason}')
+ lu.cid_logger.error(f'PATCH response: {response.status_code}: {response.reason}', extra=logging_ctx)


def main():
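
Side note on the Ubidots client changes: each API helper now takes a logging_ctx dict and passes it to lu.cid_logger via extra=, so API log lines carry the message's correlation id. util.LoggingUtil itself is not part of this diff; the sketch below is only one plausible way to build such a logger with the standard library, to show the calling pattern. The logger name, format string and the correlation_id key are assumptions (the real code keys the context dict with BrokerConstants.CORRELATION_ID_KEY).

# Illustrative only: a correlation-id-aware logger resembling the lu.cid_logger calls above.
import logging

class CidFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Default the field so the format string never fails when no context is supplied.
        if not hasattr(record, 'correlation_id'):
            record.correlation_id = '-'
        return True

logging.basicConfig(format='%(asctime)s [%(correlation_id)s] %(levelname)s %(message)s')
cid_logger = logging.getLogger('ubidots')
cid_logger.addFilter(CidFilter())
cid_logger.setLevel(logging.INFO)

# Matches the call shape used in Ubidots.py: pass the context dict via extra=.
logging_ctx = {'correlation_id': 'b2f1c3d4'}
cid_logger.error('devices/~my-device received response: 404: Not Found', extra=logging_ctx)
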
16 changes: 6 additions & 10 deletions src/python/delivery/BaseWriter.py
@@ -6,7 +6,6 @@
import json, logging, os, signal, time

import BrokerConstants
- import multiprocessing as mp
import pika, pika.channel, pika.spec
from pika.exchange_type import ExchangeType

@@ -44,11 +43,9 @@ def run(self) -> None:
delivery_thread = None
try:
dao.create_delivery_table(self.name)

delivery_thread = Thread(target=self.delivery_thread_proc, name='delivery_thread')
delivery_thread.start()
- # mp.set_start_method('spawn')
- # self.del_proc = mp.Process(target=self.delivery_thread_proc)
- # self.del_proc.start()
except dao.DAOException as err:
use_delivery_table = False

@@ -101,16 +98,14 @@ def run(self) -> None:
time.sleep(10)
continue

- # Ask the delivery thread to stop if the main thread got an error.
+ # Tell the delivery thread to stop if the main thread got an error.
self.keep_running = False
if self.connection is not None:
logging.info('Closing connection')
self.connection.close()

logging.info('Waiting for delivery thread')
delivery_thread.join()
- # self.del_proc.kill()
- # self.del_proc.join()

def delivery_thread_proc(self) -> None:
logging.info('Delivery thread started')
@@ -120,6 +115,7 @@ def delivery_thread_proc(self) -> None:
time.sleep(30)
continue

+ logging.info(f'Processing {count} messages')
msg_rows = dao.get_delivery_msg_batch(self.name)
for msg_uid, msg, retry_count in msg_rows:
logging.info(f'msg from table {msg_uid}, {retry_count}')
@@ -140,7 +136,7 @@ def delivery_thread_proc(self) -> None:
lu.cid_logger.error(f'Could not find logical device, dropping message: {msg}', extra=msg)
dao.remove_delivery_msg(self.name, msg_uid)

- rc = self.on_message(pd, ld, msg)
+ rc = self.on_message(pd, ld, msg, retry_count)
if rc == BaseWriter.MSG_OK:
lu.cid_logger.info('Message processed ok.', extra=msg)
dao.remove_delivery_msg(self.name, msg_uid)
@@ -160,8 +156,8 @@ def delivery_thread_proc(self) -> None:
dao.stop()
logging.info('Delivery thread stopped.')

- def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]) -> int:
- logging.info(f'{pd.name} / {ld.name}: {msg}')
+ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
+ logging.info(f'{pd.name} / {ld.name} / {retry_count}: {msg}')
return BaseWriter.MSG_OK

def sigterm_handler(self, sig_no, stack_frame) -> None:
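
Side note on the BaseWriter changes: the commented-out multiprocessing variant is gone, so the delivery loop always runs as an in-process Thread that the main thread stops by clearing keep_running and then joining. A standalone sketch of that cooperative-shutdown shape is below; the Writer class, sleep intervals and log text are invented for illustration, only the flag-plus-join pattern mirrors BaseWriter.

# Standalone sketch of the keep_running / join shutdown pattern used by BaseWriter.
import logging
import time
from threading import Thread

logging.basicConfig(level=logging.INFO, format='%(message)s')

class Writer:
    def __init__(self) -> None:
        self.keep_running = True

    def delivery_thread_proc(self) -> None:
        logging.info('Delivery thread started')
        while self.keep_running:
            # Poll for queued messages here; sleep briefly between polls.
            time.sleep(0.1)
        logging.info('Delivery thread stopped')

    def run(self) -> None:
        delivery_thread = Thread(target=self.delivery_thread_proc, name='delivery_thread')
        delivery_thread.start()
        try:
            time.sleep(0.5)   # stand-in for the main message loop
        finally:
            # Tell the delivery thread to stop, then wait for it to finish.
            self.keep_running = False
            logging.info('Waiting for delivery thread')
            delivery_thread.join()

Writer().run()
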
24 changes: 14 additions & 10 deletions src/python/delivery/UbidotsWriter.py
@@ -3,6 +3,7 @@
on to Ubidots.
"""

+ import time
from typing import Any
import dateutil.parser

@@ -21,7 +22,7 @@ class UbidotsWriter(BaseWriter):
def __init__(self, name) -> None:
super().__init__(name)

- def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]):
+ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
"""
This function is called when a message arrives from RabbitMQ.
@@ -49,10 +50,9 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]):
"""

try:
- logging.info(f'{pd.name} / {ld.name}')
+ lu.cid_logger.info(f'{pd.name} / {ld.name}', extra=msg)

ts = 0.0
# TODO: Find or create a class to hide all the Python datetime processing.
try:
ts_float = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]).timestamp()
# datetime.timestamp() returns a float where the ms are to the right of the
@@ -131,11 +131,15 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]):
# data is posted to Ubidots.
#
ubidots_dev_label = ld.properties['ubidots']['label']
- if not ubidots.post_device_data(ubidots_dev_label, ubidots_payload):
+ if not ubidots.post_device_data(ubidots_dev_label, ubidots_payload, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}):
# The write to Ubidots failed.
- logging.error('Delivery to Ubidots failed at API call.')
- # Make this retry later.
- return BaseWriter.MSG_FAIL
+ lu.cid_logger.error('Delivery to Ubidots failed at API call.', extra=msg)
+ if retry_count > 4:
+     lu.cid_logger.error(f'Retried message 5 times, giving up.', extra=msg)
+     return BaseWriter.MSG_FAIL
+ else:
+     time.sleep(5) # Pause in case Ubidots is flooded.
+     return BaseWriter.MSG_RETRY

if new_device:
# Update the Ubidots device with info from the source device and/or the
@@ -166,22 +170,22 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]):

# TODO: What about Green Brain devices?

- ubidots.update_device(ubidots_dev_label, patch_obj)
+ ubidots.update_device(ubidots_dev_label, patch_obj, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]})

# Update the newly created logical device properties with the information
# returned from Ubidots, but nothing else. We don't want to overwrite the
# last_seen value because that should be set to the timestamp from the
# message, which was done in the mapper process.
lu.cid_logger.info('Updating logical device properties from Ubidots.', extra=msg)
- ud = ubidots.get_device(ubidots_dev_label)
+ ud = ubidots.get_device(ubidots_dev_label, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]})
if ud is not None:
ld.properties['ubidots'] = ud.properties['ubidots']
dao.update_logical_device(ld)

return self.MSG_OK

except BaseException:
- logging.exception('Error while processing message.')
+ lu.cid_logger.exception('Error while processing message.', extra=msg)
return self.MSG_FAIL


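Side note on the UbidotsWriter change: a failed POST to Ubidots is now retried a limited number of times — on earlier attempts on_message pauses and returns MSG_RETRY so the message stays in the delivery table, and once retry_count exceeds 4 it returns MSG_FAIL and gives up. The snippet below isolates that decision so it can be read on its own; the constant values, the delivery_result helper name and MAX_RETRIES are assumptions, only the threshold and the pause mirror the diff.

# Sketch of the limited-retry decision added to UbidotsWriter.on_message.
# MSG_OK / MSG_RETRY / MSG_FAIL stand in for the BaseWriter constants; their values are assumed.
import time

MSG_OK, MSG_RETRY, MSG_FAIL = 0, 1, 2
MAX_RETRIES = 5

def delivery_result(post_succeeded: bool, retry_count: int) -> int:
    if post_succeeded:
        return MSG_OK
    if retry_count > MAX_RETRIES - 1:
        # Already retried 5 times; give up so the message is not re-queued forever.
        return MSG_FAIL
    time.sleep(5)   # Pause in case Ubidots is flooded, then ask for another attempt.
    return MSG_RETRY

# A failed POST on a message that has already been retried 5 times is dropped;
# earlier failures sleep 5 s and come back as MSG_RETRY.
assert delivery_result(True, 0) == MSG_OK
assert delivery_result(False, 5) == MSG_FAIL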
