From 20cad475d3cb575582dd46ec1054317c35c3fdb0 Mon Sep 17 00:00:00 2001 From: niemijoe Date: Fri, 12 Aug 2022 12:21:28 +0300 Subject: [PATCH] dimValue key includes topic address and port --- README.md | 27 ++++++++++++++++++++++----- mqtt_data_collector.py | 24 +++++++++++++----------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 96156be..64a6917 100644 --- a/README.md +++ b/README.md @@ -2,21 +2,38 @@ Collects MQTT and Pulsar data and sends it to Azure Monitor so that alerts can monitor the data and alert when needed. -### Run locally +## Run locally + +Have `.env` file at the project directory containing all of the secret values (you can get secrets from a pulsar-proxy VM from pulsar-dev resource group) + +### Run pulsar data collector To run `pulsar_data_collector.py`, you need to have a tunnel open to pulsar_dev_proxy so that `ADMIN_URL` env variable points to pulsar admin's port. -Have `.env` file at the project directory containing all of the secret values (you can get secrets from a pulsar-proxy VM from pulsar-dev resource group) and then run either: ``` python3 pulsar_data_collector.py ``` -or + +### Run mqtt data collector + +To run `mqtt_data_collector.py`, some of the addresses might require having a tunnel open to pulsar_bastion and then listening through the tunnel. + +Example of opening two tunnels into localhost ports 9001 and 9002: + +``` +ssh -L 9001:: -L 9002:: +``` + +and in `.env` file, you need to have topics that require a tunnel configured like so: TOPIC=localhost,,9001 +for those topics that require tunneling. + +Now you can run: ``` python3 mqtt_data_collector.py ``` -### Send custom metrics manually to Azure Monitor +## Send custom metrics manually to Azure Monitor If you need to send new custom metrics to Azure Monitor, you can firstly test sending by editing @@ -28,6 +45,6 @@ Notes: - Edit what you need in `custom_metric_example.json` (at least the timestamp) - You need a fresh `access token` for this command, you can get it by running `main.py` locally (see access_token.txt file) -### Deployment +## Deployment Deployment is done with ansible on the pulsar proxy server. In order to update this app, create a new release in github: https://github.com/HSLdevcom/transitdata-monitor-data-collector/releases/new and then run the pulsar proxy playbook. diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index b0ef780..6971ce0 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -40,8 +40,10 @@ def listen_topic_thread(topic_data_string): topic_port = topic_data_array[2] if (topic_address is None or topic_name is None or topic_port is None): raise Exception(f"Some required topic data was missing, topic_address: {topic_address}, topic_name: {topic_name}, topic_port: {topic_port}") - topic_data_collection[topic_name] = 0 - listen_topic(topic_data_collection, topic_address, topic_name, topic_port) + topic_data_collection_key = f"{topic_address}:{topic_name}:{topic_port}" + topic_data_collection[topic_data_collection_key] = 0 + + listen_topic(topic_data_collection, topic_data_collection_key, topic_address, topic_name, topic_port) t = Thread(target=listen_topic_thread, args=(topic_data_string,)) threads.append(t) @@ -59,7 +61,7 @@ def listen_topic_thread(topic_data_string): send_mqtt_msg_count_into_azure(topic_data_collection) print(f'Mqtt metrics sent: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}') -def listen_topic(topic_data_collection, address, topic, port): +def listen_topic(topic_data_collection, topic_data_collection_key, address, topic, port): """ Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python """ @@ -67,7 +69,7 @@ def listen_topic(topic_data_collection, address, topic, port): client = mqtt.Client() client.on_connect = on_connect_callback(topic) - client.on_message = on_message_callback(topic_data_collection, topic) + client.on_message = on_message_callback(topic_data_collection, topic_data_collection_key) # Enable debugging if needed # client.on_log = on_log_callback # client.on_disconnect = on_disconnect_callback @@ -96,10 +98,10 @@ def on_connect(client, userdata, flags, rc): return on_connect # # The callback for when a PUBLISH message is received from the server. -def on_message_callback(topic_data_collection, topic): +def on_message_callback(topic_data_collection, topic_data_collection_key): def on_message(client, userdata, msg): - topic_data_collection[topic] += 1 + topic_data_collection[topic_data_collection_key] += 1 # print(msg.topic+" "+str(msg.payload)) return on_message @@ -152,17 +154,17 @@ def send_mqtt_msg_count_into_azure(topic_data_collection): def get_series_array(topic_data_collection): series_array = [] - for topic_name in topic_data_collection: - topic_msg_count = topic_data_collection[topic_name] + for key in topic_data_collection: + topic_msg_count = topic_data_collection[key] # Azure doesn't seem to like # in a dimValue, replace it with * - parsed_topic_name = topic_name.replace("#", "*") + parsed_key = key.replace("#", "*") # Azure doesn't seem to like + in a dimValue, replace it with ^ - parsed_topic_name = parsed_topic_name.replace("+", "^") + parsed_key = parsed_key.replace("+", "^") dimValue = { "dimValues": [ - parsed_topic_name + parsed_key ], "sum": topic_msg_count, "count": 1