From 2adb8b9c8f325fe7c88b647cccf576812d1b7b45 Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Thu, 1 Dec 2022 17:10:14 +0200 Subject: [PATCH 1/7] Handle MQTT disconnections by calculating message rate based on the duration that the MQTT client was connected --- mqtt_data_collector.py | 107 ++++++++++++++++++++++++++++++----------- 1 file changed, 78 insertions(+), 29 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index 9d49db8..cae5ef9 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -11,15 +11,22 @@ load_dotenv() +# MQTT keep alive interval +# This needs to be small enough to detect if the connection is down so that message rate will be calculated correctly +MQTT_KEEP_ALIVE_SECS = 1 + IS_DEBUG = os.getenv('IS_DEBUG') == "True" # How long to listen to the topics until we send data to Azure. Should be 60 in production -MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 3 +MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 20 class Topic: is_running = False msg_count = 0 + measuring_started_at = None + measuring_stopped_at = None + def __init__(self, topic_address, topic_name, topic_port): self.topic_address = topic_address self.topic_name = topic_name @@ -34,47 +41,90 @@ def listen_topic(self): """ self.is_running = True + self.measuring_started_at = None + self.measuring_stopped_at = None + client = mqtt.Client() client.on_connect = self._on_connect_callback client.on_message = self._on_message_callback + client.on_disconnect = self._on_disconnect_callback try: # Enable debugging if needed - # client.on_log = on_log_callback - # client.on_disconnect = on_disconnect_callback + #client.on_log = self._on_log_callback - client.connect(self.topic_address, int(self.topic_port), MONITOR_PERIOD_IN_SECONDS) + client.connect_async(self.topic_address, int(self.topic_port), MQTT_KEEP_ALIVE_SECS) - # Call that processes network traffic, dispatches callbacks and - # handles reconnecting. + print(f"Connecting to MQTT broker at {self.topic_address}:{self.topic_port}") + # Starts thread that processes network traffic and dispatches callbacks client.loop_start() except Exception as e: print(f"Error on topic {self.topic_name} {self.topic_address} {self.topic_port}: {e}") + self.measuring_stopped_at = time.perf_counter() client.disconnect() self.is_running = False # The callback for when the client receives a CONNACK response from the server. def _on_connect_callback(self, client, userdata, flags, rc): + print(f"Connected to MQTT broker at {self.topic_address}:{self.topic_port}") if rc == 0: client.subscribe(self.topic_name) + self.measuring_started_at = time.perf_counter() + self.measuring_stopped_at = None else: print(f"Error on connecting {client}, is our IP whitelisted for the topic?") + # Enable debugging if needed + def _on_disconnect_callback(self, client, userdata, rc): + print(f"Disconnected from {self.topic_address}, rc: {rc}") + self.measuring_stopped_at = time.perf_counter() + client.loop_stop() + self.is_running = False + # # The callback for when a PUBLISH message is received from the server. def _on_message_callback(self, client, userdata, msg): self.msg_count += 1 # print(msg.topic+" "+str(msg.payload)) - # Enable debugging if needed - # def on_log_callback(self, client, userdata, level, buf): - # print(buf) + def get_msg_count(self): + if self.measuring_started_at == None: + print(f"No data was measured for {self.topic_address}:{self.topic_port} on topic {self.topic_name}. Maybe the client was not connected?") + return None - # Enable debugging if needed - # def on_disconnect_callback(self, client, userdata, rc): - # print("Disconnected") + if self.measuring_stopped_at != None: + elapsed_time = self.measuring_stopped_at - self.measuring_started_at + + # If data was collected for too short period, we can't accurately calculate the message rate + if elapsed_time < 10*MQTT_KEEP_ALIVE_SECS: + return None + """ + Adjust elapsed_time to account the time it took to detect that the connection was down. + This should take roughly 2 times the duration of MQTT keep alive interval. + This adjustment can cause the message rate to be slightly inflated, but this is less of a problem than too small message rate, which would cause unnecessary alerts. + """ + elapsed_time -= 2*MQTT_KEEP_ALIVE_SECS + else: + elapsed_time = time.perf_counter() - self.measuring_started_at + + print(f"started: {self.measuring_started_at}, stopped: {self.measuring_started_at + elapsed_time}") + + if IS_DEBUG: + print(f"Elapsed time {elapsed_time}, messages: {self.msg_count}") + + msg_per_second = self.msg_count / elapsed_time + self.msg_count = 0 + self.measuring_started_at = time.perf_counter() + self.measuring_stopped_at = None + + # Return None if elapsed_time is too small to calculate accurate result + return msg_per_second + + # Enable debugging if needed + def _on_log_callback(self, client, userdata, level, buf): + print(buf) def main(): """ @@ -105,23 +155,14 @@ def main(): topic = Topic(topic_address, topic_name, topic_port) topic_list.append(topic) + for topic in topic_list: + topic.listen_topic() + time_end = time.time() + MONITOR_PERIOD_IN_SECONDS # Keep listening to topics forever while True: - # (Re)start threads that are in is_running == False state - for topic in topic_list: - if topic.is_running == False: - print(f"Topic {topic.topic_name} was not running, starting it.") - - # Run listen_topic in a thread because we don't want to wait e.g. client.connect() - # If we didn't use threads here, one connection being stuck could cause long - # timeout for sending data to Azure. - t = Thread(target=topic.listen_topic) - t.daemon = False - t.start() - sleep_time = time_end - time.time() - print(sleep_time) + print(f"Sleeping for {sleep_time} secs") # Only sleep if sleep_time is positive. This can happen if sending data to Azure took longer than MONITOR_PERIOD_IN_SECONDS if sleep_time > 0: @@ -138,12 +179,21 @@ def main(): # Save message counters into topic_data_map and reset them in each topic for topic in topic_list: topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}" - topic_data_map[topic_data_map_key] = topic.msg_count - topic.msg_count = 0 + topic_data_map_value = topic.get_msg_count() + if topic_data_map_value != None: + topic_data_map[topic_data_map_key] = topic_data_map_value t = Thread(target=send_mqtt_msg_count_to_azure, args=(topic_data_map,)) t.start() + # (Re)start threads that are in is_running == False state + for topic in topic_list: + if topic.is_running == False: + print(f"Topic {topic.topic_name} was not running, starting it.") + + topic.listen_topic() + + def send_mqtt_msg_count_to_azure(topic_data_map): """ Send custom metrics into azure. Documentation for the required format can be found from here: @@ -204,8 +254,7 @@ def get_series_array(topic_data_map): for key in topic_data_map: topic_msg_count = topic_data_map[key] - # We want message count to be messages per second - topic_msg_count = round(topic_msg_count/MONITOR_PERIOD_IN_SECONDS, 2) + topic_msg_count = round(topic_msg_count, 2) # If over 10, round to whole number if topic_msg_count > 10: From 3c129e6e3245a66eef5f2674619dafde4e0a8770 Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Fri, 2 Dec 2022 11:25:15 +0200 Subject: [PATCH 2/7] Do not try connecting to MQTT if connection is already starting --- mqtt_data_collector.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index cae5ef9..c1ebd44 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -21,6 +21,7 @@ MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 20 class Topic: + is_starting = False # True while connecting to the broker, False when connected or disconnected is_running = False msg_count = 0 @@ -32,6 +33,9 @@ def __init__(self, topic_address, topic_name, topic_port): self.topic_name = topic_name self.topic_port = topic_port + def get_broker_address(self): + return f"{self.topic_address}:{self.topic_port}" + def print_status(self): print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}") @@ -39,7 +43,11 @@ def listen_topic(self): """ Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python """ - self.is_running = True + if is_starting: + print(f"MQTT client is already connecting to {self.get_broker_address()}") + return + + self.is_starting = True self.measuring_started_at = None self.measuring_stopped_at = None @@ -50,26 +58,21 @@ def listen_topic(self): client.on_message = self._on_message_callback client.on_disconnect = self._on_disconnect_callback - try: - # Enable debugging if needed - #client.on_log = self._on_log_callback - - client.connect_async(self.topic_address, int(self.topic_port), MQTT_KEEP_ALIVE_SECS) + # Enable debugging if needed + #client.on_log = self._on_log_callback - print(f"Connecting to MQTT broker at {self.topic_address}:{self.topic_port}") - # Starts thread that processes network traffic and dispatches callbacks - client.loop_start() + client.connect_async(self.topic_address, int(self.topic_port), MQTT_KEEP_ALIVE_SECS) - except Exception as e: - print(f"Error on topic {self.topic_name} {self.topic_address} {self.topic_port}: {e}") - self.measuring_stopped_at = time.perf_counter() - client.disconnect() - self.is_running = False + print(f"Connecting to MQTT broker at {self.get_broker_address()}") + # Starts thread that processes network traffic and dispatches callbacks + client.loop_start() # The callback for when the client receives a CONNACK response from the server. def _on_connect_callback(self, client, userdata, flags, rc): - print(f"Connected to MQTT broker at {self.topic_address}:{self.topic_port}") + print(f"Connected to MQTT broker at {self.get_broker_address()}") + self.is_starting = False if rc == 0: + self.is_running = True client.subscribe(self.topic_name) self.measuring_started_at = time.perf_counter() self.measuring_stopped_at = None @@ -90,7 +93,7 @@ def _on_message_callback(self, client, userdata, msg): def get_msg_count(self): if self.measuring_started_at == None: - print(f"No data was measured for {self.topic_address}:{self.topic_port} on topic {self.topic_name}. Maybe the client was not connected?") + print(f"No data was measured for {self.get_broker_address()} on topic {self.topic_name}. Maybe the client was not connected?") return None if self.measuring_stopped_at != None: From 7142cb66932077b0b6562a070c866821bf2e4004 Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Fri, 2 Dec 2022 11:30:55 +0200 Subject: [PATCH 3/7] Use time.perf_counter() instead of time.time() for timing --- mqtt_data_collector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index c1ebd44..8693389 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -161,10 +161,10 @@ def main(): for topic in topic_list: topic.listen_topic() - time_end = time.time() + MONITOR_PERIOD_IN_SECONDS + time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS # Keep listening to topics forever while True: - sleep_time = time_end - time.time() + sleep_time = time_end - time.perf_counter() print(f"Sleeping for {sleep_time} secs") # Only sleep if sleep_time is positive. This can happen if sending data to Azure took longer than MONITOR_PERIOD_IN_SECONDS @@ -176,7 +176,7 @@ def main(): print("After sleep.") # Set time_end as MONITOR_PERIOD_IN_SECONDS in the future - time_end = time.time() + MONITOR_PERIOD_IN_SECONDS + time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS topic_data_map = {} # Save message counters into topic_data_map and reset them in each topic From 5f9dfc7d786118aff51e5c7a5cb5ddbe08e4fe93 Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Fri, 2 Dec 2022 11:34:45 +0200 Subject: [PATCH 4/7] If there is no data to send to Azure (e.g. when MQTT connections are down), do not send empty array --- mqtt_data_collector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index 8693389..a8f1b40 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -211,6 +211,9 @@ def send_mqtt_msg_count_to_azure(topic_data_map): time_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") series_array = get_series_array(topic_data_map) + if not series_array: + print("No data to send to Azure") + return custom_metric_object = { # Time (timestamp): Date and time at which the metric is measured or collected From ccccd96eb3654b9e8e3586e327eaf87f7cedcd5c Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Fri, 2 Dec 2022 11:36:05 +0200 Subject: [PATCH 5/7] Fix checking if MQTT is already connecting --- mqtt_data_collector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index a8f1b40..450b2ec 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -43,7 +43,7 @@ def listen_topic(self): """ Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python """ - if is_starting: + if self.is_starting: print(f"MQTT client is already connecting to {self.get_broker_address()}") return From b0ce39ced5fa6106bd8bfc135a832728c504ccfb Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Fri, 2 Dec 2022 11:55:20 +0200 Subject: [PATCH 6/7] Update comments and logging --- mqtt_data_collector.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index 450b2ec..21e815d 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -79,7 +79,7 @@ def _on_connect_callback(self, client, userdata, flags, rc): else: print(f"Error on connecting {client}, is our IP whitelisted for the topic?") - # Enable debugging if needed + # Called when MQTT is disconnected def _on_disconnect_callback(self, client, userdata, rc): print(f"Disconnected from {self.topic_address}, rc: {rc}") self.measuring_stopped_at = time.perf_counter() @@ -101,10 +101,11 @@ def get_msg_count(self): # If data was collected for too short period, we can't accurately calculate the message rate if elapsed_time < 10*MQTT_KEEP_ALIVE_SECS: + # Return None if elapsed_time is too small to calculate accurate result return None """ - Adjust elapsed_time to account the time it took to detect that the connection was down. + Adjust elapsed_time to account for the time it took to detect that the connection was down. This should take roughly 2 times the duration of MQTT keep alive interval. This adjustment can cause the message rate to be slightly inflated, but this is less of a problem than too small message rate, which would cause unnecessary alerts. """ @@ -112,9 +113,8 @@ def get_msg_count(self): else: elapsed_time = time.perf_counter() - self.measuring_started_at - print(f"started: {self.measuring_started_at}, stopped: {self.measuring_started_at + elapsed_time}") - if IS_DEBUG: + print(f"started: {self.measuring_started_at}, stopped: {self.measuring_started_at + elapsed_time}") print(f"Elapsed time {elapsed_time}, messages: {self.msg_count}") msg_per_second = self.msg_count / elapsed_time @@ -122,12 +122,11 @@ def get_msg_count(self): self.measuring_started_at = time.perf_counter() self.measuring_stopped_at = None - # Return None if elapsed_time is too small to calculate accurate result return msg_per_second # Enable debugging if needed - def _on_log_callback(self, client, userdata, level, buf): - print(buf) + # def _on_log_callback(self, client, userdata, level, buf): + # print(buf) def main(): """ @@ -173,7 +172,7 @@ def main(): time.sleep(sleep_time) # TODO: remove this logging later when not needed - print("After sleep.") + # print("After sleep.") # Set time_end as MONITOR_PERIOD_IN_SECONDS in the future time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS From 44f26c12c6b426e308dcdc8f0af737d02d7738f5 Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Fri, 2 Dec 2022 14:43:29 +0200 Subject: [PATCH 7/7] Increase MQTT keep alive to 5s --- mqtt_data_collector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index 21e815d..4c4d463 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -13,7 +13,7 @@ # MQTT keep alive interval # This needs to be small enough to detect if the connection is down so that message rate will be calculated correctly -MQTT_KEEP_ALIVE_SECS = 1 +MQTT_KEEP_ALIVE_SECS = 5 IS_DEBUG = os.getenv('IS_DEBUG') == "True" @@ -100,7 +100,7 @@ def get_msg_count(self): elapsed_time = self.measuring_stopped_at - self.measuring_started_at # If data was collected for too short period, we can't accurately calculate the message rate - if elapsed_time < 10*MQTT_KEEP_ALIVE_SECS: + if elapsed_time < min(25, 10*MQTT_KEEP_ALIVE_SECS): # Return None if elapsed_time is too small to calculate accurate result return None