Commit
Initial commit
niemijoe committed Aug 5, 2022
1 parent f72f310 commit 91afc88
Showing 8 changed files with 424 additions and 2 deletions.
11 changes: 11 additions & 0 deletions .env.example
@@ -0,0 +1,11 @@
TENANT_ID=<secret>
CLIENT_ID=<secret>
CLIENT_SECRET=<secret>
ADMIN_URL=http://localhost:8089
NAMESPACE=<secret>
PULSAR_PROXY_RESOURCE_ID=<secret>
ACCESS_TOKEN_PATH=access_token.txt
TOPIC1=<IP address>,<topic name>,<port>
TOPIC2=<IP address>,<topic name>,<port>
TOPIC3=...
...
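
For illustration, a filled-in topic line (with made-up address, topic and port values) could look like:
```
TOPIC1=10.0.0.1,example/topic,1883
```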
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
.env
access_token.txt
.idea
35 changes: 33 additions & 2 deletions README.md
@@ -1,2 +1,33 @@
# transitdata-monitor-data-collector
Collects MQTT and Pulsar data and sends it to Azure
# Transitdata Monitor Data Collector

Collects MQTT and Pulsar data and sends it to Azure Monitor, so that alerts can be triggered on the data when needed.

### Run locally

To run `pulsar_data_collector.py`, you need to have a tunnel open to pulsar_dev_proxy so that the `ADMIN_URL` env variable points to Pulsar admin's port, for example as shown below.
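
How the tunnel is opened is not covered by this README; one option is an SSH local port forward from the `ADMIN_URL` port (8089 in `.env.example`) to the Pulsar admin API on the proxy host. The host name and remote port below are placeholders:
```
ssh -N -L 8089:localhost:8080 <user>@<pulsar-dev-proxy-host>
```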

Have a `.env` file in the project directory containing all of the secret values (you can get the secrets from the pulsar-proxy VM in the pulsar-dev resource group) and then run either:
```
python3 pulsar_data_collector.py
```
or
```
python3 mqtt_data_collector.py
```
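
Both scripts rely on the packages listed in `requirements.txt`. Note that `pulsar_data_collector.py` also imports `requests`, which is not listed in this commit's `requirements.txt`, so install it as well:
```
pip3 install -r requirements.txt requests
```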

### Send custom metrics manually to Azure Monitor

If you need to send new custom metrics to Azure Monitor, you can first test sending by editing `custom_metric_example.json` and running:
```
curl -X POST https://westeurope.monitoring.azure.com/<resourceId>/metrics -H "Content-Type: application/json" -H "Authorization: Bearer <AccessToken>" -d @custom_metric_example.json
```
Notes:
- Edit what you need in `custom_metric_example.json` (at least the timestamp)
- You need a fresh access token for this command; you can get one by running `main.py` locally (see the `access_token.txt` file)
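
The contents of `custom_metric_example.json` are not shown in this commit, but based on the metric objects the collectors build, it presumably looks roughly like this (timestamp, topic name and values are illustrative):
```
{
    "time": "2022-08-05T12:00:00",
    "data": {
        "baseData": {
            "metric": "msgCount",
            "namespace": "MQTT",
            "dimNames": [ "Topic" ],
            "series": [
                {
                    "dimValues": [ "my/example/topic" ],
                    "sum": 42,
                    "count": 1
                }
            ]
        }
    }
}
```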

### Deployment

Deployment is done with Ansible on the pulsar proxy server. To update this app, create a new release in GitHub (https://github.com/HSLdevcom/transitdata-monitor-data-collector/releases/new) and then run the pulsar proxy playbook, as sketched below.
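
The playbook itself lives outside this repository; running it looks roughly like the following, where the inventory and playbook names are placeholders:
```
ansible-playbook -i <inventory> <pulsar-proxy-playbook>.yml
```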
Binary file not shown.
148 changes: 148 additions & 0 deletions mqtt_data_collector.py
@@ -0,0 +1,148 @@
import paho.mqtt.client as mqtt
import time
import json
from datetime import datetime
from threading import Thread
import os
from dotenv import load_dotenv
from send_data_to_azure_monitor import send_custom_metrics_request

load_dotenv()

# How long to listen to the topics until we send data to Azure. Should be 60 in production
MONITOR_PERIOD_IN_SECONDS = 60

def main():
    """
    For this to work, info for each topic (IP address, topic name and port) has to be
    defined in the .env file in the format: TOPIC<topic index>=<IP address>,<topic name>,<port>

    Creates a thread to listen to each topic. After all of the threads have finished
    listening, sends the collected data to Azure Monitor.
    """
    topic_data_collection = {}
    index = 1
    threads = []
    while True:
        topic_data_string = os.getenv(f'TOPIC{index}')
        index += 1
        if topic_data_string is None:
            break

        def listen_topic_thread(topic_data_string):
            if topic_data_string is None or topic_data_string.count(',') != 2:
                raise Exception(
                    f"Some topic data was missing. Required data: address,topic,port. We got: {topic_data_string}")
            topic_data_array = topic_data_string.split(",")
            topic_address = topic_data_array[0]
            topic_name = topic_data_array[1]
            topic_port = topic_data_array[2]
            if not topic_address or not topic_name or not topic_port:
                raise Exception(
                    f"Some required topic data was missing, topic_address: {topic_address}, topic_name: {topic_name}, topic_port: {topic_port}")
            topic_data_collection[topic_name] = 0
            listen_topic(topic_data_collection, topic_address, topic_name, topic_port)

        t = Thread(target=listen_topic_thread, args=(topic_data_string,))
        threads.append(t)

    # Start all threads simultaneously
    for t in threads:
        t.start()

    # Wait for all of the threads to finish
    for t in threads:
        t.join()

    send_mqtt_msg_count_into_azure(topic_data_collection)

def listen_topic(topic_data_collection, address, topic, port):
    """
    Documentation for paho.mqtt.python: https://github.com/eclipse/paho.mqtt.python
    """
    time_end = time.time() + MONITOR_PERIOD_IN_SECONDS

    client = mqtt.Client()
    client.on_connect = on_connect_callback(topic)
    client.on_message = on_message_callback(topic_data_collection, topic)

    client.connect(address, int(port), MONITOR_PERIOD_IN_SECONDS)

    # Starts a background loop that processes network traffic,
    # dispatches callbacks and handles reconnecting.
    client.loop_start()

    while time.time() < time_end:
        time.sleep(1)

    client.loop_stop()

# The callback for when the client receives a CONNACK response from the server.
def on_connect_callback(topic):
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            client.subscribe(topic)
        else:
            print(f'Error on connecting {client}, is our IP whitelisted for the topic?')
    return on_connect

# The callback for when a PUBLISH message is received from the server.
def on_message_callback(topic_data_collection, topic):
    def on_message(client, userdata, msg):
        topic_data_collection[topic] += 1
        # print(msg.topic + " " + str(msg.payload))
    return on_message

def send_mqtt_msg_count_into_azure(topic_data_collection):
    """
    Send custom metrics into Azure. Documentation for the required format can be found here:
    https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview

    Subject: which Azure resource ID the custom metric is reported for.
        Included in the URL of the API call.
    Region: must be the same for the resource ID and for log analytics.
        Included in the URL of the API call.
    """

    # Azure wants time in UTC ISO 8601 format. Named "timestamp" so that
    # it does not shadow the imported "time" module.
    timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")

    series_array = get_series_array(topic_data_collection)

    custom_metric_object = {
        # Time (timestamp): date and time at which the metric is measured or collected
        "time": timestamp,
        "data": {
            "baseData": {
                # Metric (name): name of the metric
                "metric": "msgCount",
                # Namespace: categorize or group similar metrics together
                "namespace": "MQTT",
                # Dimension (dimNames): the metric has a single dimension
                "dimNames": [
                    "Topic"
                ],
                # Series: data for each monitored topic
                "series": series_array
            }
        }
    }

    custom_metric_json = json.dumps(custom_metric_object)

    send_custom_metrics_request(custom_metric_json, attempts_remaining=3)

def get_series_array(topic_data_collection):
    series_array = []
    for topic_name, topic_msg_count in topic_data_collection.items():
        dim_value = {
            "dimValues": [
                topic_name
            ],
            "sum": topic_msg_count,
            "count": 1
        }
        series_array.append(dim_value)
    return series_array

if __name__ == '__main__':
    main()
145 changes: 145 additions & 0 deletions pulsar_data_collector.py
@@ -0,0 +1,145 @@
import requests
import json
from datetime import datetime
import os
from dotenv import load_dotenv
from send_data_to_azure_monitor import send_custom_metrics_request

load_dotenv()

### SECRETS / ENV VARIABLES ###

ADMIN_URL = os.getenv('ADMIN_URL')
NAMESPACE = os.getenv('NAMESPACE')

### SECRETS / ENV VARIABLES ###

METRIC_MSG_RATE_IN = "Msg Rate In"
METRIC_MSG_RATE_OUT = "Msg Rate Out"
METRIC_STORAGE_SIZE = "Storage Size"

TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN = [
    "hfp-mqtt-raw/v2",
    "hfp-mqtt-raw-deduplicated/v2",
    "hfp/v2",
    "gtfs-rt/feedmessage-vehicleposition",
    "metro-ats-mqtt-raw/metro-estimate",
    "metro-ats-mqtt-raw-deduplicated/metro-estimate",
    "source-metro-ats/metro-estimate",
    "source-pt-roi/arrival",
    "source-pt-roi/departure",
    "internal-messages/pubtrans-stop-estimate",
    "internal-messages/feedmessage-tripupdate",
    "gtfs-rt/feedmessage-tripupdate",
    "internal-messages/stop-cancellation"
]

TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT = [
    "hfp/passenger-count",
    "gtfs-rt/feedmessage-vehicleposition",
    "gtfs-rt/feedmessage-tripupdate"
]

TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE = [
    "hfp/v2"
]


def main():
    topic_data_collection = {}
    # Merge all topic name lists into a single de-duplicated list
    collect_data_from_topics_list = list(set(
        TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN +
        TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT +
        TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE
    ))

    for topic_name in collect_data_from_topics_list:
        topic_data = collect_data_from_topic(topic_name)
        if topic_data is not None:
            topic_data_collection[topic_name] = topic_data

    if topic_data_collection:
        send_metrics_into_azure(topic_data_collection)
    else:
        print('Not sending metrics, topic_data_collection was empty.')

def collect_data_from_topic(topic_name):
    pulsar_url = f'{ADMIN_URL}/admin/v2/persistent/{NAMESPACE}/{topic_name}/stats'
    try:
        r = requests.get(url=pulsar_url)
        topic_data = r.json()
        # Fields of interest: topic_data["msgRateIn"], topic_data["msgRateOut"],
        # topic_data["storageSize"]
        return topic_data
    except Exception as e:
        print(f'Failed to send a GET request to {pulsar_url}. Is Pulsar running and accepting requests? Error: {e}')

def send_metrics_into_azure(topic_data_collection):
    send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_IN, "msgRateIn", topic_data_collection, TOPIC_NAMES_TO_COLLECT_MSG_RATE_IN)
    send_pulsar_topic_metric_into_azure(METRIC_MSG_RATE_OUT, "msgRateOut", topic_data_collection, TOPIC_NAMES_TO_COLLECT_MSG_RATE_OUT)
    send_pulsar_topic_metric_into_azure(METRIC_STORAGE_SIZE, "storageSize", topic_data_collection, TOPIC_NAMES_TO_COLLECT_STORAGE_SIZE)
    print(f'Pulsar metrics sent: {datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}')

def send_pulsar_topic_metric_into_azure(
    log_analytics_metric_name,
    topic_data_metric_name,
    topic_data_collection,
    topic_names_to_collect
):
    """
    Send custom metrics into Azure. Documentation for the required format can be found here:
    https://docs.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-custom-overview

    Subject: which Azure resource ID the custom metric is reported for.
        Included in the URL of the API call.
    Region: must be the same for the resource ID and for log analytics.
        Included in the URL of the API call.
    """

    # Azure wants time in UTC ISO 8601 format
    timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")

    series_array = get_series_array(topic_data_collection, topic_data_metric_name, topic_names_to_collect)

    custom_metric_object = {
        # Time (timestamp): date and time at which the metric is measured or collected
        "time": timestamp,
        "data": {
            "baseData": {
                # Metric (name): name of the metric
                "metric": log_analytics_metric_name,
                # Namespace: categorize or group similar metrics together
                "namespace": "Pulsar",
                # Dimension (dimNames): the metric has a single dimension
                "dimNames": [
                    "Topic"
                ],
                # Series: data for each monitored topic
                "series": series_array
            }
        }
    }

    custom_metric_json = json.dumps(custom_metric_object)

    send_custom_metrics_request(custom_metric_json, attempts_remaining=3)

def get_series_array(topic_data_collection, topic_data_metric_name, topic_names_to_collect):
    series_array = []
    for topic_name in topic_names_to_collect:
        # Skip topics whose stats could not be fetched
        if topic_name not in topic_data_collection:
            continue
        topic_msg_count = topic_data_collection[topic_name][topic_data_metric_name]
        # Special case: multiply stop-cancellation message counts by 10
        # so that the data shows up more visibly in Azure's charts
        if topic_name == "internal-messages/stop-cancellation":
            topic_msg_count = topic_msg_count * 10

        dim_value = {
            "dimValues": [
                topic_name
            ],
            "sum": round(topic_msg_count),
            "count": 1
        }
        series_array.append(dim_value)
    return series_array

if __name__ == '__main__':
    main()
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
paho-mqtt==1.6.1
python-dotenv==0.20.0
