Skip to content

Commit

Permalink
Listen topics without threads
Browse files Browse the repository at this point in the history
  • Loading branch information
niemijoe committed Aug 23, 2022
1 parent b52c7d3 commit eec06e0
Showing 1 changed file with 11 additions and 17 deletions.
28 changes: 11 additions & 17 deletions mqtt_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@ def __init__(self, topic_address, topic_name, topic_port):
self.topic_name = topic_name
self.topic_port = topic_port

def start_listening_topic_in_thread(self):
self.is_running = True
t = Thread(target=self._listen_topic)
t.start()

def print_status(self):
print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}")

def _listen_topic(self):
def listen_topic(self):
"""
Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python
"""
self.is_running = True

client = mqtt.Client()
client.on_connect = self._on_connect_callback()
client.on_message = self._on_message_callback()
Expand Down Expand Up @@ -117,24 +114,17 @@ def main():
for topic in topic_list:
if topic.is_running == False:
print(f"Topic {topic.topic_name} was not running, starting it.")
topic.start_listening_topic_in_thread()
topic.listen_topic()

# If listen period has passed, send data to Azure
if time.time() > time_end:
time_end = time.time() + MONITOR_PERIOD_IN_SECONDS
topic_data_map = {}
for topic in topic_list:
topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}"
topic_data_map[topic_data_map_key] = topic.msg_count
topic.msg_count = 0

if IS_DEBUG:
print(topic_data_map)
else:
send_mqtt_msg_count_into_azure(topic_data_map)
print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}")

time_end = time.time() + MONITOR_PERIOD_IN_SECONDS

send_mqtt_msg_count_into_azure(topic_data_map)
time.sleep(1)

def send_mqtt_msg_count_into_azure(topic_data_map):
Expand Down Expand Up @@ -173,7 +163,11 @@ def send_mqtt_msg_count_into_azure(topic_data_map):

custom_metric_json = json.dumps(custom_metric_object)

send_custom_metrics_request(custom_metric_json, attempts_remaining=3)
if IS_DEBUG:
print(custom_metric_json)
else:
send_custom_metrics_request(custom_metric_json, attempts_remaining=3)
print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}")

def get_series_array(topic_data_map):
series_array = []
Expand Down

0 comments on commit eec06e0

Please sign in to comment.