diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index ce7b15e..c4c1db9 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -1363,7 +1363,8 @@ def get_delivery_msg_count(name: str) -> int: def get_delivery_msg_batch(name: str, from_uid: int = 0, batch_size: int = 10) -> List[Tuple[int, list[dict[Any]]]]: try: with _get_connection() as conn, conn.cursor() as cursor: - cursor.execute(f'select uid, json_msg, retry_count from {_get_delivery_table_id(name)} where uid > %s limit %s', (from_uid, batch_size)) + # Using order by asc in case time series databases need values inserted in timestamp order. + cursor.execute(f'select uid, json_msg, retry_count from {_get_delivery_table_id(name)} where uid > %s order by uid asc limit %s', (from_uid, batch_size)) if cursor.rowcount < 1: return 0, [] diff --git a/src/python/api/client/Ubidots.py b/src/python/api/client/Ubidots.py index 8c893aa..592a7c9 100644 --- a/src/python/api/client/Ubidots.py +++ b/src/python/api/client/Ubidots.py @@ -4,6 +4,8 @@ from pdmodels.Models import Location, LogicalDevice +import util.LoggingUtil as lu + BASE_1_6 = "https://industrial.api.ubidots.com.au/api/v1.6" BASE_2_0 = "https://industrial.api.ubidots.com.au/api/v2.0" @@ -106,19 +108,19 @@ def get_all_devices() -> List[LogicalDevice]: return devices -def get_device(label: str) -> LogicalDevice: +def get_device(label: str, logging_ctx: dict) -> LogicalDevice: url = f'{BASE_2_0}/devices/~{label}' time.sleep(0.3) r = requests.get(url, headers=headers) if r.status_code != 200: - logging.warn(f'devices/~{label} received response: {r.status_code}: {r.reason}') + lu.cid_logger.error(f'devices/~{label} received response: {r.status_code}: {r.reason}', extra=logging_ctx) return None response_obj = json.loads(r.content) return _dict_to_logical_device(response_obj) -def post_device_data(label: str, body) -> bool: +def post_device_data(label: str, body: dict, logging_ctx: dict) -> bool: """ Post timeseries data to an Ubidots device. @@ -137,19 +139,18 @@ def post_device_data(label: str, body) -> bool: body_str = json.dumps(body) r = requests.post(url, headers=hdrs, data=body_str) if r.status_code != 200: - logging.info(f'POST {url}: {r.status_code}: {r.reason}') - logging.info(body_str) + lu.cid_logger.info(f'POST {url}: {r.status_code}: {r.reason}', extra=logging_ctx) return False return True -def update_device(label: str, patch_obj) -> None: +def update_device(label: str, patch_obj: dict, logging_ctx: dict) -> None: url = f'{BASE_2_0}/devices/~{label}' time.sleep(0.3) response = requests.patch(url, headers=headers, json=patch_obj) if response.status_code != 200: - logging.warning(f'PATCH response: {response.status_code}: {response.reason}') + lu.cid_logger.error(f'PATCH response: {response.status_code}: {response.reason}', extra=logging_ctx) def main(): diff --git a/src/python/delivery/BaseWriter.py b/src/python/delivery/BaseWriter.py index 73051bb..1b6427b 100644 --- a/src/python/delivery/BaseWriter.py +++ b/src/python/delivery/BaseWriter.py @@ -6,7 +6,6 @@ import json, logging, os, signal, time import BrokerConstants -import multiprocessing as mp import pika, pika.channel, pika.spec from pika.exchange_type import ExchangeType @@ -44,11 +43,9 @@ def run(self) -> None: delivery_thread = None try: dao.create_delivery_table(self.name) + delivery_thread = Thread(target=self.delivery_thread_proc, name='delivery_thread') delivery_thread.start() - # mp.set_start_method('spawn') - # self.del_proc = mp.Process(target=self.delivery_thread_proc) - # self.del_proc.start() except dao.DAOException as err: use_delivery_table = False @@ -101,7 +98,7 @@ def run(self) -> None: time.sleep(10) continue - # Ask the delivery thread to stop if the main thread got an error. + # Tell the delivery thread to stop if the main thread got an error. self.keep_running = False if self.connection is not None: logging.info('Closing connection') @@ -109,8 +106,6 @@ def run(self) -> None: logging.info('Waiting for delivery thread') delivery_thread.join() - # self.del_proc.kill() - # self.del_proc.join() def delivery_thread_proc(self) -> None: logging.info('Delivery threat started') @@ -120,6 +115,7 @@ def delivery_thread_proc(self) -> None: time.sleep(30) continue + logging.info(f'Processing {count} messages') msg_rows = dao.get_delivery_msg_batch(self.name) for msg_uid, msg, retry_count in msg_rows: logging.info(f'msg from table {msg_uid}, {retry_count}') @@ -140,7 +136,7 @@ def delivery_thread_proc(self) -> None: lu.cid_logger.error(f'Could not find logical device, dropping message: {msg}', extra=msg) dao.remove_delivery_msg(self.name, msg_uid) - rc = self.on_message(pd, ld, msg) + rc = self.on_message(pd, ld, msg, retry_count) if rc == BaseWriter.MSG_OK: lu.cid_logger.info('Message processed ok.', extra=msg) dao.remove_delivery_msg(self.name, msg_uid) @@ -160,8 +156,8 @@ def delivery_thread_proc(self) -> None: dao.stop() logging.info('Delivery threat stopped.') - def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]) -> int: - logging.info(f'{pd.name} / {ld.name}: {msg}') + def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int: + logging.info(f'{pd.name} / {ld.name} / {retry_count}: {msg}') return BaseWriter.MSG_OK def sigterm_handler(self, sig_no, stack_frame) -> None: diff --git a/src/python/delivery/UbidotsWriter.py b/src/python/delivery/UbidotsWriter.py index 58eb6a3..6290908 100644 --- a/src/python/delivery/UbidotsWriter.py +++ b/src/python/delivery/UbidotsWriter.py @@ -3,6 +3,7 @@ on to Ubidots. """ +import time from typing import Any import dateutil.parser @@ -21,7 +22,7 @@ class UbidotsWriter(BaseWriter): def __init__(self, name) -> None: super().__init__(name) - def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]): + def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int: """ This function is called when a message arrives from RabbitMQ. @@ -49,10 +50,9 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]): """ try: - logging.info(f'{pd.name} / {ld.name}') + lu.cid_logger.info(f'{pd.name} / {ld.name}', extra=msg) ts = 0.0 - # TODO: Find or create a class to hide all the Python datetime processing. try: ts_float = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]).timestamp() # datetime.timestamp() returns a float where the ms are to the right of the @@ -131,11 +131,15 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]): # data is posted to Ubidots. # ubidots_dev_label = ld.properties['ubidots']['label'] - if not ubidots.post_device_data(ubidots_dev_label, ubidots_payload): + if not ubidots.post_device_data(ubidots_dev_label, ubidots_payload, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}): # The write to Ubidots failed. - logging.error('Delivery to Ubidots failed at API call.') - # Make this retry later. - return BaseWriter.MSG_FAIL + lu.cid_logger.error('Delivery to Ubidots failed at API call.', extra=msg) + if retry_count > 4: + lu.cid_logger.error(f'Retried message 5 times, giving up.', extra=msg) + return BaseWriter.MSG_FAIL + else: + time.sleep(5) # Pause in case Ubidtos is flooded. + return BaseWriter.MSG_RETRY if new_device: # Update the Ubidots device with info from the source device and/or the @@ -166,14 +170,14 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]): # TODO: What about Green Brain devices? - ubidots.update_device(ubidots_dev_label, patch_obj) + ubidots.update_device(ubidots_dev_label, patch_obj, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}) # Update the newly created logical device properties with the information # returned from Ubidots, but nothing else. We don't want to overwite the # last_seen value because that should be set to the timestamp from the # message, which was done in the mapper process. lu.cid_logger.info('Updating logical device properties from Ubidots.', extra=msg) - ud = ubidots.get_device(ubidots_dev_label) + ud = ubidots.get_device(ubidots_dev_label, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}) if ud is not None: ld.properties['ubidots'] = ud.properties['ubidots'] dao.update_logical_device(ld) @@ -181,7 +185,7 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]): return self.MSG_OK except BaseException: - logging.exception('Error while processing message.') + lu.cid_logger.exception('Error while processing message.', extra=msg) return self.MSG_FAIL