Skip to content

Commit

Permalink
Optimizations to mqtt topic listening
Browse files Browse the repository at this point in the history
  • Loading branch information
niemijoe committed Aug 23, 2022
1 parent eec06e0 commit cc74c0a
Showing 1 changed file with 24 additions and 27 deletions.
51 changes: 24 additions & 27 deletions mqtt_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def listen_topic(self):
self.is_running = True

client = mqtt.Client()
client.on_connect = self._on_connect_callback()
client.on_message = self._on_message_callback()

client.on_connect = self._on_connect_callback
client.on_message = self._on_message_callback

try:
# Enable debugging if needed
Expand All @@ -53,21 +54,16 @@ def listen_topic(self):
self.is_running = False

# The callback for when the client receives a CONNACK response from the server.
def _on_connect_callback(self):
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.subscribe(self.topic_name)
else:
print(f"Error on connecting {client}, is our IP whitelisted for the topic?")

return on_connect
def _on_connect_callback(self, client, userdata, flags, rc):
if rc == 0:
client.subscribe(self.topic_name)
else:
print(f"Error on connecting {client}, is our IP whitelisted for the topic?")

# # The callback for when a PUBLISH message is received from the server.
def _on_message_callback(self):
def on_message(client, userdata, msg):
self.msg_count += 1
# print(msg.topic+" "+str(msg.payload))
return on_message
def _on_message_callback(self, client, userdata, msg):
self.msg_count += 1
# print(msg.topic+" "+str(msg.payload))

# Enable debugging if needed
# def on_log_callback(self, client, userdata, level, buf):
Expand Down Expand Up @@ -116,18 +112,19 @@ def main():
print(f"Topic {topic.topic_name} was not running, starting it.")
topic.listen_topic()

# If listen period has passed, send data to Azure
if time.time() > time_end:
time_end = time.time() + MONITOR_PERIOD_IN_SECONDS
topic_data_map = {}
for topic in topic_list:
topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}"
topic_data_map[topic_data_map_key] = topic.msg_count
topic.msg_count = 0
send_mqtt_msg_count_into_azure(topic_data_map)
time.sleep(1)

def send_mqtt_msg_count_into_azure(topic_data_map):
sleep_time = time_end - time.time()
print(sleep_time)
# When listen period has passed, send data to Azure
time.sleep(sleep_time)
time_end = time.time() + MONITOR_PERIOD_IN_SECONDS
topic_data_map = {}
for topic in topic_list:
topic_data_map_key = f"{topic.topic_address}:{topic.topic_name}:{topic.topic_port}"
topic_data_map[topic_data_map_key] = topic.msg_count
topic.msg_count = 0
send_mqtt_msg_count_to_azure(topic_data_map)

def send_mqtt_msg_count_to_azure(topic_data_map):
"""
Send custom metrics into azure. Documentation for the required format can be found from here:
https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview
Expand Down

0 comments on commit cc74c0a

Please sign in to comment.