userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/metrics.py Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
metrics.py
1"""
2Python module that provides helpers for functional testing of metrics with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9import dataclasses
10import enum
11import itertools
12import json
13import math
14import random
15import typing
16
17
18# @cond
class MetricType(str, enum.Enum):
    """
    The type of individual metric.

    `UNSPECIFIED` compares equal to all `MetricType`s.
    To disable this behavior, use `is` for comparisons.

    NOTE(review): the wildcard behavior of `UNSPECIFIED` described above is
    implemented by the module-level `_type_eq` helper (used from
    `Metric.__eq__`), not by this enum class itself.
    """

    UNSPECIFIED = 'UNSPECIFIED'
    GAUGE = 'GAUGE'
    RATE = 'RATE'
    HIST_RATE = 'HIST_RATE'
    # @endcond
32
33
@dataclasses.dataclass
class Histogram:
    """
    Represents the value of a HIST_RATE (a.k.a. Histogram) metric.

    Usage example:
    @snippet testsuite/tests/test_metrics.py histogram

    Normally obtained from MetricsSnapshot

    Fields:
    - bounds: upper bounds of the buckets, sorted ascending, strictly
      positive and finite
    - buckets: per-bucket observation counts, parallel to `bounds`
    - inf: count of observations greater than the last bound
    """

    bounds: typing.List[float]
    buckets: typing.List[int]
    inf: int

    def count(self) -> int:
        """ Total number of observations, including the +inf bucket. """
        return sum(self.buckets) + self.inf

    def percentile(self, percent: float) -> float:
        """ Approximate percentile (`percent` in [0, 1]) of the metric. """
        return _do_compute_percentile(self, percent)

    # @cond
    def __post_init__(self):
        # Validate the invariants documented in the class docstring.
        assert len(self.bounds) == len(self.buckets)
        assert sorted(self.bounds) == self.bounds
        if self.bounds:
            assert self.bounds[0] > 0
            assert self.bounds[-1] != math.inf

    # @endcond
64
65
# A metric value is either a plain number (GAUGE / RATE) or a Histogram
# (HIST_RATE).
MetricValue = typing.Union[float, Histogram]
67
68
@dataclasses.dataclass(frozen=True)
class Metric:
    """
    Metric type that contains the `labels: typing.Dict[str, str]` and
    `value: MetricValue`.

    The type is hashable and comparable:
    @snippet testsuite/tests/test_metrics.py values set

    @ingroup userver_testsuite
    """

    labels: typing.Dict[str, str]
    value: MetricValue

    # @cond
    # Should not be specified explicitly, for internal use only.
    _type: MetricType = MetricType.UNSPECIFIED
    # @endcond

    def __eq__(self, other: typing.Any) -> bool:
        """
        Metrics are equal when labels and values match; an UNSPECIFIED
        type acts as a wildcard (see _type_eq).
        """
        if not isinstance(other, Metric):
            return NotImplemented
        return (
            self.labels == other.labels
            and self.value == other.value
            and _type_eq(self._type, other._type)
        )

    def __hash__(self) -> int:
        # Hash by labels only, so metrics differing just in value/type land
        # in the same hash bucket and __eq__ does the full comparison. This
        # is what lets a Set[Metric] act as a labels-keyed collection.
        return hash(_get_labels_tuple(self))

    # @cond
    def __post_init__(self):
        # A Histogram value requires the HIST_RATE (or wildcard) type;
        # a numeric value must not claim HIST_RATE.
        if isinstance(self.value, Histogram):
            assert (
                self._type == MetricType.HIST_RATE
                or self._type == MetricType.UNSPECIFIED
            )
        else:
            assert self._type is not MetricType.HIST_RATE

    # For internal use only.
    def type(self) -> MetricType:
        return self._type

    # @endcond
116
117
class _MetricsJSONEncoder(json.JSONEncoder):
    """ JSON encoder aware of Metric, Histogram and set objects. """

    def default(self, o):  # pylint: disable=method-hidden
        if isinstance(o, Metric):
            entry = {'labels': o.labels, 'value': o.value}
            # 'type' is only serialized when it carries information.
            if o.type() is not MetricType.UNSPECIFIED:
                entry['type'] = o.type()
            return entry
        if isinstance(o, Histogram):
            return dataclasses.asdict(o)
        if isinstance(o, set):
            return list(o)
        return super().default(o)
130
131
class MetricsSnapshot:
    """
    Snapshot of captured metrics that mimics the dict interface. Metrics have
    the 'Dict[str(path), Set[Metric]]' format.

    @snippet samples/testsuite-support/tests/test_metrics.py metrics labels

    @ingroup userver_testsuite
    """

    def __init__(self, values: typing.Mapping[str, typing.Set[Metric]]):
        self._values = values

    def __getitem__(self, path: str) -> typing.Set[Metric]:
        """ Returns a set of metrics by specified path """
        return self._values[path]

    def __len__(self) -> int:
        """ Returns count of metrics paths """
        return len(self._values)

    def __iter__(self):
        """ Returns an iterator over the metrics paths """
        return self._values.__iter__()

    def __contains__(self, path: str) -> bool:
        """
        Returns True if metric with specified path is in the snapshot,
        False otherwise.
        """
        return path in self._values

    def __eq__(self, other: object) -> bool:
        """
        Compares the snapshot with a dict of metrics or with
        another snapshot
        """
        return self._values == other

    def __repr__(self) -> str:
        return self._values.__repr__()

    def __str__(self) -> str:
        return self.pretty_print()

    def get(self, path: str, default=None):
        """
        Returns a set of metrics by path, or `default` if there's no
        such path
        """
        return self._values.get(path, default)

    def items(self):
        """ Returns a (path, set of metrics) iterable over the metrics """
        return self._values.items()

    def keys(self):
        """ Returns an iterable over paths of metrics """
        return self._values.keys()

    def values(self):
        """ Returns an iterable over sets of metrics """
        return self._values.values()

    def value_at(
            self,
            path: str,
            labels: typing.Optional[typing.Dict] = None,
            *,
            default: typing.Optional[MetricValue] = None,
    ) -> MetricValue:
        """
        Returns a single metric value at specified path. If a dict of labels
        is provided, does an exact match of labels (i.e. {} stands for no
        labels; {'a': 'b', 'c': 'd'} matches only {'a': 'b', 'c': 'd'} or
        {'c': 'd', 'a': 'b'} but neither match {'a': 'b'} nor
        {'a': 'b', 'c': 'd', 'e': 'f'}).

        @throws AssertionError if not one metric by path
        """
        entry = self.get(path, set())
        # `default` suppresses the "not found" assertions below.
        assert (
            entry or default is not None
        ), f'No metrics found by path "{path}"'

        if labels is not None:
            entry = {x for x in entry if x.labels == labels}
            assert (
                entry or default is not None
            ), f'No metrics found by path "{path}" and labels {labels}'
            assert len(entry) <= 1, (
                f'Multiple metrics found by path "{path}" and labels {labels}:'
                f' {entry}'
            )
        else:
            assert (
                len(entry) <= 1
            ), f'Multiple metrics found by path "{path}": {entry}'

        if default is not None and not entry:
            return default
        return next(iter(entry)).value

    def metrics_at(
            self,
            path: str,
            require_labels: typing.Optional[typing.Dict] = None,
    ) -> typing.List[Metric]:
        """
        Metrics path must exactly equal the given `path`.
        A required subset of labels is specified by `require_labels`
        Example:
        require_labels={'a':'b', 'c':'d'}
        { 'a':'b', 'c':'d'} - exact match
        { 'a':'b', 'c':'d', 'e': 'f', 'h':'k'} - match
        { 'a':'x', 'c':'d'} - no match, incorrect value for label 'a'
        { 'a' : 'b'} - required label not found
        Usage:
        @code
        for m in metrics_at(path='something.something.sensor',
                            require_labels={ 'label1': 'value1' }):
            assert m.value > 0
        @endcode
        """
        entry = self.get(path, set())

        def _is_labels_subset(require_labels, target_labels) -> bool:
            """ True iff every required label is present with that value. """
            for req_key, req_val in require_labels.items():
                if target_labels.get(req_key, None) != req_val:
                    # required label is missing or its value is different
                    return False
            return True

        if require_labels is not None:
            return list(
                filter(
                    lambda x: _is_labels_subset(
                        require_labels=require_labels, target_labels=x.labels,
                    ),
                    entry,
                ),
            )
        return list(entry)

    def has_metrics_at(
            self,
            path: str,
            require_labels: typing.Optional[typing.Dict] = None,
    ) -> bool:
        """
        Returns True iff at least one metric matches `path` and
        `require_labels` (see metrics_at for the matching rules).
        """
        # metrics_at returns a list; an empty list is falsy.
        return bool(self.metrics_at(path, require_labels))

    def assert_equals(
            self,
            other: typing.Mapping[str, typing.Set[Metric]],
            *,
            ignore_zeros: bool = False,
    ) -> None:
        """
        Compares the snapshot with a dict of metrics or with
        another snapshot, displaying a nice diff on mismatch
        """
        lhs = _flatten_snapshot(self, ignore_zeros=ignore_zeros)
        rhs = _flatten_snapshot(other, ignore_zeros=ignore_zeros)
        assert lhs == rhs, _diff_metric_snapshots(lhs, rhs, ignore_zeros)

    def pretty_print(self) -> str:
        """
        Multiline linear print:
        path: (label=value),(label=value) TYPE VALUE
        path: (label=value),(label=value) TYPE VALUE
        Usage:
        @code
        assert 'some.thing.sensor' in metric, metric.pretty_print()
        @endcode
        """

        def _iterate_over_mset(path, mset):
            """ print (pretty) one metrics set - for given path """
            result = []
            # Sort by labels for deterministic output order.
            for metric in sorted(mset, key=lambda x: _get_labels_tuple(x)):
                result.append(
                    '{}: {} {} {}'.format(
                        path,
                        # labels in form (key=value)
                        ','.join(
                            [
                                '({}={})'.format(k, v)
                                for k, v in _get_labels_tuple(metric)
                            ],
                        ),
                        metric._type.value,
                        metric.value,
                    ),
                )
            return result

        # list of lists [ [ string1, string2, string3],
        #                 [string4, string5, string6] ]
        data_for_every_path = [
            _iterate_over_mset(path, mset)
            for path, mset in self._values.items()
        ]
        # use itertools.chain to flatten list
        # [ string1, string2, string3, string4, string5, string6 ]
        # and join to convert it to one multiline string
        return '\n'.join(itertools.chain(*data_for_every_path))

    @staticmethod
    def from_json(json_str: str) -> 'MetricsSnapshot':
        """
        Construct MetricsSnapshot from a JSON string
        """
        json_data = {
            str(path): {
                Metric(
                    labels=element['labels'],
                    value=_parse_metric_value(element['value']),
                    # Missing 'type' means the wildcard UNSPECIFIED.
                    _type=MetricType[element.get('type', 'UNSPECIFIED')],
                )
                for element in metrics_list
            }
            for path, metrics_list in json.loads(json_str).items()
        }
        return MetricsSnapshot(json_data)

    def to_json(self) -> str:
        """
        Serialize to a JSON string
        """
        return json.dumps(
            # Shuffle to disallow depending on the received metrics order.
            {
                path: random.sample(list(metrics), len(metrics))
                for path, metrics in self._values.items()
            },
            cls=_MetricsJSONEncoder,
        )
372
373
def _type_eq(lhs: MetricType, rhs: MetricType) -> bool:
    """ Equality where UNSPECIFIED is a wildcard matching any type. """
    if lhs == MetricType.UNSPECIFIED or rhs == MetricType.UNSPECIFIED:
        return True
    return lhs == rhs
380
381
def _get_labels_tuple(metric: Metric) -> typing.Tuple:
    """ Returns labels as a tuple of sorted items """
    sorted_items = sorted(metric.labels.items())
    return tuple(sorted_items)
385
386
def _do_compute_percentile(hist: Histogram, percent: float) -> float:
    """
    Computes the `percent` (in [0, 1]) percentile of a histogram,
    linearly interpolating between adjacent recorded values.

    Each bucket contributes its upper bound `buckets[i]` times, and the
    overflow bucket contributes +inf `hist.inf` times. Unlike the previous
    implementation, which materialized every recorded value (O(hist.count())
    time and memory), this walks the buckets directly: O(len(bounds)) time,
    O(1) extra memory, with identical results.
    """
    total = sum(hist.buckets) + hist.inf
    if not total:
        # No observations: keep the historical behavior of returning 0.
        return 0

    def _value_at(index: int) -> float:
        # Returns the index-th smallest recorded value without building
        # the full value list.
        seen = 0
        for bound, bucket in zip(hist.bounds, hist.buckets):
            seen += bucket
            if index < seen:
                return bound
        return math.inf

    # Interpolation scheme taken from:
    # https://stackoverflow.com/a/2753343/5173839
    pivot = (total - 1) * percent
    floor = math.floor(pivot)
    ceil = math.ceil(pivot)
    if floor == ceil:
        return _value_at(int(pivot))
    part1 = _value_at(floor) * (ceil - pivot)
    part2 = _value_at(ceil) * (pivot - floor)
    return part1 + part2
407
408
def _parse_metric_value(value: typing.Any) -> MetricValue:
    """ Converts a raw JSON value into a number or a Histogram. """
    if isinstance(value, dict):
        # A dict encodes a histogram.
        return Histogram(
            bounds=value['bounds'], buckets=value['buckets'], inf=value['inf'],
        )
    if isinstance(value, (float, int)):
        return value
    raise Exception(f'Failed to parse metric value from {value!r}')
420
421
# Internal representation used for diffing: a snapshot flattened into a set
# of (path, metric) pairs.
_FlattenedSnapshot = typing.Set[typing.Tuple[str, Metric]]
423
424
def _flatten_snapshot(values, ignore_zeros: bool) -> _FlattenedSnapshot:
    """ Flattens a path -> metrics mapping into (path, metric) pairs. """
    result = set()
    for path, metrics in values.items():
        for metric in metrics:
            if ignore_zeros and metric.value == 0:
                # Zero-valued metrics are dropped on request.
                continue
            result.add((path, metric))
    return result
432
433
def _diff_metric_snapshots(
        lhs: _FlattenedSnapshot, rhs: _FlattenedSnapshot, ignore_zeros: bool,
) -> str:
    """ Builds a human-readable diff message for two flattened snapshots. """

    def _format_missing(source, reference):
        # Lines for entries present in `source` but absent from `reference`,
        # sorted by path for deterministic output.
        return [
            f' path={path!r} labels={metric.labels!r} value={metric.value}'
            for path, metric in sorted(source, key=lambda pair: pair[0])
            if (path, metric) not in reference
        ]

    header = (
        'left.assert_equals(right, ignore_zeros=True) failed'
        if ignore_zeros
        else 'left.assert_equals(right) failed'
    )
    lines = [header]

    extra = _format_missing(lhs, rhs)
    if extra:
        lines.append(' extra in left:')
        lines.extend(extra)

    missing = _format_missing(rhs, lhs)
    if missing:
        lines.append(' missing in left:')
        lines.extend(missing)

    return '\n'.join(lines)