From cc74c0a66257918228e8a933bf02613aaf9e5ba1 Mon Sep 17 00:00:00 2001 From: niemijoe Date: Tue, 23 Aug 2022 13:37:15 +0300 Subject: [PATCH] Optimizations to mqtt topic listening --- mqtt_data_collector.py | 51 ++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index a1d2a20..1879825 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -33,8 +33,9 @@ def listen_topic(self): self.is_running = True client = mqtt.Client() - client.on_connect = self._on_connect_callback() - client.on_message = self._on_message_callback() + + client.on_connect = self._on_connect_callback + client.on_message = self._on_message_callback try: # Enable debugging if needed @@ -53,21 +54,16 @@ def listen_topic(self): self.is_running = False # The callback for when the client receives a CONNACK response from the server. - def _on_connect_callback(self): - def on_connect(client, userdata, flags, rc): - if rc == 0: - client.subscribe(self.topic_name) - else: - print(f"Error on connecting {client}, is our IP whitelisted for the topic?") - - return on_connect + def _on_connect_callback(self, client, userdata, flags, rc): + if rc == 0: + client.subscribe(self.topic_name) + else: + print(f"Error on connecting {client}, is our IP whitelisted for the topic?") # # The callback for when a PUBLISH message is received from the server. - def _on_message_callback(self): - def on_message(client, userdata, msg): - self.msg_count += 1 - # print(msg.topic+" "+str(msg.payload)) - return on_message + def _on_message_callback(self, client, userdata, msg): + self.msg_count += 1 + # print(msg.topic+" "+str(msg.payload)) # Enable debugging if needed # def on_log_callback(self, client, userdata, level, buf): @@ -116,18 +112,19 @@ def main(): print(f"Topic {topic.topic_name} was not running, starting it.") topic.listen_topic() - # If listen period has passed, send data to Azure - if time.time() > time_end: - time_end = time.time() + MONITOR_PERIOD_IN_SECONDS - topic_data_map = {} - for topic in topic_list: - topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}" - topic_data_map[topic_data_map_key] = topic.msg_count - topic.msg_count = 0 - send_mqtt_msg_count_into_azure(topic_data_map) - time.sleep(1) - -def send_mqtt_msg_count_into_azure(topic_data_map): + sleep_time = time_end - time.time() + print(sleep_time) + # When listen period has passed, send data to Azure + time.sleep(sleep_time) + time_end = time.time() + MONITOR_PERIOD_IN_SECONDS + topic_data_map = {} + for topic in topic_list: + topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}" + topic_data_map[topic_data_map_key] = topic.msg_count + topic.msg_count = 0 + send_mqtt_msg_count_to_azure(topic_data_map) + +def send_mqtt_msg_count_to_azure(topic_data_map): """ Send custom metrics into azure. Documentation for the required format can be found from here: https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview