userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/metrics.py Source File
1"""
2Python module that provides helpers for functional testing of metrics with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9from __future__ import annotations
10
11from collections.abc import Mapping
12from collections.abc import Set
13import dataclasses
14import enum
15import itertools
16import json
17import math
18import random
19from typing import Any
20from typing import TypeAlias
21
22
# @cond
class MetricType(str, enum.Enum):
    """
    The type of an individual metric.

    `UNSPECIFIED` compares equal to all `MetricType`s.
    To disable this behavior, use `is` for comparisons.
    """

    UNSPECIFIED = 'UNSPECIFIED'
    GAUGE = 'GAUGE'
    RATE = 'RATE'
    HIST_RATE = 'HIST_RATE'
    # @endcond

@dataclasses.dataclass
class Histogram:
    """
    Represents the value of a HIST_RATE (a.k.a. Histogram) metric.

    Usage example:
    @snippet testsuite/tests/metrics/test_metrics.py histogram

    Normally obtained from MetricsSnapshot.
    """

    bounds: list[float]
    buckets: list[int]
    inf: int

    def count(self) -> int:
        return sum(self.buckets) + self.inf

    def percentile(self, percent: float) -> float:
        return _do_compute_percentile(self, percent)

    # @cond
    def __post_init__(self):
        assert len(self.bounds) == len(self.buckets)
        assert sorted(self.bounds) == self.bounds
        if self.bounds:
            assert self.bounds[0] > 0
            assert self.bounds[-1] != math.inf

    # @endcond

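# A minimal usage sketch for Histogram (illustrative values, not from the
# userver test suite): `bounds` are the upper bucket boundaries, `buckets`
# hold per-bucket counts, and `inf` counts values above the last bound.
#
#     hist = Histogram(bounds=[10, 50], buckets=[1, 3], inf=1)
#     assert hist.count() == 5            # 1 + 3 + 1
#     assert hist.percentile(0.5) == 50   # the median falls into the second bucket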
MetricValue: TypeAlias = float | Histogram

@dataclasses.dataclass(frozen=True)
class Metric:
    """
    Metric type that contains the `labels: dict[str, str]` and
    `value: MetricValue`.

    The type is hashable and comparable:
    @snippet testsuite/tests/metrics/test_metrics.py values set

    @ingroup userver_testsuite
    """

    labels: dict[str, str]
    value: MetricValue

    # @cond
    # Should not be specified explicitly, for internal use only.
    _type: MetricType = MetricType.UNSPECIFIED
    # @endcond

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Metric):
            return NotImplemented
        return self.labels == other.labels and self.value == other.value and _type_eq(self._type, other._type)

    def __hash__(self) -> int:
        return hash(_get_labels_tuple(self))

    # @cond
    def __post_init__(self):
        if isinstance(self.value, Histogram):
            assert self._type in (MetricType.HIST_RATE, MetricType.UNSPECIFIED)
        else:
            assert self._type is not MetricType.HIST_RATE

    # For internal use only.
    def type(self) -> MetricType:
        return self._type

    # @endcond

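# A sketch of the equality and hashing semantics above (hypothetical labels,
# not from the userver test suite). `_type` defaults to UNSPECIFIED, which
# `_type_eq` treats as a wildcard, so a typed and an untyped Metric with the
# same labels and value compare equal:
#
#     typed = Metric(labels={'host': 'a'}, value=1.0, _type=MetricType.GAUGE)
#     untyped = Metric(labels={'host': 'a'}, value=1.0)
#     assert typed == untyped
#     assert len({typed, untyped}) == 1  # the hash depends only on the labels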

class _MetricsJSONEncoder(json.JSONEncoder):
    def default(self, o):  # pylint: disable=method-hidden
        if isinstance(o, Metric):
            result = {'labels': o.labels, 'value': o.value}
            if o.type() is not MetricType.UNSPECIFIED:
                result['type'] = o.type()
            return result
        elif isinstance(o, Histogram):
            return dataclasses.asdict(o)
        if isinstance(o, set):
            return list(o)
        return super().default(o)


class MetricsSnapshot:
    """
    Snapshot of captured metrics that mimics the dict interface. Metrics have
    the 'dict[str(path), Set[Metric]]' format.

    @snippet samples/testsuite-support/tests/test_metrics.py metrics labels

    @ingroup userver_testsuite
    """

    def __init__(self, values: Mapping[str, Set[Metric]]):
        self._values = values

    def __getitem__(self, path: str) -> Set[Metric]:
        """Returns the set of metrics at the specified path"""
        return self._values[path]

    def __len__(self) -> int:
        """Returns the number of metric paths"""
        return len(self._values)

    def __iter__(self):
        """Returns an iterator over the metric paths"""
        return self._values.__iter__()

    def __contains__(self, path: str) -> bool:
        """
        Returns True if a metric with the specified path is in the snapshot,
        False otherwise.
        """
        return path in self._values

    def __eq__(self, other: object) -> bool:
        """
        Compares the snapshot with a dict of metrics or with
        another snapshot.
        """
        return self._values == other

    def __repr__(self) -> str:
        return self._values.__repr__()

    def __str__(self) -> str:
        return self.pretty_print()

    def get(self, path: str, default=None):
        """
        Returns the set of metrics at the path, or `default` if there is
        no such path.
        """
        return self._values.get(path, default)

    def items(self):
        """Returns a (path, set) iterable over the metrics"""
        return self._values.items()

    def keys(self):
        """Returns an iterable over paths of metrics"""
        return self._values.keys()

    def values(self):
        """Returns an iterable over sets of metrics"""
        return self._values.values()

    def value_at(
        self,
        path: str,
        labels: dict[str, str] | None = None,
        *,
        default: MetricValue | None = None,
    ) -> MetricValue:
        """
        Returns a single metric value at the specified path. If a dict of
        labels is provided, does an exact match of labels (i.e. {} stands for
        no labels; {'a': 'b', 'c': 'd'} matches only {'a': 'b', 'c': 'd'} or
        {'c': 'd', 'a': 'b'}, but matches neither {'a': 'b'} nor
        {'a': 'b', 'c': 'd', 'e': 'f'}).

        @throws AssertionError if there are multiple matching metrics at the
        path, or if there are none and no `default` is given
        """
        entry = self.get(path, set())
        assert entry or default is not None, f'No metrics found by path "{path}"'

        if labels is not None:
            entry = {x for x in entry if x.labels == labels}
            assert entry or default is not None, f'No metrics found by path "{path}" and labels {labels}'
            assert len(entry) <= 1, f'Multiple metrics found by path "{path}" and labels {labels}: {entry}'
        else:
            assert len(entry) <= 1, f'Multiple metrics found by path "{path}": {entry}'

        if default is not None and not entry:
            return default
        return next(iter(entry)).value

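    # A usage sketch for value_at (the path and labels are hypothetical):
    #
    #     snapshot = MetricsSnapshot({
    #         'cache.hits': {Metric(labels={'name': 'users'}, value=7)},
    #     })
    #     assert snapshot.value_at('cache.hits', labels={'name': 'users'}) == 7
    #     assert snapshot.value_at('cache.misses', default=0) == 0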
    def metrics_at(
        self,
        path: str,
        require_labels: dict[str, str] | None = None,
    ) -> list[Metric]:
        """
        Metrics path must exactly equal the given `path`.
        A required subset of labels is specified by `require_labels`.
        Example:
        require_labels={'a':'b', 'c':'d'}
        { 'a':'b', 'c':'d'} - exact match
        { 'a':'b', 'c':'d', 'e': 'f', 'h':'k'} - match
        { 'a':'x', 'c':'d'} - no match, incorrect value for label 'a'
        { 'a' : 'b'} - no match, required label 'c' not found
        Usage:
        @code
        for m in snapshot.metrics_at(path='something.something.sensor',
                                     require_labels={ 'label1': 'value1' }):
            assert m.value > 0
        @endcode
        """
        entry = self.get(path, set())

        def _is_labels_subset(require_labels, target_labels) -> bool:
            for req_key, req_val in require_labels.items():
                if target_labels.get(req_key, None) != req_val:
                    # required label is missing or its value differs
                    return False
            return True

        if require_labels is not None:
            return list(
                filter(
                    lambda x: _is_labels_subset(
                        require_labels=require_labels,
                        target_labels=x.labels,
                    ),
                    entry,
                ),
            )
        else:
            return list(entry)

    def has_metrics_at(
        self,
        path: str,
        require_labels: dict[str, str] | None = None,
    ) -> bool:
        # metrics_at returns a list; the Pythonic way to check whether a list
        # is empty is to test its truthiness:
        return bool(self.metrics_at(path, require_labels))

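    # A sketch combining the two methods above (hypothetical path and labels);
    # has_metrics_at is a convenience wrapper for "any match exists":
    #
    #     if snapshot.has_metrics_at('httpclient.timings', {'destination': 'db'}):
    #         for m in snapshot.metrics_at('httpclient.timings', {'destination': 'db'}):
    #             assert m.value >= 0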
    def assert_equals(
        self,
        other: Mapping[str, Set[Metric]],
        *,
        ignore_zeros: bool = False,
    ) -> None:
        """
        Compares the snapshot with a dict of metrics or with another
        snapshot, displaying a nice diff on mismatch.
        """
        lhs = _flatten_snapshot(self, ignore_zeros=ignore_zeros)
        rhs = _flatten_snapshot(other, ignore_zeros=ignore_zeros)
        assert lhs == rhs, _diff_metric_snapshots(lhs, rhs, ignore_zeros)

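    # A usage sketch for assert_equals (hypothetical path and value); with
    # ignore_zeros=True, zero-valued metrics on either side are ignored:
    #
    #     snapshot.assert_equals(
    #         {'cache.hits': {Metric(labels={}, value=7)}},
    #         ignore_zeros=True,
    #     )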
    def pretty_print(self) -> str:
        """
        Multiline linear print:
        path: (label=value),(label=value) TYPE VALUE
        path: (label=value),(label=value) TYPE VALUE
        Usage:
        @code
        assert 'some.thing.sensor' in metric, metric.pretty_print()
        @endcode
        """

        def _iterate_over_mset(path, mset):
            """Pretty-print one metrics set for the given path"""
            result = []
            for metric in sorted(mset, key=lambda x: _get_labels_tuple(x)):
                result.append(
                    '{}: {} {} {}'.format(
                        path,
                        # labels in the form (key=value)
                        ','.join(['({}={})'.format(k, v) for k, v in _get_labels_tuple(metric)]),
                        metric._type.value,
                        metric.value,
                    ),
                )
            return result

        # list of lists [ [ string1, string2, string3],
        #                 [string4, string5, string6] ]
        data_for_every_path = [_iterate_over_mset(path, mset) for path, mset in self._values.items()]
        # use itertools.chain to flatten the list into
        # [ string1, string2, string3, string4, string5, string6 ]
        # and join to convert it to one multiline string
        return '\n'.join(itertools.chain(*data_for_every_path))

    @staticmethod
    def from_json(json_str: str) -> MetricsSnapshot:
        """
        Construct MetricsSnapshot from a JSON string
        """
        json_data = {
            str(path): {
                Metric(
                    labels=element['labels'],
                    value=_parse_metric_value(element['value']),
                    _type=MetricType[element.get('type', 'UNSPECIFIED')],
                )
                for element in metrics_list
            }
            for path, metrics_list in json.loads(json_str).items()
        }
        return MetricsSnapshot(json_data)

    def to_json(self) -> str:
        """
        Serialize to a JSON string
        """
        return json.dumps(
            # Shuffle to disallow depending on the received metrics order.
            {path: random.sample(list(metrics), len(metrics)) for path, metrics in self._values.items()},
            cls=_MetricsJSONEncoder,
        )

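# A round-trip sketch for the JSON helpers above (hypothetical payload):
# to_json emits 'type' only for typed metrics, and from_json restores values
# via _parse_metric_value and MetricType.
#
#     payload = '{"cache.hits": [{"labels": {"name": "users"}, "value": 7}]}'
#     snapshot = MetricsSnapshot.from_json(payload)
#     assert snapshot.value_at('cache.hits', {'name': 'users'}) == 7
#     restored = MetricsSnapshot.from_json(snapshot.to_json())
#     assert restored == snapshot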
def _type_eq(lhs: MetricType, rhs: MetricType) -> bool:
    return lhs == rhs or lhs == MetricType.UNSPECIFIED or rhs == MetricType.UNSPECIFIED  # noqa: PLR1714


def _get_labels_tuple(metric: Metric) -> tuple[tuple[str, str], ...]:
    """Returns labels as a tuple of sorted items"""
    return tuple(sorted(metric.labels.items()))

def _do_compute_percentile(hist: Histogram, percent: float) -> float:
    # This implementation is O(hist.count()), which is less than perfect.
    # So far, this was not a big enough pain to rewrite it.
    value_lists = [[bound] * bucket for (bucket, bound) in zip(hist.buckets, hist.bounds, strict=True)] + [
        [math.inf] * hist.inf
    ]
    values = [item for sublist in value_lists for item in sublist]

    # Implementation taken from:
    # https://stackoverflow.com/a/2753343/5173839
    if not values:
        return 0
    pivot = (len(values) - 1) * percent
    floor = math.floor(pivot)
    ceil = math.ceil(pivot)
    if floor == ceil:
        return values[int(pivot)]
    part1 = values[int(floor)] * (ceil - pivot)
    part2 = values[int(ceil)] * (pivot - floor)
    return part1 + part2

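# A worked example of the linear interpolation above (illustrative numbers):
# for Histogram(bounds=[10, 50], buckets=[2, 2], inf=0) the expanded values
# are [10, 10, 50, 50]; percentile(0.5) gives pivot = 3 * 0.5 = 1.5, so the
# result is values[1] * (2 - 1.5) + values[2] * (1.5 - 1) = 5 + 25 = 30.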
def _parse_metric_value(value: Any) -> MetricValue:
    if isinstance(value, dict):
        return Histogram(
            bounds=value['bounds'],
            buckets=value['buckets'],
            inf=value['inf'],
        )
    elif isinstance(value, float):
        return value
    elif isinstance(value, int):
        return value
    else:
        raise Exception(f'Failed to parse metric value from {value!r}')

_FlattenedSnapshot: TypeAlias = Set[tuple[str, Metric]]


def _flatten_snapshot(values, ignore_zeros: bool) -> _FlattenedSnapshot:
    return {
        (path, metric)
        for path, metrics in values.items()
        for metric in metrics
        if metric.value != 0 or not ignore_zeros
    }

def _diff_metric_snapshots(
    lhs: _FlattenedSnapshot,
    rhs: _FlattenedSnapshot,
    ignore_zeros: bool,
) -> str:
    def extra_metrics_message(extra, base):
        return [
            f'    path={path!r} labels={metric.labels!r} value={metric.value}'
            for path, metric in sorted(extra, key=lambda pair: pair[0])
            if (path, metric) not in base
        ]

    if ignore_zeros:
        lines = ['left.assert_equals(right, ignore_zeros=True) failed']
    else:
        lines = ['left.assert_equals(right) failed']
    actual_extra = extra_metrics_message(lhs, rhs)
    if actual_extra:
        lines.append('  extra in left:')
        lines += actual_extra

    actual_gt = extra_metrics_message(rhs, lhs)
    if actual_gt:
        lines.append('  missing in left:')
        lines += actual_gt

    return '\n'.join(lines)