-
Notifications
You must be signed in to change notification settings - Fork 28
/
metrics.py
118 lines (89 loc) · 4.33 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from collections import Counter, defaultdict
from typing import List
import numpy as np
def get_servers_metrics(model_reports) -> List[str]:
    """Build server-level metric lines aggregated over all model reports.

    Args:
        model_reports: iterable of per-model report mappings. Each report must
            have a "server_rows" list; each row exposes ``row["span"].server_info``,
            which is either ``None`` or an object with ``next_pings`` (``None`` or
            a mapping whose values are ping measurements), ``using_relay`` and
            ``version`` attributes.

    Returns:
        A list of text lines: a "# SERVER LEVEL METRICS" header, the optional
        ``peers_per_srv`` / ``pings_inf_share`` gauges, the server counters,
        an optional "# PINGS" section with ping percentiles and a "# VERSIONS"
        section with one ``server_version`` line per reported version.
    """
    # Ping value compared against below; presumably it marks a peer that could
    # not be reached -- TODO confirm with the code that fills next_pings.
    inf = float("inf")

    servers_num_total = 0  # servers that reported next_pings
    servers_num_relay = 0
    pings = []  # every non-infinite ping value seen
    num_ping_infs = 0
    version_counts = Counter()
    result = ["# SERVER LEVEL METRICS"]

    for report in model_reports:
        for server in report["server_rows"]:
            server_info = server["span"].server_info
            if server_info is None:
                continue

            next_pings = server_info.next_pings
            if next_pings is not None:
                servers_num_total += 1
                # Single pass: split ping values into infinite / non-infinite.
                for ping in next_pings.values():
                    if ping == inf:
                        num_ping_infs += 1
                    else:
                        pings.append(ping)

            if server_info.using_relay:
                servers_num_relay += 1

            version = server_info.version
            if version:
                version_counts[version] += 1

    # Gate on the total number of ping entries (infinite ones included) so the
    # ratios are still reported when every ping is infinite, while an empty
    # next_pings mapping cannot cause a division by zero.
    num_peers = len(pings) + num_ping_infs
    if num_peers > 0:
        peers_per_srv = num_peers / servers_num_total
        pings_inf_share = num_ping_infs / num_peers
        result.extend(
            [
                f"peers_per_srv {peers_per_srv:.1f}",
                f"pings_inf_share {pings_inf_share:.3f}",
            ]
        )

    result.append(f"servers_num_total {servers_num_total}")
    result.append(f"servers_num_relay {servers_num_relay}")

    if pings:
        result.append("# PINGS")
        percents = (25, 50, 75, 90, 95)
        # np.percentile does not need pre-sorted input and accepts all
        # requested percentiles in one call.
        for pct, value in zip(percents, np.percentile(pings, percents)):
            result.append(f'ping_pct{{pct="{pct}"}} {value:.4f}')

    result.append("# VERSIONS")
    for version_number, version_count in version_counts.items():
        result.append(f'server_version{{version_number="{version_number}"}} {version_count}')

    return result
def get_models_metrics(model_reports) -> List[str]:
    """Build per-model metric lines for every model report.

    Args:
        model_reports: iterable of per-model report mappings with the keys
            "dht_prefix" (used as the model label), "num_blocks" and
            "server_rows". Each server row has a "state" entry and a "span"
            object exposing ``start``, ``end`` and ``server_info`` (``None`` or
            an object that may carry ``network_rps``, ``inference_rps`` and
            ``forward_rps`` attributes).

    Returns:
        A list of text lines: a "# MODEL LEVEL METRICS" header followed, for
        each model, by a "# MODEL: ..." separator, block/server counters,
        per-state block counts and the average/minimum of each RPS kind
        across the model's blocks.
    """
    rps_fields = ("network_rps", "inference_rps", "forward_rps")
    result = [
        "# MODEL LEVEL METRICS",
    ]

    for report in model_reports:
        model_name = report["dht_prefix"]
        num_blocks = report["num_blocks"]

        result.append(f"# MODEL: {model_name} {'-' * 50}")

        # One accumulator array (one slot per block) per counter name; a
        # counter that is never touched still reads back as all zeros.
        blocks = defaultdict(lambda: np.zeros(num_blocks))

        for server in report["server_rows"]:
            span = server["span"]
            state = server["state"]

            # The RPS values do not depend on the block index, so look them up
            # once per server rather than once per block.
            rps_values = {}
            if span.server_info is not None:
                for rps in rps_fields:
                    rps_value = getattr(span.server_info, rps, 0)
                    if rps_value is not None:
                        rps_values[rps] = rps_value

            for block_idx in range(span.start, span.end):
                blocks["total"][block_idx] += 1
                blocks[state][block_idx] += 1
                for rps, rps_value in rps_values.items():
                    blocks[rps][block_idx] += rps_value

        result.extend(
            [
                f'n_blocks{{model="{model_name}"}} {num_blocks}',
                f'servers_num{{model="{model_name}"}} {len(report["server_rows"])}',
                f'blocks_total{{model="{model_name}"}} {blocks["total"].sum()}',
                f'blocks_online_min{{model="{model_name}"}} {blocks["online"].min()}',
            ]
        )

        for block_state in ("online", "joining", "offline", "unreachable"):
            result.append(f'blocks{{model="{model_name}",state="{block_state}"}} {blocks[block_state].sum():.0f}')

        for rps in rps_fields:
            # The metric label is the part before "_rps", e.g. "network".
            rps_type = rps.split("_")[0]
            result.append(f'rps_avg{{model="{model_name}",rps="{rps_type}"}} {blocks[rps].mean():.1f}')
            result.append(f'rps_min{{model="{model_name}",rps="{rps_type}"}} {blocks[rps].min():.1f}')

    return result
def get_prometheus_metrics(state_dict) -> str:
    """Prepare metrics in the Prometheus text exposition format.

    Format description: https://prometheus.io/docs/instrumenting/exposition_formats/

    Args:
        state_dict: mapping with a required "model_reports" entry, which is
            passed to get_servers_metrics and get_models_metrics, and an
            optional numeric "update_duration" entry.

    Returns:
        A multiline string with a single metric (or "# ..." comment) per line.
    """
    result = ["# GENERAL METRICS"]

    # Formatting None with ".1f" raises TypeError, so the metric is emitted
    # only when a duration is actually available.
    update_duration = state_dict.get("update_duration")
    if update_duration is not None:
        result.append(f"update_duration {update_duration:.1f}")

    model_reports = state_dict["model_reports"]
    result.extend(get_servers_metrics(model_reports))
    result.extend(get_models_metrics(model_reports))

    return "\n".join(result)