97    Base asyncio userver client that implements HTTP requests to service. 
   99    Compatible with werkzeug interface. 
  101    @ingroup userver_testsuite 
  104    def __init__(self, client):
 
  111            json: annotations.JsonAnyOptional = 
None,
 
  112            data: typing.Any = 
None,
 
  113            params: typing.Optional[typing.Dict[str, str]] = 
None,
 
  114            bearer: typing.Optional[str] = 
None,
 
  115            x_real_ip: typing.Optional[str] = 
None,
 
  116            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  118    ) -> http.ClientResponse:
 
  120        Make a HTTP POST request 
 
  138            json: annotations.JsonAnyOptional = 
None,
 
  139            data: typing.Any = 
None,
 
  140            params: typing.Optional[typing.Dict[str, str]] = 
None,
 
  141            bearer: typing.Optional[str] = 
None,
 
  142            x_real_ip: typing.Optional[str] = 
None,
 
  143            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  145    ) -> http.ClientResponse:
 
  147        Make a HTTP PUT request 
 
  165            json: annotations.JsonAnyOptional = 
None,
 
  166            data: typing.Any = 
None,
 
  167            params: typing.Optional[typing.Dict[str, str]] = 
None,
 
  168            bearer: typing.Optional[str] = 
None,
 
  169            x_real_ip: typing.Optional[str] = 
None,
 
  170            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  172    ) -> http.ClientResponse:
 
  174        Make a HTTP PATCH request 
 
  191            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  192            bearer: typing.Optional[str] = 
None,
 
  193            x_real_ip: typing.Optional[str] = 
None,
 
  195    ) -> http.ClientResponse:
 
  197        Make a HTTP GET request 
 
  211            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  212            bearer: typing.Optional[str] = 
None,
 
  213            x_real_ip: typing.Optional[str] = 
None,
 
  215    ) -> http.ClientResponse:
 
  217        Make a HTTP DELETE request 
 
  231            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  232            bearer: typing.Optional[str] = 
None,
 
  233            x_real_ip: typing.Optional[str] = 
None,
 
  235    ) -> http.ClientResponse:
 
  237        Make a HTTP OPTIONS request 
 
  249            self, http_method: str, path: str, **kwargs,
 
  250    ) -> http.ClientResponse:
 
  252        Make a HTTP request with the specified method 
  254        response = await self.
_client.
request(http_method, path, **kwargs)
 
 
  257    def _wrap_client_response(
 
  258            self, response: aiohttp.ClientResponse,
 
  259    ) -> typing.Awaitable[http.ClientResponse]:
 
  260        return http.wrap_client_response(response)
 
  266def _wrap_client_error(func):
 
  267    async def _wrapper(*args, **kwargs):
 
  269            return await func(*args, **kwargs)
 
  270        except aiohttp.client_exceptions.ClientResponseError 
as exc:
 
  271            raise http.HttpResponseError(
 
  272                url=exc.request_info.url, status=exc.status,
 
  278class AiohttpClientMonitor(service_client.AiohttpClient):
 
  279    _config: TestsuiteClientConfig
 
  281    def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
 
  282        super().__init__(base_url, **kwargs)
 
  283        self._config = config
 
  285    async def get_metrics(self, prefix=None):
 
  286        if not self._config.server_monitor_path:
 
  288                'handler-server-monitor component is not configured',
 
  290        params = {
'format': 
'internal'}
 
  291        if prefix 
is not None:
 
  292            params[
'prefix'] = prefix
 
  293        response = await self.
get(
 
  294            self._config.server_monitor_path, params=params,
 
  297            response.raise_for_status()
 
  298            return await response.json(content_type=
None)
 
  300    async def get_metric(self, metric_name):
 
  301        metrics = await self.get_metrics(metric_name)
 
  302        assert metric_name 
in metrics, (
 
  303            f
'No metric with name {metric_name!r}. ' 
  304            f
'Use "single_metric" function instead of "get_metric"' 
  306        return metrics[metric_name]
 
  308    async def metrics_raw(
 
  314            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  316        if not self._config.server_monitor_path:
 
  318                'handler-server-monitor component is not configured',
 
  321        params = {
'format': output_format}
 
  323            params[
'prefix'] = prefix
 
  326            params[
'path'] = path
 
  329            params[
'labels'] = json.dumps(labels)
 
  331        response = await self.
get(
 
  332            self._config.server_monitor_path, params=params,
 
  335            response.raise_for_status()
 
  336            return await response.text()
 
  343            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  344    ) -> metric_module.MetricsSnapshot:
 
  345        response = await self.metrics_raw(
 
  346            output_format=
'json', path=path, prefix=prefix, labels=labels,
 
  348        return metric_module.MetricsSnapshot.from_json(str(response))
 
  350    async def single_metric_optional(
 
  354            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  355    ) -> typing.Optional[Metric]:
 
  356        response = await self.metrics(path=path, labels=labels)
 
  357        metrics_list = response.get(path, [])
 
  359        assert len(metrics_list) <= 1, (
 
  360            f
'More than one metric found for path {path} and labels {labels}: ' 
  367        return next(iter(metrics_list))
 
  369    async def single_metric(
 
  373            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  375        value = await self.single_metric_optional(path, labels=labels)
 
  376        assert value 
is not None, (
 
  377            f
'No metric was found for path {path} and labels {labels}',
 
 
  533    A helper class for computing metric differences. 
  535    @see ClientMonitor.metrics_diff 
  536    @ingroup userver_testsuite 
  542            _client: ClientMonitor,
 
  543            _path: typing.Optional[str],
 
  544            _prefix: typing.Optional[str],
 
  545            _labels: typing.Optional[typing.Dict[str, str]],
 
  548        self._client = _client
 
  550        self._prefix = _prefix
 
  551        self._labels = _labels
 
  553        self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] = 
None 
  554        self.
_current: typing.Optional[metric_module.MetricsSnapshot] = 
None 
  555        self.
_diff: typing.Optional[metric_module.MetricsSnapshot] = 
None 
  560    def baseline(self) -> metric_module.MetricsSnapshot:
 
  565    def baseline(self, value: metric_module.MetricsSnapshot) -> 
None:
 
  568            self.
_diff = _subtract_metrics_snapshots(
 
  573    def current(self) -> metric_module.MetricsSnapshot:
 
  574        assert self.
_current is not None, 
'Set self.current first' 
  578    def current(self, value: metric_module.MetricsSnapshot) -> 
None:
 
  580        assert self.
_baseline is not None, 
'Set self.baseline first' 
  581        self.
_diff = _subtract_metrics_snapshots(
 
  586    def diff(self) -> metric_module.MetricsSnapshot:
 
  587        assert self.
_diff is not None, 
'Set self.current first' 
  592            subpath: typing.Optional[str] = 
None,
 
  593            add_labels: typing.Optional[typing.Dict] = 
None,
 
  595            default: typing.Optional[float] = 
None,
 
  596    ) -> metric_module.MetricValue:
 
  598        Returns a single metric value at the specified path, prepending 
  599        the path provided at construction. If a dict of labels is provided, 
  600        does en exact match of labels, prepending the labels provided 
  603        @param subpath Suffix of the metric path; the path provided 
  604        at construction is prepended 
  605        @param add_labels Labels that the metric must have in addition 
  606        to the labels provided at construction 
  607        @param default An optional default value in case the metric is missing 
  608        @throws AssertionError if not one metric by path 
  610        base_path = self._path 
or self._prefix
 
  611        if base_path 
and subpath:
 
  612            path = f
'{base_path}.{subpath}' 
  614            assert base_path 
or subpath, 
'No path provided' 
  615            path = base_path 
or subpath 
or '' 
  616        labels: typing.Optional[dict] = 
None 
  617        if self._labels 
is not None or add_labels 
is not None:
 
  618            labels = {**(self._labels 
or {}), **(add_labels 
or {})}
 
  619        return self.
diff.
value_at(path, labels, default=default)
 
 
  621    async def fetch(self) -> metric_module.MetricsSnapshot:
 
  623        Fetches metric values from the service. 
  625        return await self._client.metrics(
 
  626            path=self._path, prefix=self._prefix, labels=self._labels,
 
 
  629    async def __aenter__(self) -> 'MetricsDiffer':
 
  634    async def __aexit__(self, exc_type, exc, exc_tb) -> None:
 
  641def _subtract_metrics_snapshots(
 
  642        current: metric_module.MetricsSnapshot,
 
  643        initial: metric_module.MetricsSnapshot,
 
  645) -> metric_module.MetricsSnapshot:
 
  646    return metric_module.MetricsSnapshot(
 
  649                _subtract_metrics(path, current_metric, initial, diff_gauge)
 
  650                for current_metric 
in current_group
 
  652            for path, current_group 
in current.items()
 
  657def _subtract_metrics(
 
  659        current_metric: metric_module.Metric,
 
  660        initial: metric_module.MetricsSnapshot,
 
  662) -> metric_module.Metric:
 
  663    initial_group = initial.get(path, 
None)
 
  664    if initial_group 
is None:
 
  665        return current_metric
 
  666    initial_metric = next(
 
  667        (x 
for x 
in initial_group 
if x.labels == current_metric.labels), 
None,
 
  669    if initial_metric 
is None:
 
  670        return current_metric
 
  672    return metric_module.Metric(
 
  673        labels=current_metric.labels,
 
  674        value=_subtract_metric_values(
 
  675            current=current_metric,
 
  676            initial=initial_metric,
 
  677            diff_gauge=diff_gauge,
 
  679        _type=current_metric.type(),
 
  683def _subtract_metric_values(
 
  684        current: metric_module.Metric,
 
  685        initial: metric_module.Metric,
 
  687) -> metric_module.MetricValue:
 
  688    assert current.type() 
is not metric_module.MetricType.UNSPECIFIED
 
  689    assert initial.type() 
is not metric_module.MetricType.UNSPECIFIED
 
  690    assert current.type() == initial.type()
 
  692    if isinstance(current.value, metric_module.Histogram):
 
  693        assert isinstance(initial.value, metric_module.Histogram)
 
  694        return _subtract_metric_values_hist(current=current, initial=initial)
 
  696        assert not isinstance(initial.value, metric_module.Histogram)
 
  697        return _subtract_metric_values_num(
 
  698            current=current, initial=initial, diff_gauge=diff_gauge,
 
  702def _subtract_metric_values_num(
 
  703        current: metric_module.Metric,
 
  704        initial: metric_module.Metric,
 
  707    current_value = typing.cast(float, current.value)
 
  708    initial_value = typing.cast(float, initial.value)
 
  710        current.type() 
is metric_module.MetricType.RATE
 
  711        or initial.type() 
is metric_module.MetricType.RATE
 
  714    return current_value - initial_value 
if should_diff 
else current_value
 
  717def _subtract_metric_values_hist(
 
  718        current: metric_module.Metric, initial: metric_module.Metric,
 
  719) -> metric_module.Histogram:
 
  720    current_value = typing.cast(metric_module.Histogram, current.value)
 
  721    initial_value = typing.cast(metric_module.Histogram, initial.value)
 
  722    assert current_value.bounds == initial_value.bounds
 
  723    return metric_module.Histogram(
 
  724        bounds=current_value.bounds,
 
  727            for t 
in zip(current_value.buckets, initial_value.buckets)
 
  729        inf=current_value.inf - initial_value.inf,
 
  733class AiohttpClient(service_client.AiohttpClient):
 
  734    PeriodicTaskFailed = PeriodicTaskFailed
 
  735    TestsuiteActionFailed = TestsuiteActionFailed
 
  736    TestsuiteTaskNotFound = TestsuiteTaskNotFound
 
  737    TestsuiteTaskConflict = TestsuiteTaskConflict
 
  738    TestsuiteTaskFailed = TestsuiteTaskFailed
 
  744            config: TestsuiteClientConfig,
 
  749            cache_invalidation_state,
 
  751            api_coverage_report=
None,
 
  752            periodic_tasks_state: typing.Optional[PeriodicTasksState] = 
None,
 
  753            allow_all_caches_invalidation: bool = 
True,
 
  754            cache_control: typing.Optional[caches.CacheControl] = 
None,
 
  757        super().__init__(base_url, span_id_header=span_id_header, **kwargs)
 
  758        self._config = config
 
  759        self._periodic_tasks = periodic_tasks_state
 
  760        self._testpoint = testpoint
 
  761        self._log_capture_fixture = log_capture_fixture
 
  763            mocked_time=mocked_time,
 
  764            testpoint=self._testpoint,
 
  765            testpoint_control=testpoint_control,
 
  766            invalidation_state=cache_invalidation_state,
 
  767            cache_control=cache_control,
 
  769        self._api_coverage_report = api_coverage_report
 
  770        self._allow_all_caches_invalidation = allow_all_caches_invalidation
 
  772    async def run_periodic_task(self, name):
 
  773        response = await self._testsuite_action(
'run_periodic_task', name=name)
 
  774        if not response[
'status']:
 
  775            raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
 
  777    async def suspend_periodic_tasks(self, names: typing.List[str]) -> 
None:
 
  778        if not self._periodic_tasks:
 
  780        self._periodic_tasks.tasks_to_suspend.update(names)
 
  781        await self._suspend_periodic_tasks()
 
  783    async def resume_periodic_tasks(self, names: typing.List[str]) -> 
None:
 
  784        if not self._periodic_tasks:
 
  786        self._periodic_tasks.tasks_to_suspend.difference_update(names)
 
  787        await self._suspend_periodic_tasks()
 
  789    async def resume_all_periodic_tasks(self) -> None:
 
  790        if not self._periodic_tasks:
 
  792        self._periodic_tasks.tasks_to_suspend.clear()
 
  793        await self._suspend_periodic_tasks()
 
  795    async def write_cache_dumps(
 
  796            self, names: typing.List[str], *, testsuite_skip_prepare=
False,
 
  798        await self._testsuite_action(
 
  801            testsuite_skip_prepare=testsuite_skip_prepare,
 
  804    async def read_cache_dumps(
 
  805            self, names: typing.List[str], *, testsuite_skip_prepare=
False,
 
  807        await self._testsuite_action(
 
  810            testsuite_skip_prepare=testsuite_skip_prepare,
 
  813    async def run_distlock_task(self, name: str) -> 
None:
 
  814        await self.run_task(f
'distlock/{name}')
 
  816    async def reset_metrics(self) -> None:
 
  817        await self._testsuite_action(
'reset_metrics')
 
  819    async def metrics_portability(
 
  820            self, *, prefix: typing.Optional[str] = 
None,
 
  821    ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
 
  822        return await self._testsuite_action(
 
  823            'metrics_portability', prefix=prefix,
 
  826    async def list_tasks(self) -> typing.List[str]:
 
  827        response = await self._do_testsuite_action(
'tasks_list')
 
  829            response.raise_for_status()
 
  830            body = await response.json(content_type=
None)
 
  833    async def run_task(self, name: str) -> 
None:
 
  834        response = await self._do_testsuite_action(
 
  835            'task_run', json={
'name': name},
 
  837        await _task_check_response(name, response)
 
  839    @contextlib.asynccontextmanager 
  840    async def spawn_task(self, name: str):
 
  841        task_id = await self._task_spawn(name)
 
  845            await self._task_stop_spawned(task_id)
 
  847    async def _task_spawn(self, name: str) -> str:
 
  848        response = await self._do_testsuite_action(
 
  849            'task_spawn', json={
'name': name},
 
  851        data = await _task_check_response(name, response)
 
  852        return data[
'task_id']
 
  854    async def _task_stop_spawned(self, task_id: str) -> 
None:
 
  855        response = await self._do_testsuite_action(
 
  856            'task_stop', json={
'task_id': task_id},
 
  858        await _task_check_response(task_id, response)
 
  860    async def http_allowed_urls_extra(
 
  861            self, http_allowed_urls_extra: typing.List[str],
 
  863        await self._do_testsuite_action(
 
  864            'http_allowed_urls_extra',
 
  865            json={
'allowed_urls_extra': http_allowed_urls_extra},
 
  866            testsuite_skip_prepare=
True,
 
  869    @contextlib.asynccontextmanager 
  870    async def capture_logs(
 
  873            log_level: str = 
'DEBUG',
 
  874            testsuite_skip_prepare: bool = 
False,
 
  876        async with self._log_capture_fixture.start_capture(
 
  879            await self._testsuite_action(
 
  882                socket_logging_duplication=
True,
 
  883                testsuite_skip_prepare=testsuite_skip_prepare,
 
  888                await self._testsuite_action(
 
  890                    log_level=self._log_capture_fixture.default_log_level,
 
  891                    socket_logging_duplication=
False,
 
  892                    testsuite_skip_prepare=testsuite_skip_prepare,
 
  895    async def log_flush(self, logger_name: typing.Optional[str] = 
None):
 
  896        await self._testsuite_action(
 
  897            'log_flush', logger_name=logger_name, testsuite_skip_prepare=
True,
 
  900    async def invalidate_caches(
 
  903            clean_update: bool = 
True,
 
  904            cache_names: typing.Optional[typing.List[str]] = 
None,
 
  905            testsuite_skip_prepare: bool = 
False,
 
  907        if cache_names 
is None and clean_update:
 
  908            if self._allow_all_caches_invalidation:
 
  909                warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
 
  911                __tracebackhide__ = 
True 
  912                raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
 
  914        if testsuite_skip_prepare:
 
  915            await self._tests_control(
 
  917                    'invalidate_caches': {
 
  919                            'full' if clean_update 
else 'incremental' 
  921                        **({
'names': cache_names} 
if cache_names 
else {}),
 
  926            await self.tests_control(
 
  927                invalidate_caches=
True,
 
  928                clean_update=clean_update,
 
  929                cache_names=cache_names,
 
  932    async def tests_control(
 
  935            invalidate_caches: bool = 
True,
 
  936            clean_update: bool = 
True,
 
  937            cache_names: typing.Optional[typing.List[str]] = 
None,
 
  938            http_allowed_urls_extra=
None,
 
  939    ) -> typing.Dict[str, typing.Any]:
 
  942        ] = self._state_manager.get_pending_update()
 
  944        if 'invalidate_caches' in body 
and invalidate_caches:
 
  945            if not clean_update 
or cache_names:
 
  947                    'Manual cache invalidation leads to indirect initial ' 
  948                    'full cache invalidation',
 
  950                await self._prepare()
 
  953        if invalidate_caches:
 
  954            body[
'invalidate_caches'] = {
 
  955                'update_type': (
'full' if clean_update 
else 'incremental'),
 
  958                body[
'invalidate_caches'][
'names'] = cache_names
 
  960        if http_allowed_urls_extra 
is not None:
 
  961            await self.http_allowed_urls_extra(http_allowed_urls_extra)
 
  963        return await self._tests_control(body)
 
  965    async def update_server_state(self) -> None:
 
  966        await self._prepare()
 
  968    async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
 
  969        if not self._testpoint:
 
  971        if no_auto_cache_cleanup:
 
  972            await self._tests_control(
 
  973                {
'testpoints': sorted(self._testpoint.keys())},
 
  976            await self.update_server_state()
 
  978    async def get_dynamic_config_defaults(
 
  980    ) -> typing.Dict[str, typing.Any]:
 
  981        return await self._testsuite_action(
 
  982            'get_dynamic_config_defaults', testsuite_skip_prepare=
True,
 
  985    async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
 
  986        with self._state_manager.updating_state(body):
 
  987            async with await self._do_testsuite_action(
 
  988                    'control', json=body, testsuite_skip_prepare=
True,
 
  990                if response.status == 404:
 
  992                        'It seems that testsuite support is not enabled ' 
  995                response.raise_for_status()
 
  996                return await response.json(content_type=
None)
 
  998    async def _suspend_periodic_tasks(self):
 
 1000                self._periodic_tasks.tasks_to_suspend
 
 1001                != self._periodic_tasks.suspended_tasks
 
 1003            await self._testsuite_action(
 
 1004                'suspend_periodic_tasks',
 
 1005                names=sorted(self._periodic_tasks.tasks_to_suspend),
 
 1007            self._periodic_tasks.suspended_tasks = set(
 
 1008                self._periodic_tasks.tasks_to_suspend,
 
 1011    def _do_testsuite_action(self, action, **kwargs):
 
 1012        if not self._config.testsuite_action_path:
 
 1014                'tests-control component is not properly configured',
 
 1016        path = self._config.testsuite_action_path.format(action=action)
 
 1017        return self.post(path, **kwargs)
 
 1019    async def _testsuite_action(
 
 1020            self, action, *, testsuite_skip_prepare=False, **kwargs,
 
 1022        async with await self._do_testsuite_action(
 
 1025                testsuite_skip_prepare=testsuite_skip_prepare,
 
 1027            if response.status == 500:
 
 1028                raise TestsuiteActionFailed
 
 1029            response.raise_for_status()
 
 1030            return await response.json(content_type=
None)
 
 1032    async def _prepare(self) -> None:
 
 1033        with self._state_manager.cache_control_update() 
as pending_update:
 
 1035                await self._tests_control(pending_update)
 
 1041            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
 1042            bearer: typing.Optional[str] = 
None,
 
 1043            x_real_ip: typing.Optional[str] = 
None,
 
 1045            testsuite_skip_prepare: bool = 
False,
 
 1047    ) -> aiohttp.ClientResponse:
 
 1048        if not testsuite_skip_prepare:
 
 1049            await self._prepare()
 
 1051        response = await super()._request(
 
 1052            http_method, path, headers, bearer, x_real_ip, **kwargs,
 
 1054        if self._api_coverage_report:
 
 1055            self._api_coverage_report.update_usage_stat(
 
 1056                path, http_method, response.status, response.content_type,