diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index 7938344..a1d2a20 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -23,18 +23,15 @@ def __init__(self, topic_address, topic_name, topic_port): self.topic_name = topic_name self.topic_port = topic_port - def start_listening_topic_in_thread(self): - self.is_running = True - t = Thread(target=self._listen_topic) - t.start() - def print_status(self): print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}") - def _listen_topic(self): + def listen_topic(self): """ Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python """ + self.is_running = True + client = mqtt.Client() client.on_connect = self._on_connect_callback() client.on_message = self._on_message_callback() @@ -117,24 +114,17 @@ def main(): for topic in topic_list: if topic.is_running == False: print(f"Topic {topic.topic_name} was not running, starting it.") - topic.start_listening_topic_in_thread() + topic.listen_topic() # If listen period has passed, send data to Azure if time.time() > time_end: + time_end = time.time() + MONITOR_PERIOD_IN_SECONDS topic_data_map = {} for topic in topic_list: topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}" topic_data_map[topic_data_map_key] = topic.msg_count topic.msg_count = 0 - - if IS_DEBUG: - print(topic_data_map) - else: - send_mqtt_msg_count_into_azure(topic_data_map) - print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}") - - time_end = time.time() + MONITOR_PERIOD_IN_SECONDS - + send_mqtt_msg_count_into_azure(topic_data_map) time.sleep(1) def send_mqtt_msg_count_into_azure(topic_data_map): @@ -173,7 +163,11 @@ def send_mqtt_msg_count_into_azure(topic_data_map): custom_metric_json = json.dumps(custom_metric_object) - send_custom_metrics_request(custom_metric_json, attempts_remaining=3) + if IS_DEBUG: + print(custom_metric_json) + else: + send_custom_metrics_request(custom_metric_json, attempts_remaining=3) + print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}") def get_series_array(topic_data_map): series_array = []