Skip to content

Commit

Permalink
Merge pull request #1 from HSLdevcom/listen_mqtt_topics_continuously
Browse files Browse the repository at this point in the history
Listen to mqtt topics continuously
  • Loading branch information
Joe Niemi authored Aug 22, 2022
2 parents 0d5e81e + 4b69b06 commit aca2a30
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 103 deletions.
202 changes: 112 additions & 90 deletions mqtt_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,107 +14,129 @@
# How long to listen to the topics until we send data to Azure. Should be 60 in production
MONITOR_PERIOD_IN_SECONDS = 60 if IS_DEBUG == False else 3

class Topic:
    """Listens to a single MQTT topic in a background thread and counts received messages.

    `main()` polls `is_running` to restart dead listeners and reads/resets
    `msg_count` once per monitor period.
    """

    def __init__(self, topic_address, topic_name, topic_port):
        self.topic_address = topic_address
        self.topic_name = topic_name
        self.topic_port = topic_port
        # Per-instance state (previously class attributes, which risks
        # accidental sharing across instances and confuses readers).
        self.is_running = False
        self.msg_count = 0

    def start_listening_topic_in_thread(self):
        """Mark the topic as running and start its listener in a daemon thread.

        daemon=True so a stray listener thread can never block interpreter shutdown.
        """
        self.is_running = True
        t = Thread(target=self._listen_topic, daemon=True)
        t.start()

    def print_status(self):
        """Print a one-line status summary, useful for debugging."""
        print(f"[Status] {self.topic_name}: msg_count: {self.msg_count}, is_running: {self.is_running}")

    def _listen_topic(self):
        """Connect and subscribe to the topic; paho runs its own network loop thread.

        Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python
        On any failure, flags is_running = False so main() will retry this topic.
        """
        client = mqtt.Client()
        client.on_connect = self._on_connect_callback()
        client.on_message = self._on_message_callback()

        try:
            # Enable debugging if needed
            # client.on_log = on_log_callback
            # client.on_disconnect = on_disconnect_callback

            # NOTE(review): keepalive is set to the monitor period; silent broker
            # disconnects are handled by paho's auto-reconnect inside loop_start().
            client.connect(self.topic_address, int(self.topic_port), MONITOR_PERIOD_IN_SECONDS)

            # Starts a background thread that processes network traffic,
            # dispatches callbacks and handles reconnecting.
            client.loop_start()

        except Exception as e:
            print(f"Error on topic {self.topic_name} {self.topic_address} {self.topic_port}: {e}")
            client.disconnect()
            self.is_running = False

    # The callback for when the client receives a CONNACK response from the server.
    def _on_connect_callback(self):
        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                client.subscribe(self.topic_name)
            else:
                print(f"Error on connecting {client}, is our IP whitelisted for the topic?")

        return on_connect

    # The callback for when a PUBLISH message is received from the server.
    def _on_message_callback(self):
        def on_message(client, userdata, msg):
            self.msg_count += 1
            # print(msg.topic+" "+str(msg.payload))
        return on_message


def main():
    """
    Listen to each configured MQTT topic continuously in its own thread and send each
    topic's message count to Azure Monitor every MONITOR_PERIOD_IN_SECONDS.

    In order for this to work, info for each topic (IP address, topic name and port) has to
    be defined in .env file with format: TOPIC<topic index>=<IP address, topic name, port>

    Raises:
        Exception: if a TOPIC<index> entry does not contain exactly address,topic,port.
    """
    print("Starting MQTT topic listener...")

    # Read TOPIC1, TOPIC2, ... from the environment until the first missing index.
    topic_list = []
    index = 1
    while True:
        topic_data_string = os.getenv(f'TOPIC{index}')
        index += 1
        if topic_data_string is None:
            break
        if topic_data_string.count(',') != 2:
            raise Exception(
                f"Some topic data was missing. Required data: address,topic,port. We got: {topic_data_string}")
        topic_address, topic_name, topic_port = topic_data_string.split(",")
        topic_list.append(Topic(topic_address, topic_name, topic_port))

    time_end = time.time() + MONITOR_PERIOD_IN_SECONDS
    # Keep listening to topics forever
    while True:
        # (Re)start topics that are in is_running == False state
        for topic in topic_list:
            if not topic.is_running:
                print(f"Topic {topic.topic_name} was not running, starting it.")
                topic.start_listening_topic_in_thread()

        # If the listen period has passed, send collected counts to Azure
        if time.time() > time_end:
            topic_data_map = {}
            for topic in topic_list:
                topic_data_map[topic.topic_name] = topic.msg_count
                topic.msg_count = 0

            if IS_DEBUG:
                print(topic_data_map)
            else:
                send_mqtt_msg_count_into_azure(topic_data_map)
                print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}")

            time_end = time.time() + MONITOR_PERIOD_IN_SECONDS

        # Avoid a tight busy-wait; one-second resolution is plenty for a 60 s period.
        time.sleep(1)

# Factory for the CONNACK handler: subscribes to `topic` on a successful connect.
def on_connect_callback(topic):
    def on_connect(client, userdata, flags, rc):
        """Subscribe on rc == 0, otherwise report the failed connection."""
        if rc != 0:
            print(f'Error on connecting {client}, is our IP whitelisted for the topic?')
            return
        client.subscribe(topic)
    return on_connect

# Factory for the PUBLISH handler: counts messages under the given collection key.
def on_message_callback(topic_data_collection, topic_data_collection_key):
    def on_message(client, userdata, msg):
        # Only the message count matters here; the payload itself is ignored.
        topic_data_collection[topic_data_collection_key] = topic_data_collection[topic_data_collection_key] + 1
    return on_message

# Enable debugging if needed
# def on_log_callback(client, userdata, level, buf):
# print(buf)

# Enable debugging if needed
# def on_disconnect_callback(client, userdata, rc):
# print("Disconnected")

def send_mqtt_msg_count_into_azure(topic_data_collection):
def send_mqtt_msg_count_into_azure(topic_data_map):
"""
Send custom metrics into azure. Documentation for the required format can be found from here:
https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview
Expand All @@ -127,7 +149,7 @@ def send_mqtt_msg_count_into_azure(topic_data_collection):
# Azure wants time in UTC ISO 8601 format
time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")

series_array = get_series_array(topic_data_collection)
series_array = get_series_array(topic_data_map)

custom_metric_object = {
# Time (timestamp): Date and time at which the metric is measured or collected
Expand All @@ -152,10 +174,10 @@ def send_mqtt_msg_count_into_azure(topic_data_collection):

send_custom_metrics_request(custom_metric_json, attempts_remaining=3)

def get_series_array(topic_data_collection):
def get_series_array(topic_data_map):
series_array = []
for key in topic_data_collection:
topic_msg_count = topic_data_collection[key]
for key in topic_data_map:
topic_msg_count = topic_data_map[key]

# We want message count to be messages per second
topic_msg_count = round(topic_msg_count/MONITOR_PERIOD_IN_SECONDS, 2)
Expand Down
30 changes: 17 additions & 13 deletions pulsar_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,23 @@


def main():
    """
    Collect stats for each configured Pulsar topic and send them to Azure Monitor.

    Structure of topic_data_map:
      key: topic_name <string>
      value: topic_data <object>
    """
    topic_data_map = {}

    # Merge all topic name lists as a single deduplicated array
    collect_data_from_topics_list = list(set(
        TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN
        + TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT
        + TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE))

    for topic_name in collect_data_from_topics_list:
        topic_data = collect_data_from_topic(topic_name)
        # collect_data_from_topic returns None when the stats request failed.
        if topic_data is not None:
            topic_data_map[topic_name] = topic_data

    if topic_data_map:
        send_metrics_into_azure(topic_data_map)
    else:
        print('Not sending metrics, topic_data_map was empty.')

def collect_data_from_topic(topic_name):
pulsar_url = f'{ADMIN_URL}/admin/v2/persistent/{NAMESPACE}/{topic_name}/stats'
Expand All @@ -70,16 +74,16 @@ def collect_data_from_topic(topic_name):
except Exception as e:
print(f'Failed to send a POST request to {pulsar_url}. Is pulsar running and accepting requests?')

def send_metrics_into_azure(topic_data_map):
    """Send each collected Pulsar metric type to Azure Monitor as a custom metric.

    topic_data_map: topic_name -> stats object from the Pulsar admin API.
    """
    send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_IN, "msgRateIn", topic_data_map, TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN)
    send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_OUT, "msgRateOut", topic_data_map, TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT)
    send_pulsar_topic_metric_into_azure(METRIC_STORAGE_SIZE, "storageSize", topic_data_map, TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE)
    print(f'Pulsar metrics sent: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}')

def send_pulsar_topic_metric_into_azure(
log_analytics_metric_name,
topic_data_metric_name,
topic_data_collection,
topic_data_map,
topic_names_to_collect
):
"""
Expand All @@ -94,7 +98,7 @@ def send_pulsar_topic_metric_into_azure(
# Azure wants time in UTC ISO 8601 format
time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")

series_array = get_series_array(topic_data_collection, topic_data_metric_name, topic_names_to_collect)
series_array = get_series_array(topic_data_map, topic_data_metric_name, topic_names_to_collect)

custom_metric_object = {
# Time (timestamp): Date and time at which the metric is measured or collected
Expand Down Expand Up @@ -122,10 +126,10 @@ def send_pulsar_topic_metric_into_azure(
else:
send_custom_metrics_request(custom_metric_json, 3)

def get_series_array(topic_data_collection, topic_data_metric_name, topic_names_to_collect):
def get_series_array(topic_data_map, topic_data_metric_name, topic_names_to_collect):
series_array = []
for topic_name in topic_names_to_collect:
topic_msg_count = topic_data_collection[topic_name][topic_data_metric_name]
topic_msg_count = topic_data_map[topic_name][topic_data_metric_name]

topic_msg_count = round(topic_msg_count, 2)

Expand Down

0 comments on commit aca2a30

Please sign in to comment.