diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index eb68d56..9339a7e 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -14,107 +14,129 @@ # How long to listen to the topics until we send data to Azure. Should be 60 in production MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 3 +class Topic: + is_running = False + msg_count = 0 + + def __init__(self, topic_address, topic_name, topic_port): + self.topic_address = topic_address + self.topic_name = topic_name + self.topic_port = topic_port + + def start_listening_topic_in_thread(self): + self.is_running = True + t = Thread(target=self._listen_topic) + t.start() + + def print_status(self): + print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}") + + def _listen_topic(self): + """ + Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python + """ + client = mqtt.Client() + client.on_connect = self._on_connect_callback() + client.on_message = self._on_message_callback() + + try: + # Enable debugging if needed + # client.on_log = on_log_callback + # client.on_disconnect = on_disconnect_callback + + client.connect(self.topic_address, int(self.topic_port), MONITOR_PERIOD_IN_SECONDS) + + # Call that processes network traffic, dispatches callbacks and + # handles reconnecting. + client.loop_start() + + except Exception as e: + print(f"Error on topic {self.topic_name} {self.topic_address} {self.topic_port}: {e}") + client.disconnect() + self.is_running = False + + # The callback for when the client receives a CONNACK response from the server. + def _on_connect_callback(self): + def on_connect(client, userdata, flags, rc): + if rc == 0: + client.subscribe(self.topic_name) + else: + print(f"Error on connecting {client}, is our IP whitelisted for the topic?") + + return on_connect + + # # The callback for when a PUBLISH message is received from the server. + def _on_message_callback(self): + def on_message(client, userdata, msg): + self.msg_count += 1 + # print(msg.topic+" "+str(msg.payload)) + return on_message + + # Enable debugging if needed + # def on_log_callback(self, client, userdata, level, buf): + # print(buf) + + # Enable debugging if needed + # def on_disconnect_callback(self, client, userdata, rc): + # print("Disconnected") + + def main(): """ - In order for this to work, info for each topic (IP address, topic name and port) has to be defined in .env file with format: TOPIC= + Listens each topic continuously in a thread. Sends topic messages count per second every + minute to Azure Monitor. - Creates a thread for each topic to listen - After listening to all the threads is finished, send data to Azure Monitor + In order for this to work, info for each topic (IP address, topic name and port) has to + be defined in .env file with format: TOPIC= """ - topic_data_collection = {} + print("Starting MQTT topic listener...") + + topic_list = [] index = 1 - threads = [] while True: topic_data_string = os.getenv(f'TOPIC{index}') index += 1 if (topic_data_string is None): break + if topic_data_string is None or topic_data_string.count(',') != 2: + raise Exception( + f"Some topic data was missing. Required data: address,topic,port. We got: {topic_data_string}") + topic_data_array = topic_data_string.split(",") + topic_address = topic_data_array[0] + topic_name = topic_data_array[1] + topic_port = topic_data_array[2] + if (topic_address is None or topic_name is None or topic_port is None): + raise Exception(f"Some required topic data was missing, topic_address: {topic_address}, topic_name: {topic_name}, topic_port: {topic_port}") + topic = Topic(topic_address, topic_name, topic_port) + topic_list.append(topic) - def listen_topic_thread(topic_data_string): - if topic_data_string is None or topic_data_string.count(',') != 2: - raise Exception( - f"Some topic data was missing. Required data: address,topic,port. We got: {topic_data_string}") - topic_data_array = topic_data_string.split(",") - topic_address = topic_data_array[0] - topic_name = topic_data_array[1] - topic_port = topic_data_array[2] - if (topic_address is None or topic_name is None or topic_port is None): - raise Exception(f"Some required topic data was missing, topic_address: {topic_address}, topic_name: {topic_name}, topic_port: {topic_port}") - topic_data_collection_key = f"{topic_address}:{topic_name}:{topic_port}" - topic_data_collection[topic_data_collection_key] = 0 - - listen_topic(topic_data_collection, topic_data_collection_key, topic_address, topic_name, topic_port) - t = Thread(target=listen_topic_thread, args=(topic_data_string,)) - threads.append(t) - - # Start all threads simultaneously - for i in range(len(threads)): - threads[i].start() - - # Wait for all the threads to finish - for i in range(len(threads)): - threads[i].join() - - if IS_DEBUG: - print(topic_data_collection) - else: - send_mqtt_msg_count_into_azure(topic_data_collection) - print(f'Mqtt metrics sent: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}') - -def listen_topic(topic_data_collection, topic_data_collection_key, address, topic, port): - """ - Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python - """ time_end = time.time() + MONITOR_PERIOD_IN_SECONDS + # Keep listening to topics forever + while True: + # (Re)start threads that are in is_running == False state + for topic in topic_list: + if topic.is_running == False: + print(f"Topic {topic.topic_name} was not running, starting it.") + topic.start_listening_topic_in_thread() + + # If listen period has passed, send data to Azure + if time.time() > time_end: + topic_data_map = {} + for topic in topic_list: + topic_data_map[topic.topic_name] = topic.msg_count + topic.msg_count = 0 + + if IS_DEBUG: + print(topic_data_map) + else: + send_mqtt_msg_count_into_azure(topic_data_map) + print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}") + + time_end = time.time() + MONITOR_PERIOD_IN_SECONDS - client = mqtt.Client() - client.on_connect = on_connect_callback(topic) - client.on_message = on_message_callback(topic_data_collection, topic_data_collection_key) - # Enable debugging if needed - # client.on_log = on_log_callback - # client.on_disconnect = on_disconnect_callback - - try: - client.connect(address, int(port), MONITOR_PERIOD_IN_SECONDS) - except: - print(f'Error: could not connect to {address} {topic} {port}') - - # Call that processes network traffic, dispatches callbacks and - # handles reconnecting. - client.loop_start() - - while time.time() < time_end: time.sleep(1) - client.loop_stop() - -# The callback for when the client receives a CONNACK response from the server. -def on_connect_callback(topic): - def on_connect(client, userdata, flags, rc): - if rc == 0: - client.subscribe(topic) - else: - print(f'Error on connecting {client}, is our IP whitelisted for the topic?') - return on_connect - -# # The callback for when a PUBLISH message is received from the server. -def on_message_callback(topic_data_collection, topic_data_collection_key): - - def on_message(client, userdata, msg): - topic_data_collection[topic_data_collection_key] += 1 - # print(msg.topic+" "+str(msg.payload)) - - return on_message - -# Enable debugging if needed -# def on_log_callback(client, userdata, level, buf): - # print(buf) - -# Enable debugging if needed -# def on_disconnect_callback(client, userdata, rc): -# print("Disconnected") - -def send_mqtt_msg_count_into_azure(topic_data_collection): +def send_mqtt_msg_count_into_azure(topic_data_map): """ Send custom metrics into azure. Documentation for the required format can be found from here: https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview @@ -127,7 +149,7 @@ def send_mqtt_msg_count_into_azure(topic_data_collection): # Azure wants time in UTC ISO 8601 format time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") - series_array = get_series_array(topic_data_collection) + series_array = get_series_array(topic_data_map) custom_metric_object = { # Time (timestamp): Date and time at which the metric is measured or collected @@ -152,10 +174,10 @@ def send_mqtt_msg_count_into_azure(topic_data_collection): send_custom_metrics_request(custom_metric_json, attempts_remaining=3) -def get_series_array(topic_data_collection): +def get_series_array(topic_data_map): series_array = [] - for key in topic_data_collection: - topic_msg_count = topic_data_collection[key] + for key in topic_data_map: + topic_msg_count = topic_data_map[key] # We want message count to be messages per second topic_msg_count = round(topic_msg_count/MONITOR_PERIOD_IN_SECONDS, 2) diff --git a/pulsar_data_collector.py b/pulsar_data_collector.py index a375232..a5d11b3 100644 --- a/pulsar_data_collector.py +++ b/pulsar_data_collector.py @@ -43,19 +43,23 @@ def main(): - topic_data_collection = {} + # Structure: + # key: topic_name: + # value: topic_data: + topic_data_map = {} + # Merge all topic name lists as a single array collect_data_from_topics_list = list(set(TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN + TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT + TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE)) for topic_name in collect_data_from_topics_list: topic_data = collect_data_from_topic(topic_name) if topic_data != None: - topic_data_collection[topic_name] = topic_data + topic_data_map[topic_name] = topic_data - if bool(topic_data_collection): - send_metrics_into_azure(topic_data_collection) + if bool(topic_data_map): + send_metrics_into_azure(topic_data_map) else: - print(f'Not sending metrics, topic_data_collection was empty.') + print(f'Not sending metrics, topic_data_map was empty.') def collect_data_from_topic(topic_name): pulsar_url = f'{ADMIN_URL}/admin/v2/persistent/{NAMESPACE}/{topic_name}/stats' @@ -70,16 +74,16 @@ def collect_data_from_topic(topic_name): except Exception as e: print(f'Failed to send a POST request to {pulsar_url}. Is pulsar running and accepting requests?') -def send_metrics_into_azure(topic_data_collection): - send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_IN, "msgRateIn", topic_data_collection, TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN) - send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_OUT, "msgRateOut", topic_data_collection, TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT) - send_pulsar_topic_metric_into_azure(METRIC_STORAGE_SIZE, "storageSize", topic_data_collection, TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE) +def send_metrics_into_azure(topic_data_map): + send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_IN, "msgRateIn", topic_data_map, TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN) + send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_OUT, "msgRateOut", topic_data_map, TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT) + send_pulsar_topic_metric_into_azure(METRIC_STORAGE_SIZE, "storageSize", topic_data_map, TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE) print(f'Pulsar metrics sent: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}') def send_pulsar_topic_metric_into_azure( log_analytics_metric_name, topic_data_metric_name, - topic_data_collection, + topic_data_map, topic_names_to_collect ): """ @@ -94,7 +98,7 @@ def send_pulsar_topic_metric_into_azure( # Azure wants time in UTC ISO 8601 format time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") - series_array = get_series_array(topic_data_collection, topic_data_metric_name, topic_names_to_collect) + series_array = get_series_array(topic_data_map, topic_data_metric_name, topic_names_to_collect) custom_metric_object = { # Time (timestamp): Date and time at which the metric is measured or collected @@ -122,10 +126,10 @@ def send_pulsar_topic_metric_into_azure( else: send_custom_metrics_request(custom_metric_json, 3) -def get_series_array(topic_data_collection, topic_data_metric_name, topic_names_to_collect): +def get_series_array(topic_data_map, topic_data_metric_name, topic_names_to_collect): series_array = [] for topic_name in topic_names_to_collect: - topic_msg_count = topic_data_collection[topic_name][topic_data_metric_name] + topic_msg_count = topic_data_map[topic_name][topic_data_metric_name] topic_msg_count = round(topic_msg_count, 2)