diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index 9d49db8..4c4d463 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -11,20 +11,31 @@ load_dotenv() +# MQTT keep alive interval +# This needs to be small enough to detect if the connection is down so that message rate will be calculated correctly +MQTT_KEEP_ALIVE_SECS = 5 + IS_DEBUG = os.getenv('IS_DEBUG') == "True" # How long to listen to the topics until we send data to Azure. Should be 60 in production -MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 3 +MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 20 class Topic: + is_starting = False # True while connecting to the broker, False when connected or disconnected is_running = False msg_count = 0 + measuring_started_at = None + measuring_stopped_at = None + def __init__(self, topic_address, topic_name, topic_port): self.topic_address = topic_address self.topic_name = topic_name self.topic_port = topic_port + def get_broker_address(self): + return f"{self.topic_address}:{self.topic_port}" + def print_status(self): print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}") @@ -32,49 +43,90 @@ def listen_topic(self): """ Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python """ - self.is_running = True + if self.is_starting: + print(f"MQTT client is already connecting to {self.get_broker_address()}") + return + + self.is_starting = True + + self.measuring_started_at = None + self.measuring_stopped_at = None client = mqtt.Client() client.on_connect = self._on_connect_callback client.on_message = self._on_message_callback + client.on_disconnect = self._on_disconnect_callback - try: - # Enable debugging if needed - # client.on_log = on_log_callback - # client.on_disconnect = on_disconnect_callback - - client.connect(self.topic_address, int(self.topic_port), MONITOR_PERIOD_IN_SECONDS) + # Enable debugging if needed + #client.on_log = self._on_log_callback - # Call that processes network traffic, dispatches callbacks and - # handles reconnecting. - client.loop_start() + client.connect_async(self.topic_address, int(self.topic_port), MQTT_KEEP_ALIVE_SECS) - except Exception as e: - print(f"Error on topic {self.topic_name} {self.topic_address} {self.topic_port}: {e}") - client.disconnect() - self.is_running = False + print(f"Connecting to MQTT broker at {self.get_broker_address()}") + # Starts thread that processes network traffic and dispatches callbacks + client.loop_start() # The callback for when the client receives a CONNACK response from the server. def _on_connect_callback(self, client, userdata, flags, rc): + print(f"Connected to MQTT broker at {self.get_broker_address()}") + self.is_starting = False if rc == 0: + self.is_running = True client.subscribe(self.topic_name) + self.measuring_started_at = time.perf_counter() + self.measuring_stopped_at = None else: print(f"Error on connecting {client}, is our IP whitelisted for the topic?") + # Called when MQTT is disconnected + def _on_disconnect_callback(self, client, userdata, rc): + print(f"Disconnected from {self.topic_address}, rc: {rc}") + self.measuring_stopped_at = time.perf_counter() + client.loop_stop() + self.is_running = False + # # The callback for when a PUBLISH message is received from the server. def _on_message_callback(self, client, userdata, msg): self.msg_count += 1 # print(msg.topic+" "+str(msg.payload)) - # Enable debugging if needed - # def on_log_callback(self, client, userdata, level, buf): - # print(buf) + def get_msg_count(self): + if self.measuring_started_at == None: + print(f"No data was measured for {self.get_broker_address()} on topic {self.topic_name}. Maybe the client was not connected?") + return None + + if self.measuring_stopped_at != None: + elapsed_time = self.measuring_stopped_at - self.measuring_started_at + + # If data was collected for too short period, we can't accurately calculate the message rate + if elapsed_time < min(25, 10*MQTT_KEEP_ALIVE_SECS): + # Return None if elapsed_time is too small to calculate accurate result + return None + + """ + Adjust elapsed_time to account for the time it took to detect that the connection was down. + This should take roughly 2 times the duration of MQTT keep alive interval. + This adjustment can cause the message rate to be slightly inflated, but this is less of a problem than too small message rate, which would cause unnecessary alerts. + """ + elapsed_time -= 2*MQTT_KEEP_ALIVE_SECS + else: + elapsed_time = time.perf_counter() - self.measuring_started_at - # Enable debugging if needed - # def on_disconnect_callback(self, client, userdata, rc): - # print("Disconnected") + if IS_DEBUG: + print(f"started: {self.measuring_started_at}, stopped: {self.measuring_started_at + elapsed_time}") + print(f"Elapsed time {elapsed_time}, messages: {self.msg_count}") + + msg_per_second = self.msg_count / elapsed_time + self.msg_count = 0 + self.measuring_started_at = time.perf_counter() + self.measuring_stopped_at = None + return msg_per_second + + # Enable debugging if needed + # def _on_log_callback(self, client, userdata, level, buf): + # print(buf) def main(): """ @@ -105,23 +157,14 @@ def main(): topic = Topic(topic_address, topic_name, topic_port) topic_list.append(topic) - time_end = time.time() + MONITOR_PERIOD_IN_SECONDS + for topic in topic_list: + topic.listen_topic() + + time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS # Keep listening to topics forever while True: - # (Re)start threads that are in is_running == False state - for topic in topic_list: - if topic.is_running == False: - print(f"Topic {topic.topic_name} was not running, starting it.") - - # Run listen_topic in a thread because we don't want to wait e.g. client.connect() - # If we didn't use threads here, one connection being stuck could cause long - # timeout for sending data to Azure. - t = Thread(target=topic.listen_topic) - t.daemon = False - t.start() - - sleep_time = time_end - time.time() - print(sleep_time) + sleep_time = time_end - time.perf_counter() + print(f"Sleeping for {sleep_time} secs") # Only sleep if sleep_time is positive. This can happen if sending data to Azure took longer than MONITOR_PERIOD_IN_SECONDS if sleep_time > 0: @@ -129,21 +172,30 @@ def main(): time.sleep(sleep_time) # TODO: remove this logging later when not needed - print("After sleep.") + # print("After sleep.") # Set time_end as MONITOR_PERIOD_IN_SECONDS in the future - time_end = time.time() + MONITOR_PERIOD_IN_SECONDS + time_end = time.perf_counter() + MONITOR_PERIOD_IN_SECONDS topic_data_map = {} # Save message counters into topic_data_map and reset them in each topic for topic in topic_list: topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}" - topic_data_map[topic_data_map_key] = topic.msg_count - topic.msg_count = 0 + topic_data_map_value = topic.get_msg_count() + if topic_data_map_value != None: + topic_data_map[topic_data_map_key] = topic_data_map_value t = Thread(target=send_mqtt_msg_count_to_azure, args=(topic_data_map,)) t.start() + # (Re)start threads that are in is_running == False state + for topic in topic_list: + if topic.is_running == False: + print(f"Topic {topic.topic_name} was not running, starting it.") + + topic.listen_topic() + + def send_mqtt_msg_count_to_azure(topic_data_map): """ Send custom metrics into azure. Documentation for the required format can be found from here: @@ -158,6 +210,9 @@ def send_mqtt_msg_count_to_azure(topic_data_map): time_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") series_array = get_series_array(topic_data_map) + if not series_array: + print("No data to send to Azure") + return custom_metric_object = { # Time (timestamp): Date and time at which the metric is measured or collected @@ -204,8 +259,7 @@ def get_series_array(topic_data_map): for key in topic_data_map: topic_msg_count = topic_data_map[key] - # We want message count to be messages per second - topic_msg_count = round(topic_msg_count/MONITOR_PERIOD_IN_SECONDS, 2) + topic_msg_count = round(topic_msg_count, 2) # If over 10, round to whole number if topic_msg_count > 10: