userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/metrics.py Source File
Loading...
Searching...
No Matches
metrics.py
1"""
2Python module that provides helpers for functional testing of metrics with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9import dataclasses
10import enum
11import itertools
12import json
13import math
14import random
15import typing
16
17
18# @cond
19class MetricType(str, enum.Enum):
20 """
21 The type of individual metric.
22
23 `UNSPECIFIED` compares equal to all `MetricType`s.
24 To disable this behavior, use `is` for comparisons.
25 """
26
27 UNSPECIFIED = 'UNSPECIFIED'
28 GAUGE = 'GAUGE'
29 RATE = 'RATE'
30 HIST_RATE = 'HIST_RATE'
31 # @endcond
32
33
34@dataclasses.dataclass
36 """
37 Represents the value of a HIST_RATE (a.k.a. Histogram) metric.
38
39 Usage example:
40 @snippet testsuite/tests/test_metrics.py histogram
41
42 Normally obtained from MetricsSnapshot
43 """
44
45 bounds: typing.List[float]
46 buckets: typing.List[int]
47 inf: int
48
49 def count(self) -> int:
50 return sum(self.bucketsbuckets) + self.inf
51
52 def percentile(self, percent: float) -> float:
53 return _do_compute_percentile(self, percent)
54
55 # @cond
56 def __post_init__(self):
57 assert len(self.bounds) == len(self.bucketsbuckets)
58 assert sorted(self.bounds) == self.bounds
59 if self.bounds:
60 assert self.bounds[0] > 0
61 assert self.bounds[-1] != math.inf
62
63 # @endcond
64
65
66MetricValue = typing.Union[float, Histogram]
67
68
69@dataclasses.dataclass(frozen=True)
70class Metric:
71 """
72 Metric type that contains the `labels: typing.Dict[str, str]` and
73 `value: int`.
74
75 The type is hashable and comparable:
76 @snippet testsuite/tests/test_metrics.py values set
77
78 @ingroup userver_testsuite
79 """
80
81 labels: typing.Dict[str, str]
82 value: MetricValue
83
84 # @cond
85 # Should not be specified explicitly, for internal use only.
86 _type: MetricType = MetricType.UNSPECIFIED
87 # @endcond
88
89 def __eq__(self, other: typing.Any) -> bool:
90 if not isinstance(other, Metric):
91 return NotImplemented
92 return (
93 self.labelslabels == other.labels
94 and self.valuevalue == other.value
95 and _type_eq(self._type, other._type)
96 )
97
98 def __hash__(self) -> int:
99 return hash(_get_labels_tuple(self))
100
101 # @cond
102 def __post_init__(self):
103 if isinstance(self.valuevalue, Histogram):
104 assert (
105 self._type == MetricType.HIST_RATE
106 or self._type == MetricType.UNSPECIFIED
107 )
108 else:
109 assert self._type is not MetricType.HIST_RATE
110
111 # For internal use only.
112 def type(self) -> MetricType:
113 return self._type
114
115 # @endcond
116
117
118class _MetricsJSONEncoder(json.JSONEncoder):
119 def default(self, o): # pylint: disable=method-hidden
120 if isinstance(o, Metric):
121 result = {'labels': o.labels, 'value': o.value}
122 if o.type() is not MetricType.UNSPECIFIED:
123 result['type'] = o.type()
124 return result
125 elif isinstance(o, Histogram):
126 return dataclasses.asdict(o)
127 if isinstance(o, set):
128 return list(o)
129 return super().default(o)
130
131
133 """
134 Snapshot of captured metrics that mimics the dict interface. Metrics have
135 the 'Dict[str(path), Set[Metric]]' format.
136
137 @snippet samples/testsuite-support/tests/test_metrics.py metrics labels
138
139 @ingroup userver_testsuite
140 """
141
142 def __init__(self, values: typing.Mapping[str, typing.Set[Metric]]):
143 self._values = values
144
145 def __getitem__(self, path: str) -> typing.Set[Metric]:
146 """ Returns a list of metrics by specified path """
147 return self._values[path]
148
149 def __len__(self) -> int:
150 """ Returns count of metrics paths """
151 return len(self._values)
152
153 def __iter__(self):
154 """ Returns a (path, list) iterable over the metrics """
155 return self._values.__iter__()
156
157 def __contains__(self, path: str) -> bool:
158 """
159 Returns True if metric with specified path is in the snapshot,
160 False otherwise.
161 """
162 return path in self._values
163
164 def __eq__(self, other: object) -> bool:
165 """
166 Compares the snapshot with a dict of metrics or with
167 another snapshot
168 """
169 return self._values == other
170
171 def __repr__(self) -> str:
172 return self._values.__repr__()
173
174 def __str__(self) -> str:
175 return self.pretty_print()
176
177 def get(self, path: str, default=None):
178 """
179 Returns an list of metrics by path or default if there's no
180 such path
181 """
182 return self._values.get(path, default)
183
184 def items(self):
185 """ Returns a (path, list) iterable over the metrics """
186 return self._values.items()
187
188 def keys(self):
189 """ Returns an iterable over paths of metrics """
190 return self._values.keys()
191
192 def values(self):
193 """ Returns an iterable over lists of metrics """
194 return self._values.values()
195
197 self,
198 path: str,
199 labels: typing.Optional[typing.Dict] = None,
200 *,
201 default: typing.Optional[MetricValue] = None,
202 ) -> MetricValue:
203 """
204 Returns a single metric value at specified path. If a dict of labels
205 is provided, does en exact match of labels (i.e. {} stands for no
206 labels; {'a': 'b', 'c': 'd'} matches only {'a': 'b', 'c': 'd'} or
207 {'c': 'd', 'a': 'b'} but neither match {'a': 'b'} nor
208 {'a': 'b', 'c': 'd', 'e': 'f'}).
209
210 @throws AssertionError if not one metric by path
211 """
212 entry = self.get(path, set())
213 assert (
214 entry or default is not None
215 ), f'No metrics found by path "{path}"'
216
217 if labels is not None:
218 entry = {x for x in entry if x.labels == labels}
219 assert (
220 entry or default is not None
221 ), f'No metrics found by path "{path}" and labels {labels}'
222 assert len(entry) <= 1, (
223 f'Multiple metrics found by path "{path}" and labels {labels}:'
224 f' {entry}'
225 )
226 else:
227 assert (
228 len(entry) <= 1
229 ), f'Multiple metrics found by path "{path}": {entry}'
230
231 if default is not None and not entry:
232 return default
233 return next(iter(entry)).value
234
236 self,
237 path: str,
238 require_labels: typing.Optional[typing.Dict] = None,
239 ) -> typing.List[Metric]:
240 """
241 Metrics path must exactly equal the given `path`.
242 A required subset of labels is specified by `require_labels`
243 Example:
244 require_labels={'a':'b', 'c':'d'}
245 { 'a':'b', 'c':'d'} - exact match
246 { 'a':'b', 'c':'d', 'e': 'f', 'h':'k'} - match
247 { 'a':'x', 'c':'d'} - no match, incorrect value for label 'a'
248 { 'a' : 'b'} - required label not found
249 Usage:
250 @code
251 for m in metrics_with_labels(path='something.something.sensor',
252 require_labels={ 'label1': 'value1' }):
253 assert m.value > 0
254 @endcode
255 """
256 entry = self.get(path, set())
257
258 def _is_labels_subset(require_labels, target_labels) -> bool:
259 for req_key, req_val in require_labels.items():
260 if target_labels.get(req_key, None) != req_val:
261 # required label is missing or its value is different
262 return False
263 return True
264
265 if require_labels is not None:
266 return list(
267 filter(
268 lambda x: _is_labels_subset(
269 require_labels=require_labels, target_labels=x.labels,
270 ),
271 entry,
272 ),
273 )
274 else:
275 return list(entry)
276
277 def has_metrics_at(
278 self,
279 path: str,
280 require_labels: typing.Optional[typing.Dict] = None,
281 ) -> bool:
282 # metrics_with_labels returns list, and pythonic way to check if list
283 # is empty is like this:
284 return bool(self.metrics_at(path, require_labels))
285
287 self,
288 other: typing.Mapping[str, typing.Set[Metric]],
289 *,
290 ignore_zeros: bool = False,
291 ) -> None:
292 """
293 Compares the snapshot with a dict of metrics or with
294 another snapshot, displaying a nice diff on mismatch
295 """
296 lhs = _flatten_snapshot(self, ignore_zeros=ignore_zeros)
297 rhs = _flatten_snapshot(other, ignore_zeros=ignore_zeros)
298 assert lhs == rhs, _diff_metric_snapshots(lhs, rhs, ignore_zeros)
299
300 def pretty_print(self) -> str:
301 """
302 Multiline linear print:
303 path: (label=value),(label=value) TYPE VALUE
304 path: (label=value),(label=value) TYPE VALUE
305 Usage:
306 @code
307 assert 'some.thing.sensor' in metric, metric.pretty_print()
308 @endcode
309 """
310
311 def _iterate_over_mset(path, mset):
312 """ print (pretty) one metrics set - for given path """
313 result = []
314 for metric in sorted(mset, key=lambda x: _get_labels_tuple(x)):
315 result.append(
316 '{}: {} {} {}'.format(
317 path,
318 # labels in form (key=value)
319 ','.join(
320 [
321 '({}={})'.format(k, v)
322 for k, v in _get_labels_tuple(metric)
323 ],
324 ),
325 metric._type.value,
326 metric.value,
327 ),
328 )
329 return result
330
331 # list of lists [ [ string1, string2, string3],
332 # [string4, string5, string6] ]
333 data_for_every_path = [
334 _iterate_over_mset(path, mset)
335 for path, mset in self._values.items()
336 ]
337 # use itertools.chain to flatten list
338 # [ string1, string2, string3, string4, string5, string6 ]
339 # and join to convert it to one multiline string
340 return '\n'.join(itertools.chain(*data_for_every_path))
341
342 @staticmethod
343 def from_json(json_str: str) -> 'MetricsSnapshot':
344 """
345 Construct MetricsSnapshot from a JSON string
346 """
347 json_data = {
348 str(path): {
349 Metric(
350 labels=element['labels'],
351 value=_parse_metric_value(element['value']),
352 _type=MetricType[element.get('type', 'UNSPECIFIED')],
353 )
354 for element in metrics_list
355 }
356 for path, metrics_list in json.loads(json_str).items()
357 }
358 return MetricsSnapshot(json_data)
359
360 def to_json(self) -> str:
361 """
362 Serialize to a JSON string
363 """
364 return json.dumps(
365 # Shuffle to disallow depending on the received metrics order.
366 {
367 path: random.sample(list(metrics), len(metrics))
368 for path, metrics in self._values.items()
369 },
370 cls=_MetricsJSONEncoder,
371 )
372
373
374def _type_eq(lhs: MetricType, rhs: MetricType) -> bool:
375 return (
376 lhs == rhs
377 or lhs == MetricType.UNSPECIFIED
378 or rhs == MetricType.UNSPECIFIED
379 )
380
381
382def _get_labels_tuple(metric: Metric) -> typing.Tuple:
383 """ Returns labels as a tuple of sorted items """
384 return tuple(sorted(metric.labels.items()))
385
386
387def _do_compute_percentile(hist: Histogram, percent: float) -> float:
388 # This implementation is O(hist.count()), which is less than perfect.
389 # So far, this was not a big enough pain to rewrite it.
390 value_lists = [
391 [bound] * bucket for (bucket, bound) in zip(hist.buckets, hist.bounds)
392 ] + [[math.inf] * hist.inf]
393 values = [item for sublist in value_lists for item in sublist]
394
395 # Implementation taken from:
396 # https://stackoverflow.com/a/2753343/5173839
397 if not values:
398 return 0
399 pivot = (len(values) - 1) * percent
400 floor = math.floor(pivot)
401 ceil = math.ceil(pivot)
402 if floor == ceil:
403 return values[int(pivot)]
404 part1 = values[int(floor)] * (ceil - pivot)
405 part2 = values[int(ceil)] * (pivot - floor)
406 return part1 + part2
407
408
409def _parse_metric_value(value: typing.Any) -> MetricValue:
410 if isinstance(value, dict):
411 return Histogram(
412 bounds=value['bounds'], buckets=value['buckets'], inf=value['inf'],
413 )
414 elif isinstance(value, float):
415 return value
416 elif isinstance(value, int):
417 return value
418 else:
419 raise Exception(f'Failed to parse metric value from {value!r}')
420
421
422_FlattenedSnapshot = typing.Set[typing.Tuple[str, Metric]]
423
424
425def _flatten_snapshot(values, ignore_zeros: bool) -> _FlattenedSnapshot:
426 return set(
427 (path, metric)
428 for path, metrics in values.items()
429 for metric in metrics
430 if metric.value != 0 or not ignore_zeros
431 )
432
433
434def _diff_metric_snapshots(
435 lhs: _FlattenedSnapshot, rhs: _FlattenedSnapshot, ignore_zeros: bool,
436) -> str:
437 def extra_metrics_message(extra, base):
438 return [
439 f' path={path!r} labels={metric.labels!r} value={metric.value}'
440 for path, metric in sorted(extra, key=lambda pair: pair[0])
441 if (path, metric) not in base
442 ]
443
444 if ignore_zeros:
445 lines = ['left.assert_equals(right, ignore_zeros=True) failed']
446 else:
447 lines = ['left.assert_equals(right) failed']
448 actual_extra = extra_metrics_message(lhs, rhs)
449 if actual_extra:
450 lines.append(' extra in left:')
451 lines += actual_extra
452
453 actual_gt = extra_metrics_message(rhs, lhs)
454 if actual_gt:
455 lines.append(' missing in left:')
456 lines += actual_gt
457
458 return '\n'.join(lines)