From b36fece805a30b18fd44a1b6f528b6379f91cce1 Mon Sep 17 00:00:00 2001 From: niemijoe Date: Mon, 12 Sep 2022 14:38:43 +0300 Subject: [PATCH] Try sending MQTT msg count data multiple times --- mqtt_data_collector.py | 47 ++++++++++++++--------------------- send_data_to_azure_monitor.py | 19 ++++++++++---- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/mqtt_data_collector.py b/mqtt_data_collector.py index de8ee41..9d49db8 100644 --- a/mqtt_data_collector.py +++ b/mqtt_data_collector.py @@ -141,30 +141,8 @@ def main(): topic_data_map[topic_data_map_key] = topic.msg_count topic.msg_count = 0 - try: - with time_limit(10): - send_mqtt_msg_count_to_azure(topic_data_map) - except TimeoutException as e: - print("Sending data to Azure timed out.") - # TODO: remove this logging later when not needed - print("Sent mqtt msg count to Azure.") - -class TimeoutException(Exception): pass - -@contextmanager -def time_limit(seconds): - """ - Puts a timer to a function call so that it won't hang forever and stop execution of the script. - Taken from https://stackoverflow.com/a/601168/4282381 - """ - def signal_handler(signum, frame): - raise TimeoutException("Timed out!") - signal.signal(signal.SIGALRM, signal_handler) - signal.alarm(seconds) - try: - yield - finally: - signal.alarm(0) + t = Thread(target=send_mqtt_msg_count_to_azure, args=(topic_data_map,)) + t.start() def send_mqtt_msg_count_to_azure(topic_data_map): """ @@ -177,13 +155,13 @@ def send_mqtt_msg_count_to_azure(topic_data_map): """ # Azure wants time in UTC ISO 8601 format - time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + time_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") series_array = get_series_array(topic_data_map) custom_metric_object = { # Time (timestamp): Date and time at which the metric is measured or collected - "time": time, + "time": time_str, "data": { "baseData": { # Metric (name): name of the metric @@ -205,8 +183,21 @@ def send_mqtt_msg_count_to_azure(topic_data_map): if IS_DEBUG: print(custom_metric_json) else: - send_custom_metrics_request(custom_metric_json, attempts_remaining=3) - print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}") + # Try sending data to Azure multiple times, wait between attempts + is_ok = send_custom_metrics_request(custom_metric_json=custom_metric_json, attempts_remaining=3) + if is_ok == False: + print("Sending data to Azure failed, trying again in 5 minutes.") + time.sleep(300) # Wait 5 minutes before the next attempt + is_ok = send_custom_metrics_request(custom_metric_json=custom_metric_json, attempts_remaining=3) + if is_ok == False: + print("Sending data to Azure failed, trying again in 10 minutes.") + time.sleep(600) # Wait 10 minutes before the next attempt + is_ok = send_custom_metrics_request(custom_metric_json=custom_metric_json, attempts_remaining=3) + + if is_ok: + print(f"Mqtt metrics sent: {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}") + else: + print("Failed to send metrics to Azure.") def get_series_array(topic_data_map): series_array = [] diff --git a/send_data_to_azure_monitor.py b/send_data_to_azure_monitor.py index 69185e4..eecbb85 100644 --- a/send_data_to_azure_monitor.py +++ b/send_data_to_azure_monitor.py @@ -16,9 +16,15 @@ ### SECRETS / ENV VARIABLES ### def send_custom_metrics_request(custom_metric_json, attempts_remaining): + """ + Sends custom metrics request to Azure. Tries to send as many times as given attempts_remaining. + When sending is successful, returns True, otherwise returns False + """ + # Exit if number of attempts reaches zero. if attempts_remaining == 0: - return + return False + attempts_remaining = attempts_remaining - 1 make_sure_access_token_file_exists() @@ -29,11 +35,11 @@ def send_custom_metrics_request(custom_metric_json, attempts_remaining): request_url = f'https://westeurope.monitoring.azure.com/{MONITOR_DATA_COLLECTOR_RESOURCE_ID}/metrics' headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {existing_access_token}'} - response = requests.post(request_url, data=custom_metric_json, headers=headers) + response = requests.post(request_url, data=custom_metric_json, headers=headers, timeout=10) # Return if response is successful if response.status_code == 200: - return + return True # Try catch because json.loads(response.text) might not be available try: @@ -41,16 +47,19 @@ def send_custom_metrics_request(custom_metric_json, attempts_remaining): if response_dict['Error']['Code'] == 'TokenExpired': print("Currently stored access token has expired, getting a new access token.") request_new_access_token_and_write_it_on_disk() - send_custom_metrics_request(custom_metric_json, attempts_remaining) + return send_custom_metrics_request(custom_metric_json, attempts_remaining) elif response_dict['Error']['Code'] == 'InvalidToken': print("Currently stored access token is invalid, getting a new access token.") request_new_access_token_and_write_it_on_disk() - send_custom_metrics_request(custom_metric_json, attempts_remaining) + return send_custom_metrics_request(custom_metric_json, attempts_remaining) else: print(f'Request failed for an unknown reason, response: {response_dict}.') except Exception as e: print(f'Request failed for an unknown reason, response: {response}.') + print("Returning False as sending data to Azure was not successful.") + return False + def make_sure_access_token_file_exists(): try: f = open(ACCESS_TOKEN_PATH, "r")