# Source: testsuite/pytest_plugins/pytest_userver/metrics.py (userver)
1"""
2Python module that provides helpers for functional testing of metrics with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9import dataclasses
10import enum
11import itertools
12import json
13import math
14import random
15import typing
16
17
# @cond
class MetricType(str, enum.Enum):
    """
    The type of individual metric.

    `UNSPECIFIED` compares equal to all `MetricType`s.
    To disable this behavior, use `is` for comparisons.

    NOTE: the wildcard comparison is implemented by the module-level helper
    `_type_eq` (used by `Metric.__eq__`), not by this enum's own `==`.
    """

    UNSPECIFIED = 'UNSPECIFIED'
    GAUGE = 'GAUGE'
    RATE = 'RATE'
    HIST_RATE = 'HIST_RATE'
    # @endcond
32
33
@dataclasses.dataclass
class Histogram:
    """
    Represents the value of a HIST_RATE (a.k.a. Histogram) metric.

    Usage example:
    @snippet testsuite/tests/test_metrics.py histogram

    Normally obtained from MetricsSnapshot
    """

    # Upper bounds of the finite buckets, sorted ascending, all positive.
    bounds: typing.List[float]
    # Observation counts per finite bucket; parallel to `bounds`.
    buckets: typing.List[int]
    # Observation count of the +inf bucket (values above the last bound).
    inf: int

    def count(self) -> int:
        """Returns the total number of observations across all buckets."""
        return sum(self.buckets) + self.inf

    def percentile(self, percent: float) -> float:
        """Returns an approximate percentile (`percent` in [0, 1])."""
        return _do_compute_percentile(self, percent)

    # @cond
    def __post_init__(self):
        # Validate the invariants documented on the fields above.
        assert len(self.bounds) == len(self.buckets)
        assert sorted(self.bounds) == self.bounds
        if self.bounds:
            assert self.bounds[0] > 0
            assert self.bounds[-1] != math.inf

    # @endcond
64
65
# A metric's value: a plain number for GAUGE/RATE, a Histogram for HIST_RATE.
MetricValue = typing.Union[float, Histogram]
67
68
@dataclasses.dataclass(frozen=True)
class Metric:
    """
    Metric type that contains the `labels: typing.Dict[str, str]` and
    `value: MetricValue`.

    The type is hashable and comparable:
    @snippet testsuite/tests/test_metrics.py values set

    @ingroup userver_testsuite
    """

    labels: typing.Dict[str, str]
    value: MetricValue

    # @cond
    # Should not be specified explicitly, for internal use only.
    _type: MetricType = MetricType.UNSPECIFIED
    # @endcond

    def __eq__(self, other: typing.Any) -> bool:
        """
        Metrics are equal when labels, values and types match;
        UNSPECIFIED type compares equal to any type (see `_type_eq`).
        """
        if not isinstance(other, Metric):
            return NotImplemented
        return self.labels == other.labels and self.value == other.value and _type_eq(self._type, other._type)

    def __hash__(self) -> int:
        # Hash only the labels: `_type` cannot participate because
        # UNSPECIFIED compares equal to every type, and `value` may be a
        # (non-frozen, hence unhashable) Histogram.
        return hash(_get_labels_tuple(self))

    # @cond
    def __post_init__(self):
        # A Histogram value implies HIST_RATE (or UNSPECIFIED) type;
        # a scalar value must not be typed HIST_RATE.
        if isinstance(self.value, Histogram):
            assert self._type == MetricType.HIST_RATE or self._type == MetricType.UNSPECIFIED
        else:
            assert self._type is not MetricType.HIST_RATE

    # For internal use only.
    def type(self) -> MetricType:
        return self._type

    # @endcond
109
110
class _MetricsJSONEncoder(json.JSONEncoder):
    """JSON encoder aware of Metric, Histogram and set objects."""

    def default(self, o):  # pylint: disable=method-hidden
        if isinstance(o, Metric):
            serialized = {'labels': o.labels, 'value': o.value}
            metric_type = o.type()
            # 'type' is omitted for UNSPECIFIED, mirroring from_json's default.
            if metric_type is not MetricType.UNSPECIFIED:
                serialized['type'] = metric_type
            return serialized
        if isinstance(o, Histogram):
            return dataclasses.asdict(o)
        if isinstance(o, set):
            return list(o)
        return super().default(o)
123
124
126 """
127 Snapshot of captured metrics that mimics the dict interface. Metrics have
128 the 'Dict[str(path), Set[Metric]]' format.
129
130 @snippet samples/testsuite-support/tests/test_metrics.py metrics labels
131
132 @ingroup userver_testsuite
133 """
134
135 def __init__(self, values: typing.Mapping[str, typing.Set[Metric]]):
136 self._values = values
137
138 def __getitem__(self, path: str) -> typing.Set[Metric]:
139 """Returns a list of metrics by specified path"""
140 return self._values[path]
141
142 def __len__(self) -> int:
143 """Returns count of metrics paths"""
144 return len(self._values)
145
146 def __iter__(self):
147 """Returns a (path, list) iterable over the metrics"""
148 return self._values.__iter__()
149
150 def __contains__(self, path: str) -> bool:
151 """
152 Returns True if metric with specified path is in the snapshot,
153 False otherwise.
154 """
155 return path in self._values
156
157 def __eq__(self, other: object) -> bool:
158 """
159 Compares the snapshot with a dict of metrics or with
160 another snapshot
161 """
162 return self._values == other
163
164 def __repr__(self) -> str:
165 return self._values.__repr__()
166
167 def __str__(self) -> str:
168 return self.pretty_print()
169
170 def get(self, path: str, default=None):
171 """
172 Returns an list of metrics by path or default if there's no
173 such path
174 """
175 return self._values.get(path, default)
176
177 def items(self):
178 """Returns a (path, list) iterable over the metrics"""
179 return self._values.items()
180
181 def keys(self):
182 """Returns an iterable over paths of metrics"""
183 return self._values.keys()
184
185 def values(self):
186 """Returns an iterable over lists of metrics"""
187 return self._values.values()
188
190 self,
191 path: str,
192 labels: typing.Optional[typing.Dict] = None,
193 *,
194 default: typing.Optional[MetricValue] = None,
195 ) -> MetricValue:
196 """
197 Returns a single metric value at specified path. If a dict of labels
198 is provided, does en exact match of labels (i.e. {} stands for no
199 labels; {'a': 'b', 'c': 'd'} matches only {'a': 'b', 'c': 'd'} or
200 {'c': 'd', 'a': 'b'} but neither match {'a': 'b'} nor
201 {'a': 'b', 'c': 'd', 'e': 'f'}).
202
203 @throws AssertionError if not one metric by path
204 """
205 entry = self.get(path, set())
206 assert entry or default is not None, f'No metrics found by path "{path}"'
207
208 if labels is not None:
209 entry = {x for x in entry if x.labels == labels}
210 assert entry or default is not None, f'No metrics found by path "{path}" and labels {labels}'
211 assert len(entry) <= 1, f'Multiple metrics found by path "{path}" and labels {labels}: {entry}'
212 else:
213 assert len(entry) <= 1, f'Multiple metrics found by path "{path}": {entry}'
214
215 if default is not None and not entry:
216 return default
217 return next(iter(entry)).value
218
220 self,
221 path: str,
222 require_labels: typing.Optional[typing.Dict] = None,
223 ) -> typing.List[Metric]:
224 """
225 Metrics path must exactly equal the given `path`.
226 A required subset of labels is specified by `require_labels`
227 Example:
228 require_labels={'a':'b', 'c':'d'}
229 { 'a':'b', 'c':'d'} - exact match
230 { 'a':'b', 'c':'d', 'e': 'f', 'h':'k'} - match
231 { 'a':'x', 'c':'d'} - no match, incorrect value for label 'a'
232 { 'a' : 'b'} - required label not found
233 Usage:
234 @code
235 for m in metrics_with_labels(path='something.something.sensor',
236 require_labels={ 'label1': 'value1' }):
237 assert m.value > 0
238 @endcode
239 """
240 entry = self.get(path, set())
241
242 def _is_labels_subset(require_labels, target_labels) -> bool:
243 for req_key, req_val in require_labels.items():
244 if target_labels.get(req_key, None) != req_val:
245 # required label is missing or its value is different
246 return False
247 return True
248
249 if require_labels is not None:
250 return list(
251 filter(
252 lambda x: _is_labels_subset(
253 require_labels=require_labels,
254 target_labels=x.labels,
255 ),
256 entry,
257 ),
258 )
259 else:
260 return list(entry)
261
262 def has_metrics_at(
263 self,
264 path: str,
265 require_labels: typing.Optional[typing.Dict] = None,
266 ) -> bool:
267 # metrics_with_labels returns list, and pythonic way to check if list
268 # is empty is like this:
269 return bool(self.metrics_at(path, require_labels))
270
272 self,
273 other: typing.Mapping[str, typing.Set[Metric]],
274 *,
275 ignore_zeros: bool = False,
276 ) -> None:
277 """
278 Compares the snapshot with a dict of metrics or with
279 another snapshot, displaying a nice diff on mismatch
280 """
281 lhs = _flatten_snapshot(self, ignore_zeros=ignore_zeros)
282 rhs = _flatten_snapshot(other, ignore_zeros=ignore_zeros)
283 assert lhs == rhs, _diff_metric_snapshots(lhs, rhs, ignore_zeros)
284
285 def pretty_print(self) -> str:
286 """
287 Multiline linear print:
288 path: (label=value),(label=value) TYPE VALUE
289 path: (label=value),(label=value) TYPE VALUE
290 Usage:
291 @code
292 assert 'some.thing.sensor' in metric, metric.pretty_print()
293 @endcode
294 """
295
296 def _iterate_over_mset(path, mset):
297 """print (pretty) one metrics set - for given path"""
298 result = []
299 for metric in sorted(mset, key=lambda x: _get_labels_tuple(x)):
300 result.append(
301 '{}: {} {} {}'.format(
302 path,
303 # labels in form (key=value)
304 ','.join(['({}={})'.format(k, v) for k, v in _get_labels_tuple(metric)]),
305 metric._type.value,
306 metric.value,
307 ),
308 )
309 return result
310
311 # list of lists [ [ string1, string2, string3],
312 # [string4, string5, string6] ]
313 data_for_every_path = [_iterate_over_mset(path, mset) for path, mset in self._values.items()]
314 # use itertools.chain to flatten list
315 # [ string1, string2, string3, string4, string5, string6 ]
316 # and join to convert it to one multiline string
317 return '\n'.join(itertools.chain(*data_for_every_path))
318
319 @staticmethod
320 def from_json(json_str: str) -> 'MetricsSnapshot':
321 """
322 Construct MetricsSnapshot from a JSON string
323 """
324 json_data = {
325 str(path): {
326 Metric(
327 labels=element['labels'],
328 value=_parse_metric_value(element['value']),
329 _type=MetricType[element.get('type', 'UNSPECIFIED')],
330 )
331 for element in metrics_list
332 }
333 for path, metrics_list in json.loads(json_str).items()
334 }
335 return MetricsSnapshot(json_data)
336
337 def to_json(self) -> str:
338 """
339 Serialize to a JSON string
340 """
341 return json.dumps(
342 # Shuffle to disallow depending on the received metrics order.
343 {path: random.sample(list(metrics), len(metrics)) for path, metrics in self._values.items()},
344 cls=_MetricsJSONEncoder,
345 )
346
347
def _type_eq(lhs: MetricType, rhs: MetricType) -> bool:
    """Compares metric types, treating UNSPECIFIED as a wildcard."""
    wildcard = MetricType.UNSPECIFIED
    return wildcard in (lhs, rhs) or lhs == rhs
350
351
def _get_labels_tuple(metric: Metric) -> typing.Tuple:
    """Returns the metric's labels as a tuple of (key, value) items sorted by key"""
    label_items = list(metric.labels.items())
    label_items.sort()
    return tuple(label_items)
355
356
def _do_compute_percentile(hist: Histogram, percent: float) -> float:
    """Computes a percentile of a Histogram by materializing its samples.

    Each finite bucket contributes `bucket` copies of its upper bound and the
    infinity bucket contributes `hist.inf` copies of math.inf; the requested
    percentile is then linearly interpolated between adjacent samples.
    """
    # This implementation is O(hist.count()), which is less than perfect.
    # So far, this was not a big enough pain to rewrite it.
    values: typing.List[float] = []
    for bucket, bound in zip(hist.buckets, hist.bounds):
        values.extend([bound] * bucket)
    values.extend([math.inf] * hist.inf)

    # Implementation taken from:
    # https://stackoverflow.com/a/2753343/5173839
    if not values:
        return 0
    pivot = (len(values) - 1) * percent
    floor = math.floor(pivot)
    ceil = math.ceil(pivot)
    if floor == ceil:
        return values[int(pivot)]
    part1 = values[int(floor)] * (ceil - pivot)
    part2 = values[int(ceil)] * (pivot - floor)
    return part1 + part2
375
376
def _parse_metric_value(value: typing.Any) -> MetricValue:
    """Converts a JSON-decoded metric value into a MetricValue.

    A dict is interpreted as a Histogram ('bounds', 'buckets', 'inf' keys);
    plain ints and floats are returned as-is.

    @throws Exception on any other value type
    """
    if isinstance(value, dict):
        return Histogram(
            bounds=value['bounds'],
            buckets=value['buckets'],
            inf=value['inf'],
        )
    # Merged float/int branches: both were returned unchanged anyway.
    if isinstance(value, (float, int)):
        return value
    raise Exception(f'Failed to parse metric value from {value!r}')
390
391
# A snapshot flattened into a set of (path, metric) pairs, used for diffing.
_FlattenedSnapshot = typing.Set[typing.Tuple[str, Metric]]
393
394
def _flatten_snapshot(values, ignore_zeros: bool) -> _FlattenedSnapshot:
    """Flattens a path -> metrics mapping into a set of (path, metric) pairs.

    When `ignore_zeros` is truthy, metrics with value 0 are dropped.
    """
    # Set comprehension instead of set(generator) — same result, cleaner idiom.
    return {
        (path, metric)
        for path, metrics in values.items()
        for metric in metrics
        if metric.value != 0 or not ignore_zeros
    }
402
403
def _diff_metric_snapshots(
    lhs: _FlattenedSnapshot,
    rhs: _FlattenedSnapshot,
    ignore_zeros: bool,
) -> str:
    """Renders the assertion message for MetricsSnapshot.assert_equals."""

    def extra_metrics_message(extra, base):
        # One line per (path, metric) pair present in `extra` but absent from
        # `base`, sorted by path for stable output.
        return [
            f' path={path!r} labels={metric.labels!r} value={metric.value}'
            for path, metric in sorted(extra, key=lambda pair: pair[0])
            if (path, metric) not in base
        ]

    header = 'left.assert_equals(right, ignore_zeros=True) failed' if ignore_zeros else 'left.assert_equals(right) failed'
    lines = [header]

    extra_in_left = extra_metrics_message(lhs, rhs)
    if extra_in_left:
        lines.append(' extra in left:')
        lines.extend(extra_in_left)

    missing_in_left = extra_metrics_message(rhs, lhs)
    if missing_in_left:
        lines.append(' missing in left:')
        lines.extend(missing_in_left)

    return '\n'.join(lines)