"""
Python module that provides helpers for functional testing of metrics with
testsuite; see
@ref scripts/docs/en/userver/functional_testing.md for an introduction.

@ingroup userver_testsuite
"""

from __future__ import annotations

import dataclasses
import enum
import itertools
import json
import math
import random

from collections.abc import Mapping
from collections.abc import Set
from typing import Any
from typing import TypeAlias


class MetricType(str, enum.Enum):
    """
    The type of individual metric.

    `UNSPECIFIED` compares equal to all `MetricType`s.
    To disable this behavior, use `is` for comparisons.
    """

    UNSPECIFIED = 'UNSPECIFIED'
    GAUGE = 'GAUGE'
    RATE = 'RATE'
    HIST_RATE = 'HIST_RATE'
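

# An illustrative sketch of the `UNSPECIFIED` matching rule, applied through
# `Metric` equality and `_type_eq` below; the values are made up:
#
#   lhs = Metric(labels={}, value=42, _type=MetricType.RATE)
#   rhs = Metric(labels={}, value=42)  # _type defaults to UNSPECIFIED
#   assert lhs == rhs                  # UNSPECIFIED matches any type
#   assert lhs._type is not rhs._type  # `is` still distinguishes the types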


@dataclasses.dataclass
class Histogram:
    """
    Represents the value of a HIST_RATE (a.k.a. Histogram) metric.

    Usage example:
    @snippet testsuite/tests/metrics/test_metrics.py histogram

    Normally obtained from MetricsSnapshot
    """

    bounds: list[float]
    buckets: list[int]
    inf: int

    def count(self) -> int:
        return sum(self.buckets) + self.inf

    def percentile(self, percent: float) -> float:
        return _do_compute_percentile(self, percent)

    def __post_init__(self):
        assert len(self.bounds) == len(self.buckets)
        if self.bounds:
            assert self.bounds[-1] != math.inf
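

# An illustrative Histogram (the numbers are made up):
#
#   hist = Histogram(bounds=[10, 20, 50], buckets=[1, 3, 1], inf=1)
#   assert hist.count() == 6           # 1 + 3 + 1 bucket hits, plus 1 in +inf
#   assert hist.percentile(0.5) == 20  # the median falls into the 20 bucket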


MetricValue: TypeAlias = float | Histogram


@dataclasses.dataclass(frozen=True)
class Metric:
    """
    Metric type that contains the `labels: dict[str, str]` and
    `value: MetricValue`.

    The type is hashable and comparable:
    @snippet testsuite/tests/metrics/test_metrics.py values set

    @ingroup userver_testsuite
    """

    labels: dict[str, str]
    value: MetricValue

    # Should not be specified explicitly, for internal use only.
    _type: MetricType = MetricType.UNSPECIFIED

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Metric):
            return NotImplemented
        return (
            self.labels == other.labels
            and self.value == other.value
            and _type_eq(self._type, other._type)
        )

    def __hash__(self) -> int:
        return hash(_get_labels_tuple(self))

    def __post_init__(self):
        if isinstance(self.value, Histogram):
            assert self._type in (MetricType.HIST_RATE, MetricType.UNSPECIFIED)
        else:
            assert self._type is not MetricType.HIST_RATE

    def type(self) -> MetricType:
        """Returns the type of the metric."""
        return self._type
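

# Metrics hash by their labels only, so label-distinct metrics can be
# collected into sets; an illustrative sketch with made-up values:
#
#   metrics = {
#       Metric(labels={'method': 'GET'}, value=3),
#       Metric(labels={'method': 'POST'}, value=5),
#   }
#   assert Metric(labels={'method': 'GET'}, value=3) in metrics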


class _MetricsJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Metric):
            result = {'labels': o.labels, 'value': o.value}
            if o.type() is not MetricType.UNSPECIFIED:
                result['type'] = o.type()
            return result
        elif isinstance(o, Histogram):
            return dataclasses.asdict(o)
        if isinstance(o, set):
            return list(o)
        return super().default(o)


class MetricsSnapshot:
    """
    Snapshot of captured metrics that mimics the dict interface. Metrics have
    the 'dict[str(path), Set[Metric]]' format.

    @snippet samples/testsuite-support/tests/test_metrics.py metrics labels

    @ingroup userver_testsuite
    """

    def __init__(self, values: Mapping[str, Set[Metric]]):
        self._values = values
144 """Returns a list of metrics by specified path"""
148 """Returns count of metrics paths"""
152 """Returns a (path, list) iterable over the metrics"""
157 Returns True if metric with specified path is in the snapshot,
164 Compares the snapshot with a dict of metrics or with
169 def __repr__(self) -> str:
172 def __str__(self) -> str:

    def get(self, path: str, default=None):
        """
        Returns the set of metrics by path, or `default` if there's no
        such path.
        """
        return self._values.get(path, default)

    def items(self):
        """Returns a (path, metrics) iterable over the metrics"""
        return self._values.items()
187 """Returns an iterable over paths of metrics"""
191 """Returns an iterable over lists of metrics"""

    def value_at(
        self,
        path: str,
        labels: dict[str, str] | None = None,
        *,
        default: MetricValue | None = None,
    ) -> MetricValue:
        """
        Returns a single metric value at the specified path. If a dict of
        labels is provided, does an exact match of labels (i.e. {} stands for
        no labels; {'a': 'b', 'c': 'd'} matches only {'a': 'b', 'c': 'd'} or
        {'c': 'd', 'a': 'b'}, but matches neither {'a': 'b'} nor
        {'a': 'b', 'c': 'd', 'e': 'f'}).

        @throws AssertionError if there is not exactly one metric at the path
        """
        entry = self.get(path, set())
        assert entry or default is not None, f'No metrics found by path "{path}"'

        if labels is not None:
            entry = {x for x in entry if x.labels == labels}
            assert entry or default is not None, (
                f'No metrics found by path "{path}" and labels {labels}'
            )
            assert len(entry) <= 1, (
                f'Multiple metrics found by path "{path}" and labels {labels}: {entry}'
            )
        else:
            assert len(entry) <= 1, f'Multiple metrics found by path "{path}": {entry}'

        if default is not None and not entry:
            return default
        return next(iter(entry)).value
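
    # Illustrative usage of value_at (the path and labels are made up):
    #
    #   # Exactly one metric is expected at the path:
    #   value = snapshot.value_at('sample.sensor')
    #   # Exact label match, falling back to 0 when nothing matches:
    #   value = snapshot.value_at('sample.sensor', {'method': 'GET'}, default=0)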

    def metrics_at(
        self,
        path: str,
        require_labels: dict[str, str] | None = None,
    ) -> list[Metric]:
        """
        Returns metrics whose path exactly equals the given `path`.
        A required subset of labels is specified by `require_labels`.

        Example:
        require_labels={'a': 'b', 'c': 'd'}
        {'a': 'b', 'c': 'd'} - exact match
        {'a': 'b', 'c': 'd', 'e': 'f', 'h': 'k'} - match
        {'a': 'x', 'c': 'd'} - no match, incorrect value for label 'a'
        {'a': 'b'} - no match, required label 'c' not found

        Usage:
        @code
        for m in metrics_at(path='something.something.sensor',
                            require_labels={'label1': 'value1'}):
            assert m.value > 0
        @endcode
        """
        entry = self.get(path, set())

        def _is_labels_subset(require_labels, target_labels) -> bool:
            for req_key, req_val in require_labels.items():
                if target_labels.get(req_key, None) != req_val:
                    # the required label is missing or its value differs
                    return False
            return True

        if require_labels is not None:
            return list(
                filter(
                    lambda x: _is_labels_subset(
                        require_labels=require_labels,
                        target_labels=x.labels,
                    ),
                    entry,
                ),
            )
        return list(entry)
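
    # Illustrative usage of metrics_at (the path and labels are made up):
    #
    #   for m in snapshot.metrics_at(
    #           path='sample.sensor',
    #           require_labels={'method': 'GET'},
    #   ):
    #       assert m.value >= 0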

    def has_metrics_at(
        self,
        path: str,
        require_labels: dict[str, str] | None = None,
    ) -> bool:
        # metrics_at returns a list; an empty list is falsy
        return bool(self.metrics_at(path, require_labels))

    def assert_equals(
        self,
        other: Mapping[str, Set[Metric]],
        *,
        ignore_zeros: bool = False,
    ) -> None:
        """
        Compares the snapshot with a dict of metrics or with
        another snapshot, displaying a nice diff on mismatch.
        """
        lhs = _flatten_snapshot(self, ignore_zeros=ignore_zeros)
        rhs = _flatten_snapshot(other, ignore_zeros=ignore_zeros)
        assert lhs == rhs, _diff_metric_snapshots(lhs, rhs, ignore_zeros)
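
    # Illustrative usage of assert_equals with a made-up expected dict;
    # `ignore_zeros=True` drops zero-valued metrics from both sides before
    # comparing:
    #
    #   snapshot.assert_equals(
    #       {'sample.sensor': {Metric(labels={}, value=42)}},
    #       ignore_zeros=True,
    #   )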

    def pretty_print(self) -> str:
        """
        Multiline linear print:
        path: (label=value),(label=value) TYPE VALUE
        path: (label=value),(label=value) TYPE VALUE

        Usage:
        @code
        assert 'some.thing.sensor' in metric, metric.pretty_print()
        @endcode
        """

        def _iterate_over_mset(path, mset):
            """Pretty-print one metrics set for the given path"""
            return [
                '{}: {} {} {}'.format(
                    path,
                    # labels in the form (key=value)
                    ','.join(f'({key}={value})' for key, value in _get_labels_tuple(metric)),
                    metric._type.value,
                    metric.value,
                )
                for metric in sorted(mset, key=_get_labels_tuple)
            ]

        # list of per-path line lists, flattened into a single multiline string
        data_for_every_path = [
            _iterate_over_mset(path, mset)
            for path, mset in self._values.items()
        ]
        return '\n'.join(itertools.chain(*data_for_every_path))

    @staticmethod
    def from_json(json_str: str) -> 'MetricsSnapshot':
        """
        Construct a MetricsSnapshot from a JSON string.
        """
        json_data = {
            str(path): {
                Metric(
                    labels=element['labels'],
                    value=_parse_metric_value(element['value']),
                    _type=MetricType[element.get('type', 'UNSPECIFIED')],
                )
                for element in metrics_list
            }
            for path, metrics_list in json.loads(json_str).items()
        }
        return MetricsSnapshot(json_data)

    def to_json(self) -> str:
        """
        Serialize to a JSON string.
        """
        return json.dumps(
            # Shuffle the metrics so that tests do not depend on their order.
            {
                path: random.sample(list(metrics), len(metrics))
                for path, metrics in self._values.items()
            },
            cls=_MetricsJSONEncoder,
        )
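

# An illustrative from_json / to_json round trip (the data is made up):
#
#   snapshot = MetricsSnapshot.from_json(
#       '{"sample.sensor": [{"labels": {}, "value": 42}]}',
#   )
#   assert snapshot.value_at('sample.sensor') == 42
#   assert MetricsSnapshot.from_json(snapshot.to_json()) == snapshot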


def _type_eq(lhs: MetricType, rhs: MetricType) -> bool:
    return (
        lhs == rhs
        or lhs == MetricType.UNSPECIFIED
        or rhs == MetricType.UNSPECIFIED
    )
358 """Returns labels as a tuple of sorted items"""
359 return tuple(sorted(metric.labels.items()))


def _do_compute_percentile(hist: Histogram, percent: float) -> float:
    # Expand the histogram into a flat list of representative values;
    # this is O(hist.count()), which is good enough for tests.
    value_lists = [
        [bound] * bucket
        for (bucket, bound) in zip(hist.buckets, hist.bounds, strict=True)
    ] + [[math.inf] * hist.inf]
    values = [item for sublist in value_lists for item in sublist]

    if not values:
        return 0
    pivot = (len(values) - 1) * percent
    floor = math.floor(pivot)
    ceil = math.ceil(pivot)
    if floor == ceil:
        return values[int(pivot)]
    # Linear interpolation between the two nearest values
    part1 = values[int(floor)] * (ceil - pivot)
    part2 = values[int(ceil)] * (pivot - floor)
    return part1 + part2
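

# A worked example of the interpolation above (the numbers are illustrative):
# bounds=[1, 2], buckets=[1, 1], inf=0 expands to values=[1, 2]; for
# percent=0.5 the pivot is (2 - 1) * 0.5 = 0.5, so the result interpolates
# between values[0] and values[1]: 1 * 0.5 + 2 * 0.5 = 1.5.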


def _parse_metric_value(value: Any) -> MetricValue:
    if isinstance(value, dict):
        return Histogram(
            bounds=value['bounds'],
            buckets=value['buckets'],
            inf=value['inf'],
        )
    elif isinstance(value, float):
        return value
    elif isinstance(value, int):
        return value
    else:
        raise Exception(f'Failed to parse metric value from {value!r}')


_FlattenedSnapshot: TypeAlias = Set[tuple[str, Metric]]


def _flatten_snapshot(values, ignore_zeros: bool) -> _FlattenedSnapshot:
    return set(
        (path, metric)
        for path, metrics in values.items()
        for metric in metrics
        if metric.value != 0 or not ignore_zeros
    )


def _diff_metric_snapshots(
    lhs: _FlattenedSnapshot,
    rhs: _FlattenedSnapshot,
    ignore_zeros: bool,
) -> str:
    def extra_metrics_message(extra, base):
        return [
            f'    path={path!r} labels={metric.labels!r} value={metric.value}'
            for path, metric in sorted(extra, key=lambda pair: pair[0])
            if (path, metric) not in base
        ]

    if ignore_zeros:
        lines = ['left.assert_equals(right, ignore_zeros=True) failed']
    else:
        lines = ['left.assert_equals(right) failed']
    actual_extra = extra_metrics_message(lhs, rhs)
    if actual_extra:
        lines.append('  extra in left:')
        lines += actual_extra
    actual_gt = extra_metrics_message(rhs, lhs)
    if actual_gt:
        lines.append('  missing in left:')
        lines += actual_gt
    return '\n'.join(lines)
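

# An illustrative shape of the diff produced above (paths and values are
# made up):
#
#   left.assert_equals(right) failed
#     extra in left:
#       path='sample.sensor' labels={'method': 'GET'} value=1
#     missing in left:
#       path='sample.sensor' labels={'method': 'POST'} value=2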