"""
Python module that provides helpers for functional testing of metrics with
testsuite; see @ref scripts/docs/en/userver/functional_testing.md for an
introduction.

@ingroup userver_testsuite
"""

import dataclasses
import enum
import itertools
import json
import math
import random
import typing


class MetricType(str, enum.Enum):
    """
    The type of individual metric.

    `UNSPECIFIED` compares equal to all `MetricType`s.
    To disable this behavior, use `is` for comparisons.
    """

    UNSPECIFIED = 'UNSPECIFIED'
    HIST_RATE = 'HIST_RATE'
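
# Illustrative note (not part of the original module): `from_json` below looks
# members up by name, and the str base makes members compare equal to their
# string values:
#
#     assert MetricType['HIST_RATE'] is MetricType.HIST_RATE
#     assert MetricType.HIST_RATE == 'HIST_RATE'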


@dataclasses.dataclass
class Histogram:
    """
    Represents the value of a HIST_RATE (a.k.a. Histogram) metric.

    Usage example:
    @snippet testsuite/tests/test_metrics.py histogram

    Normally obtained from MetricsSnapshot.
    """

    bounds: typing.List[float]
    buckets: typing.List[int]
    inf: int

    def count(self) -> int:
        # Total observations, including those beyond the last bound.
        return sum(self.buckets) + self.inf

    def percentile(self, percent: float) -> float:
        return _do_compute_percentile(self, percent)

    def __post_init__(self):
        if self.bounds:
            assert self.bounds[-1] != math.inf
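
# Illustrative sketch (not part of the original module): a histogram where 30
# observations fell into the first bucket (bound 10), 5 into the second
# (bound 50), 10 into the third (bound 100), and 5 exceeded the last bound:
#
#     hist = Histogram(bounds=[10, 50, 100], buckets=[30, 5, 10], inf=5)
#     assert hist.count() == 50
#     assert hist.percentile(0.5) == 10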
 
 
MetricValue = typing.Union[float, Histogram]
 

@dataclasses.dataclass(frozen=True)
class Metric:
    """
    Metric type that contains the `labels: typing.Dict[str, str]` and
    `value: MetricValue`.

    The type is hashable and comparable:
    @snippet testsuite/tests/test_metrics.py values set

    @ingroup userver_testsuite
    """

    labels: typing.Dict[str, str]
    value: MetricValue

    # For internal use only; not meant to be specified explicitly.
    _type: MetricType = MetricType.UNSPECIFIED

    def __eq__(self, other: typing.Any) -> bool:
        if not isinstance(other, Metric):
            return NotImplemented
        return (
            self.labels == other.labels
            and self.value == other.value
            and _type_eq(self._type, other._type)
        )

    def __hash__(self) -> int:
        # Hashing by labels only stays consistent with __eq__:
        # equal metrics always have equal labels.
        return hash(_get_labels_tuple(self))

    def __post_init__(self):
        if isinstance(self.value, Histogram):
            assert (
                self._type == MetricType.HIST_RATE
                or self._type == MetricType.UNSPECIFIED
            )
        else:
            assert self._type is not MetricType.HIST_RATE

    def type(self) -> MetricType:
        return self._type
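
# Illustrative sketch (not part of the original module): `UNSPECIFIED` is
# transparent in comparisons, and hashability allows storing metrics in sets:
#
#     hist = Histogram(bounds=[10], buckets=[1], inf=0)
#     typed = Metric(labels={}, value=hist, _type=MetricType.HIST_RATE)
#     untyped = Metric(labels={}, value=hist)
#     assert typed == untyped
#     assert len({typed, untyped}) == 1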
 
 
class _MetricsJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Metric):
            result = {'labels': o.labels, 'value': o.value}
            if o.type() is not MetricType.UNSPECIFIED:
                result['type'] = o.type()
            return result
        elif isinstance(o, Histogram):
            return dataclasses.asdict(o)
        if isinstance(o, set):
            return list(o)
        return super().default(o)
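
# Illustrative sketch (not part of the original module): the encoder lets
# `json.dumps` serialize Metric objects and sets of them directly:
#
#     metric = Metric(labels={'host': 'a'}, value=42)
#     json.dumps({'some.path': {metric}}, cls=_MetricsJSONEncoder)
#     # -> '{"some.path": [{"labels": {"host": "a"}, "value": 42}]}'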
 
 
class MetricsSnapshot:
    """
    Snapshot of captured metrics that mimics the dict interface. Metrics have
    the 'Dict[str(path), Set[Metric]]' format.

    @snippet samples/testsuite-support/tests/test_metrics.py metrics labels

    @ingroup userver_testsuite
    """

    def __init__(self, values: typing.Mapping[str, typing.Set[Metric]]):
        self._values = values

    def __getitem__(self, path: str) -> typing.Set[Metric]:
        """ Returns a set of metrics by specified path """
        return self._values[path]

    def __len__(self) -> int:
        """ Returns count of metrics paths """
        return len(self._values)

    def __iter__(self):
        """ Returns a (path, set) iterable over the metrics """
        return iter(self._values)

    def __contains__(self, path: str) -> bool:
        """ Returns True if a metric with the given path is in the snapshot """
        return path in self._values

    def __eq__(self, other: object) -> bool:
        """ Compares the snapshot with a dict of metrics or another snapshot """
        return self._values == other

    def __repr__(self) -> str:
        return repr(self._values)

    def __str__(self) -> str:
        return self.pretty_print()
 
    def get(self, path: str, default=None):
        """
        Returns a set of metrics by path, or default if there's no
        such path.
        """
        return self._values.get(path, default)
 
    def items(self):
        """ Returns a (path, set) iterable over the metrics """
        return self._values.items()

    def keys(self):
        """ Returns an iterable over paths of metrics """
        return self._values.keys()

    def values(self):
        """ Returns an iterable over sets of metrics """
        return self._values.values()
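
    # Illustrative sketch (not part of the original module): the snapshot
    # can be used like a regular dict:
    #
    #     snapshot = MetricsSnapshot({'hits': {Metric(labels={}, value=3)}})
    #     assert 'hits' in snapshot
    #     assert snapshot['hits'] == {Metric(labels={}, value=3)}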
 
    def value_at(
            self,
            path: str,
            labels: typing.Optional[typing.Dict] = None,
            *,
            default: typing.Optional[MetricValue] = None,
    ) -> MetricValue:
        """
        Returns a single metric value at the specified path. If a dict of
        labels is provided, does an exact match of labels (i.e. {} stands for
        no labels; {'a': 'b', 'c': 'd'} matches only {'a': 'b', 'c': 'd'} or
        {'c': 'd', 'a': 'b'} but matches neither {'a': 'b'} nor
        {'a': 'b', 'c': 'd', 'e': 'f'}).

        @throws AssertionError if multiple metrics match, or if none match
        and no `default` is given.
        """
        entry = self.get(path, set())
        assert (
            entry or default is not None
        ), f'No metrics found by path "{path}"'

        if labels is not None:
            entry = {x for x in entry if x.labels == labels}
            assert (
                entry or default is not None
            ), f'No metrics found by path "{path}" and labels {labels}'
            assert len(entry) <= 1, (
                f'Multiple metrics found by path "{path}" and labels {labels}:'
                f' {entry}'
            )
        else:
            assert (
                len(entry) <= 1
            ), f'Multiple metrics found by path "{path}": {entry}'

        if default is not None and not entry:
            return default
        return next(iter(entry)).value
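
    # Illustrative sketch (not part of the original module): `labels` is an
    # exact match, and `default` covers the no-match cases:
    #
    #     snapshot = MetricsSnapshot({
    #         'sample': {Metric(labels={'label': 'b'}, value=1)},
    #     })
    #     assert snapshot.value_at('sample', labels={'label': 'b'}) == 1
    #     assert snapshot.value_at('sample', labels={}, default=0) == 0
    #     assert snapshot.value_at('missing', default=42) == 42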
 
 
    def metrics_at(
            self,
            path: str,
            require_labels: typing.Optional[typing.Dict] = None,
    ) -> typing.List[Metric]:
        """
        Metrics path must exactly equal the given `path`.
        A required subset of labels is specified by `require_labels`.
        Example:
        require_labels={'a': 'b', 'c': 'd'}
        {'a': 'b', 'c': 'd'} - exact match
        {'a': 'b', 'c': 'd', 'e': 'f', 'h': 'k'} - match
        {'a': 'x', 'c': 'd'} - no match, incorrect value for label 'a'
        {'a': 'b'} - no match, required label 'c' not found
        Usage:
        @code
        for m in metrics_at(path='something.something.sensor',
          require_labels={'label1': 'value1'}):
            assert m.value > 0
        @endcode
        """
        entry = self.get(path, set())

        def _is_labels_subset(require_labels, target_labels) -> bool:
            for req_key, req_val in require_labels.items():
                if target_labels.get(req_key, None) != req_val:
                    # the required label is missing or has a different value
                    return False
            return True

        if require_labels is not None:
            return list(
                filter(
                    lambda x: _is_labels_subset(
                        require_labels=require_labels,
                        target_labels=x.labels,
                    ),
                    entry,
                ),
            )
        return list(entry)
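
    # Illustrative sketch (not part of the original module): unlike
    # `value_at`, `require_labels` is a subset match:
    #
    #     snapshot = MetricsSnapshot({
    #         'sample': {
    #             Metric(labels={'a': 'b', 'c': 'd'}, value=1),
    #             Metric(labels={'a': 'x', 'c': 'd'}, value=2),
    #         },
    #     })
    #     found = snapshot.metrics_at('sample', require_labels={'a': 'b'})
    #     assert [m.value for m in found] == [1]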
 
 
    def has_metrics_at(
            self,
            path: str,
            require_labels: typing.Optional[typing.Dict] = None,
    ) -> bool:
        """ Returns True if at least one metric at `path` matches `require_labels` """
        return bool(self.metrics_at(path, require_labels))
 
    def assert_equals(
            self,
            other: typing.Mapping[str, typing.Set[Metric]],
            *,
            ignore_zeros: bool = False,
    ) -> None:
        """
        Compares the snapshot with a dict of metrics or with
        another snapshot, displaying a nice diff on mismatch.
        """
        lhs = _flatten_snapshot(self, ignore_zeros=ignore_zeros)
        rhs = _flatten_snapshot(other, ignore_zeros=ignore_zeros)
        assert lhs == rhs, _diff_metric_snapshots(lhs, rhs, ignore_zeros)
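
    # Illustrative sketch (not part of the original module): with
    # `ignore_zeros=True` the expected dict may omit zero-valued metrics:
    #
    #     snapshot = MetricsSnapshot({
    #         'hits': {Metric(labels={}, value=3)},
    #         'errors': {Metric(labels={}, value=0)},
    #     })
    #     snapshot.assert_equals(
    #         {'hits': {Metric(labels={}, value=3)}}, ignore_zeros=True,
    #     )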
 
 
    def pretty_print(self) -> str:
        """
        Multiline linear print:
          path:  (label=value),(label=value) TYPE VALUE
          path:  (label=value),(label=value) TYPE VALUE
        Usage:
        @code
        assert 'some.thing.sensor' in snapshot, snapshot.pretty_print()
        @endcode
        """

        def _iterate_over_mset(path, mset):
            """ Pretty-print one metrics set for the given path """
            return [
                '{}: {} {} {}'.format(
                    path,
                    ','.join(
                        '({}={})'.format(k, v)
                        for k, v in _get_labels_tuple(metric)
                    ),
                    metric._type.value,
                    metric.value,
                )
                for metric in sorted(mset, key=_get_labels_tuple)
            ]

        data_for_every_path = [
            _iterate_over_mset(path, mset)
            for path, mset in self._values.items()
        ]
        return '\n'.join(itertools.chain(*data_for_every_path))
 
 
    @staticmethod
    def from_json(json_str: str) -> 'MetricsSnapshot':
        """
        Construct MetricsSnapshot from a JSON string.
        """
        json_data = {
            str(path): {
                Metric(
                    labels=element['labels'],
                    value=_parse_metric_value(element['value']),
                    _type=MetricType[element.get('type', 'UNSPECIFIED')],
                )
                for element in metrics_list
            }
            for path, metrics_list in json.loads(json_str).items()
        }
        return MetricsSnapshot(json_data)
 
 
    def to_json(self) -> str:
        """
        Serialize to a JSON string.
        """
        return json.dumps(
            # Shuffle the metrics to keep tests from depending on their order.
            {
                path: random.sample(list(metrics), len(metrics))
                for path, metrics in self._values.items()
            },
            cls=_MetricsJSONEncoder,
        )
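
    # Illustrative sketch (not part of the original module): snapshots
    # survive a JSON round trip, though the order within each set is
    # randomized on serialization:
    #
    #     snapshot = MetricsSnapshot({'hits': {Metric(labels={}, value=3)}})
    #     restored = MetricsSnapshot.from_json(snapshot.to_json())
    #     assert restored == {'hits': {Metric(labels={}, value=3)}}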
 
 
 
def _type_eq(lhs: MetricType, rhs: MetricType) -> bool:
    return (
        lhs == rhs
        or lhs == MetricType.UNSPECIFIED
        or rhs == MetricType.UNSPECIFIED
    )
 

def _get_labels_tuple(metric: Metric) -> typing.Tuple:
    """ Returns labels as a tuple of sorted items """
    return tuple(sorted(metric.labels.items()))
 
 
def _do_compute_percentile(hist: Histogram, percent: float) -> float:
    # Unpack each bucket into per-observation values, using the bucket
    # bound as the observation value; this is O(hist.count()).
    value_lists = [
        [bound] * bucket for (bucket, bound) in zip(hist.buckets, hist.bounds)
    ] + [[math.inf] * hist.inf]
    values = [item for sublist in value_lists for item in sublist]

    # Linear interpolation between the two nearest values, as in
    # numpy.percentile.
    pivot = (len(values) - 1) * percent
    floor = math.floor(pivot)
    ceil = math.ceil(pivot)
    if floor == ceil:
        return values[int(pivot)]
    part1 = values[int(floor)] * (ceil - pivot)
    part2 = values[int(ceil)] * (pivot - floor)
    return part1 + part2
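
# Illustrative worked example (not part of the original module): for
# values == [1, 2, 3, 4] and percent == 0.5, pivot == 3 * 0.5 == 1.5, so the
# result interpolates between values[1] and values[2]:
#     2 * (2 - 1.5) + 3 * (1.5 - 1) == 1.0 + 1.5 == 2.5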
 

def _parse_metric_value(value: typing.Any) -> MetricValue:
    if isinstance(value, dict):
        return Histogram(
            bounds=value['bounds'], buckets=value['buckets'], inf=value['inf'],
        )
    elif isinstance(value, float):
        return value
    elif isinstance(value, int):
        return value
    else:
        raise Exception(f'Failed to parse metric value from {value!r}')
 
_FlattenedSnapshot = typing.Set[typing.Tuple[str, Metric]]
 
def _flatten_snapshot(values, ignore_zeros: bool) -> _FlattenedSnapshot:
    return set(
        (path, metric)
        for path, metrics in values.items()
        for metric in metrics
        if metric.value != 0 or not ignore_zeros
    )
 

def _diff_metric_snapshots(
        lhs: _FlattenedSnapshot, rhs: _FlattenedSnapshot, ignore_zeros: bool,
) -> str:
    def extra_metrics_message(extra, base):
        return [
            f'    path={path!r} labels={metric.labels!r} value={metric.value}'
            for path, metric in sorted(extra, key=lambda pair: pair[0])
            if (path, metric) not in base
        ]

    if ignore_zeros:
        lines = ['left.assert_equals(right, ignore_zeros=True) failed']
    else:
        lines = ['left.assert_equals(right) failed']

    actual_extra = extra_metrics_message(lhs, rhs)
    if actual_extra:
        lines.append('  extra in left:')
        lines += actual_extra

    actual_gt = extra_metrics_message(rhs, lhs)
    if actual_gt:
        lines.append('  missing in left:')
        lines += actual_gt
    return '\n'.join(lines)
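
# Illustrative sketch (not part of the original module): the diff produced on
# an `assert_equals` mismatch looks like:
#
#     left.assert_equals(right) failed
#       extra in left:
#         path='hits' labels={} value=3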