Added time range filter to prometheus queries (#81)
* Added time range filter to prometheus queries
tsebastiani authored Dec 13, 2023
1 parent eebdb53 commit 1789e05
Showing 3 changed files with 142 additions and 119 deletions.
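
For orientation, a minimal usage sketch of the range-based query API this commit introduces (not part of the commit itself); the Prometheus URL and the example query are illustrative assumptions:

from datetime import datetime, timedelta

from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus

# hypothetical endpoint; in krkn-lib it is normally resolved via
# get_prometheus_api_connection_data() on the OpenShift client
prom_cli = KrknPrometheus("https://prometheus-k8s.example.com")

# query a one-hour window of samples instead of a single instant vector
records = prom_cli.process_prom_query_in_range(
    "node_boot_time_seconds",
    start_time=datetime.now() - timedelta(hours=1),
    end_time=datetime.now(),
)
for record in records:
    print(record["metric"], record["values"][:3])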
2 changes: 1 addition & 1 deletion src/krkn_lib/ocp/krkn_openshift.py
@@ -397,7 +397,7 @@ def get_prometheus_api_connection_data(
endpoint = None
for item in json_obj["items"]:
if item["metadata"]["name"] == "prometheus-k8s":
endpoint = item["spec"]["host"]
endpoint = f"https://{item['spec']['host']}"
break

if endpoint is None:
96 changes: 62 additions & 34 deletions src/krkn_lib/prometheus/krkn_prometheus.py
@@ -1,6 +1,8 @@
import logging
import math
import re
import sys
from datetime import datetime, timedelta

from prometheus_api_client import PrometheusConnect

@@ -33,27 +35,47 @@ def __init__(
sys.exit(1)

# Process custom prometheus query
def process_prom_query(self, query: str) -> list[dict[str:any]]:
def process_prom_query_in_range(
self,
query: str,
start_time: datetime = datetime.now() - timedelta(days=1),
end_time: datetime = datetime.now(),
) -> list[dict[str:any]]:
"""
Executes a range query against the Prometheus API in PromQL
:param query: promQL query
:param start_time: start time of the result set (default now - 1 day)
:param end_time: end time of the result set (default now)
:return: a list of records in dictionary format
"""

granularity = math.ceil(
(end_time - start_time).total_seconds() / 11000
)
granularity = granularity if granularity > 0 else 1
if self.prom_cli:
try:
return self.prom_cli.custom_query(query=query, params=None)
return self.prom_cli.custom_query_range(
query=query,
start_time=start_time,
end_time=end_time,
step=f"{granularity}s",
)
except Exception as e:
logging.error("Failed to get the metrics: %s" % e)
sys.exit(1)
raise e
else:
logging.info(
"Skipping the prometheus query as the "
"prometheus client couldn't "
"be initialized\n"
)
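# note: when the client is not initialized the method falls through
# here and implicitly returns None rather than an empty list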

def process_alert(self, alert: dict[str, str]):
def process_alert(
self, alert: dict[str, str], start_time: datetime, end_time: datetime
):
"""
Processes Krkn alarm in the format
@@ -87,6 +109,11 @@ def process_alert(self, alert: dict[str, str]):
:param alert: a dictionary containing the following keys:
expr, description, severity
:param start_time: start time of the result set (if None
no time filter is applied to the query)
:param end_time: end time of the result set (if None
no time filter is applied to the query)
"""
@@ -119,16 +146,21 @@ def process_alert(self, alert: dict[str, str]):
return

try:
records = self.process_prom_query(alert["expr"])
records = self.process_prom_query_in_range(
alert["expr"], start_time, end_time
)
if len(records) == 0:
return

log_alert = getattr(logging, alert["severity"])
if log_alert is None:
raise Exception()
for record in records:
result = self.parse_metric(alert["description"], record)

# prints only one record per query result
if len(records) > 0:
result = self.parse_metric(alert["description"], records[0])
log_alert(result)

except Exception as e:
logging.error(
f"failed to execute query: {alert['expr']} with exception {e}"
@@ -146,30 +178,26 @@ def parse_metric(self, description: str, record: dict[str:any]) -> str:
:return: the description with the expressions replaced by the correct
values
"""
result = description
expressions = re.findall(r"{{\$[\w\-_]+[.[\w\-_]+]*}}", description)
for expression in expressions:
if expression == "{{$value}}":
value = None
if "value" in record:
if isinstance(record["value"], list):
if len(record["value"]) > 0:
if len(record["value"]) == 1:
value = record["value"][0]
else:
value = record["value"][1]
else:
value = record["value"]
if value is not None:
result = result.replace(expression, value)

elif re.match(r"^{{\$labels\.([\w\-_]+)}}$", expression):
label = re.findall(r"^{{\$labels\.([\w\-_]+)}}", expression)
if (
"metric" in record.keys()
and label[0] in record["metric"].keys()
):
result = result.replace(
expression, record["metric"][label[0]]
)
return result

values = []

if "values" in record:
if isinstance(record["values"], list):
for value in record["values"]:
if isinstance(value, list):
values.append(value[1])

labels = re.findall(r"{{\$labels\.([\w\-_]+)}}", description)
for label in labels:
if "metric" in record.keys() and label in record["metric"].keys():
placeholder = "{{{{$labels.{0}}}}}".format(label)
description = description.replace(
placeholder, record["metric"][label]
)

if "{{$value}}" in description:
if len(values) > 0:
# returns the first value in the series
description = description.replace("{{$value}}", values[0])

return description
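
As an illustration (not part of the commit), the placeholder substitution performed by the new parse_metric could be exercised as below; the record mirrors the shape returned by a range query, and prom_cli is assumed to be an initialized KrknPrometheus instance:

description = (
    "etcd commit latency on {{$labels.pod}} higher than 30ms. {{$value}}"
)
record = {
    "metric": {"pod": "etcd-0", "instance": "10.0.0.1:2379"},
    "values": [[1699357840, "0.042"], [1699357870, "0.051"]],
}
# {{$labels.pod}} is filled from record["metric"], {{$value}} with the
# first sample of the series
print(prom_cli.parse_metric(description, record))
# -> etcd commit latency on etcd-0 higher than 30ms. 0.042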
163 changes: 79 additions & 84 deletions src/krkn_lib/tests/test_krkn_prometheus.py
@@ -1,3 +1,4 @@
import datetime
import logging

from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
@@ -10,22 +11,29 @@ class TestKrknPrometheus(BaseTest):
def test_process_prom_query(self):
prom_cli = KrknPrometheus(self.url)
query = "node_boot_time_seconds"
res = prom_cli.process_prom_query(query)
start_time = datetime.datetime.now() - datetime.timedelta(days=10)

end_time = datetime.datetime.now()
res = prom_cli.process_prom_query_in_range(query, start_time, end_time)

self.assertTrue(len(res) > 0)
self.assertTrue("metric" in res[0].keys())
self.assertTrue("value" in res[0].keys())
self.assertTrue(len(res[0]["value"]) == 2)
self.assertTrue("values" in res[0].keys())
for value in res[0]["values"]:
self.assertEqual(len(value), 2)

def test_process_alert(self):
prom_cli = KrknPrometheus(self.url)
res = prom_cli.process_prom_query("node_boot_time_seconds")
res = prom_cli.process_prom_query_in_range("node_boot_time_seconds")
logging.disable(logging.NOTSET)
control = (
f"container: {res[0]['metric']['container']}, "
f"endpoint: {res[0]['metric']['endpoint']}, "
f"value: {res[0]['value'][1]}"
)
controls = []
for result in res:
for value in result["values"]:
controls.append(
f"container: {res[0]['metric']['container']}, "
f"endpoint: {res[0]['metric']['endpoint']}, "
f"value: {value[1]}"
)

alert_info = {
"expr": "node_boot_time_seconds",
@@ -75,42 +83,56 @@ def test_process_alert(self):
}

with self.assertLogs(level="INFO") as lc:
prom_cli.process_alert(alert_info)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "INFO")
self.assertEqual(lc.records[0].msg, control)

with self.assertLogs(level="DEBUG") as lc:
prom_cli.process_alert(alert_debug)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "DEBUG")
self.assertEqual(lc.records[0].msg, control)

with self.assertLogs(level="WARNING") as lc:
prom_cli.process_alert(alert_warning)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "WARNING")
self.assertEqual(lc.records[0].msg, control)

with self.assertLogs(level="ERROR") as lc:
prom_cli.process_alert(alert_error)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "ERROR")
self.assertEqual(lc.records[0].msg, control)

with self.assertLogs(level="CRITICAL") as lc:
prom_cli.process_alert(alert_critical)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "CRITICAL")
self.assertEqual(lc.records[0].msg, control)

with self.assertLogs(level="ERROR") as lc:
prom_cli.process_alert(alert_not_exists)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "ERROR")
self.assertEqual(
lc.records[0].msg, "invalid severity level: not_exists"
prom_cli.process_alert(
alert_info,
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now(),
)
self.assertTrue(len(lc.records) > 0)

with self.assertLogs(level="DEBUG") as lc:
prom_cli.process_alert(
alert_debug,
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now(),
)
self.assertTrue(len(lc.records) > 0)

with self.assertLogs(level="WARNING") as lc:
prom_cli.process_alert(
alert_warning,
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now(),
)
self.assertTrue(len(lc.records) > 0)

with self.assertLogs(level="ERROR") as lc:
prom_cli.process_alert(
alert_error,
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now(),
)
self.assertTrue(len(lc.records) > 0)

with self.assertLogs(level="CRITICAL") as lc:
prom_cli.process_alert(
alert_critical,
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now(),
)
self.assertTrue(len(lc.records) > 0)

with self.assertLogs(level="ERROR") as lc:
prom_cli.process_alert(
alert_not_exists,
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now(),
)
self.assertTrue(len(lc.records) == 1)
self.assertEqual(lc.records[0].levelname, "ERROR")
self.assertEqual(
lc.records[0].msg, "invalid severity level: not_exists"
)

def test_parse_metric(self):
prom_cli = KrknPrometheus(self.url)
@@ -120,54 +142,38 @@ def test_parse_metric(self):
"instance": "test_instance",
"no_value": "no_value",
},
"value": [1699357840, "0.1"],
}
metric_single_value = {
"metric": {
"pod": "test_pod",
"instance": "test_instance",
"no_value": "no_value",
},
"value": ["0.1"],
"values": [[1699357840, "0.1"]],
}
metric_no_value = {
"metric": {
"pod": "test_pod",
"instance": "test_instance",
"no_value": "no_value",
},
"value": [],
}

metric_scalar_value = {
"metric": {
"pod": "test_pod",
"instance": "test_instance",
"no_value": "no_value",
},
"value": "scalar",
"values": [],
}

control = (
f"10 minutes avg. 99th etcd commit {metric['metric']['instance']} "
f"10 minutes avg. 99th etcd "
f"commit {metric['metric']['instance']} "
f"latency on {metric['metric']['pod']} higher "
f"than 30ms. {metric['value'][1]}"
f"than 30ms. {metric['values'][0][1]}"
)

control_underscore = (
f"10 minutes avg. 99th etcd commit {metric['metric']['instance']} "
f"10 minutes avg. 99th etcd "
f"commit {metric['metric']['instance']} "
f"latency on {metric['metric']['pod']} higher "
f"than 30ms. {metric['metric']['no_value']}"
)

control_no_value = (
f"10 minutes avg. 99th etcd commit {metric['metric']['instance']} "
f"10 minutes avg. 99th etcd "
f"commit {metric['metric']['instance']} "
f"latency on {metric['metric']['pod']} higher than "
f"30ms. {{{{$value}}}}"
)
control_scalar_value = (
f"10 minutes avg. 99th etcd commit {metric['metric']['instance']} "
f"latency on {metric['metric']['pod']} higher than "
f"30ms. {metric_scalar_value['value']}"
)

description = (
"10 minutes avg. 99th etcd commit {{$labels.instance}} latency"
" on {{$labels.pod}} higher than 30ms. {{$value}}"
@@ -179,16 +185,7 @@

# tests a standard label and value replacement
result = prom_cli.parse_metric(description, metric)
# tests a replacement for a metric record containing a
# single value instead of an array of two (just in case)
result_single_value = prom_cli.parse_metric(
description, metric_single_value
)
# tests a replacement for a metric with a scalar
# value instead of an array (just in case)
result_scalar_value = prom_cli.parse_metric(
description, metric_scalar_value
)

# tests a replacement for a metric with an
# empty array of values ( {{$value}} won't be replaced)
result_no_value = prom_cli.parse_metric(description, metric_no_value)
@@ -199,7 +196,5 @@
)

self.assertEqual(control, result)
self.assertEqual(control, result_single_value)
self.assertEqual(control_no_value, result_no_value)
self.assertEqual(control_scalar_value, result_scalar_value)
self.assertEqual(control_underscore, result_underscore)
