88    Base asyncio userver client that implements HTTP requests to service. 
   90    Compatible with werkzeug interface. 
   92    @ingroup userver_testsuite 
   95    def __init__(self, client):
 
  101            json: annotations.JsonAnyOptional = 
None,
 
  102            data: typing.Any = 
None,
 
  103            params: typing.Optional[typing.Dict[str, str]] = 
None,
 
  104            bearer: typing.Optional[str] = 
None,
 
  105            x_real_ip: typing.Optional[str] = 
None,
 
  106            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  108    ) -> http.ClientResponse:
 
  110        Make a HTTP POST request 
 
  127            json: annotations.JsonAnyOptional = 
None,
 
  128            data: typing.Any = 
None,
 
  129            params: typing.Optional[typing.Dict[str, str]] = 
None,
 
  130            bearer: typing.Optional[str] = 
None,
 
  131            x_real_ip: typing.Optional[str] = 
None,
 
  132            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  134    ) -> http.ClientResponse:
 
  136        Make a HTTP PUT request 
 
  153            json: annotations.JsonAnyOptional = 
None,
 
  154            data: typing.Any = 
None,
 
  155            params: typing.Optional[typing.Dict[str, str]] = 
None,
 
  156            bearer: typing.Optional[str] = 
None,
 
  157            x_real_ip: typing.Optional[str] = 
None,
 
  158            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  160    ) -> http.ClientResponse:
 
  162        Make a HTTP PATCH request 
 
  179            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  180            bearer: typing.Optional[str] = 
None,
 
  181            x_real_ip: typing.Optional[str] = 
None,
 
  183    ) -> http.ClientResponse:
 
  185        Make a HTTP GET request 
 
  199            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  200            bearer: typing.Optional[str] = 
None,
 
  201            x_real_ip: typing.Optional[str] = 
None,
 
  203    ) -> http.ClientResponse:
 
  205        Make a HTTP DELETE request 
 
  219            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  220            bearer: typing.Optional[str] = 
None,
 
  221            x_real_ip: typing.Optional[str] = 
None,
 
  223    ) -> http.ClientResponse:
 
  225        Make a HTTP OPTIONS request 
 
  237            self, http_method: str, path: str, **kwargs,
 
  238    ) -> http.ClientResponse:
 
  240        Make a HTTP request with the specified method 
  242        response = await self.
_client.
request(http_method, path, **kwargs)
 
 
  245    def _wrap_client_response(
 
  246            self, response: aiohttp.ClientResponse,
 
  247    ) -> typing.Awaitable[http.ClientResponse]:
 
  248        return http.wrap_client_response(response)
 
  254def _wrap_client_error(func):
 
  255    async def _wrapper(*args, **kwargs):
 
  257            return await func(*args, **kwargs)
 
  258        except aiohttp.client_exceptions.ClientResponseError 
as exc:
 
  259            raise http.HttpResponseError(
 
  260                url=exc.request_info.url, status=exc.status,
 
  266class AiohttpClientMonitor(service_client.AiohttpClient):
 
  267    _config: TestsuiteClientConfig
 
  269    def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
 
  270        super().__init__(base_url, **kwargs)
 
  271        self._config = config
 
  273    async def get_metrics(self, prefix=None):
 
  274        if not self._config.server_monitor_path:
 
  276                'handler-server-monitor component is not configured',
 
  278        if prefix 
is not None:
 
  279            params = {
'prefix': prefix}
 
  282        response = await self.
get(
 
  283            self._config.server_monitor_path, params=params,
 
  286            response.raise_for_status()
 
  287            return await response.json(content_type=
None)
 
  289    async def get_metric(self, metric_name):
 
  290        metrics = await self.get_metrics(metric_name)
 
  291        assert metric_name 
in metrics, (
 
  292            f
'No metric with name {metric_name!r}. ' 
  293            f
'Use "single_metric" function instead of "get_metric"' 
  295        return metrics[metric_name]
 
  297    async def metrics_raw(
 
  303            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  305        if not self._config.server_monitor_path:
 
  307                'handler-server-monitor component is not configured',
 
  310        params = {
'format': output_format}
 
  312            params[
'prefix'] = prefix
 
  315            params[
'path'] = path
 
  318            params[
'labels'] = json.dumps(labels)
 
  320        response = await self.
get(
 
  321            self._config.server_monitor_path, params=params,
 
  324            response.raise_for_status()
 
  325            return await response.text()
 
  332            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  333    ) -> metric_module.MetricsSnapshot:
 
  334        response = await self.metrics_raw(
 
  335            output_format=
'json', path=path, prefix=prefix, labels=labels,
 
  337        return metric_module.MetricsSnapshot.from_json(str(response))
 
  339    async def single_metric_optional(
 
  343            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  344    ) -> typing.Optional[Metric]:
 
  345        response = await self.metrics(path=path, labels=labels)
 
  346        metrics_list = response.get(path, [])
 
  348        assert len(metrics_list) <= 1, (
 
  349            f
'More than one metric found for path {path} and labels {labels}: ' 
  356        return next(iter(metrics_list))
 
  358    async def single_metric(
 
  362            labels: typing.Optional[typing.Dict[str, str]] = 
None,
 
  364        value = await self.single_metric_optional(path, labels=labels)
 
  365        assert value 
is not None, (
 
  366            f
'No metric was found for path {path} and labels {labels}',
 
 
  516    A helper class for computing metric differences. 
  518    @see ClientMonitor.metrics_diff 
  519    @ingroup userver_testsuite 
  525            _client: ClientMonitor,
 
  526            _path: typing.Optional[str],
 
  527            _prefix: typing.Optional[str],
 
  528            _labels: typing.Optional[typing.Dict[str, str]],
 
  531        self._client = _client
 
  533        self._prefix = _prefix
 
  534        self._labels = _labels
 
  536        self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] = 
None 
  537        self.
_current: typing.Optional[metric_module.MetricsSnapshot] = 
None 
  538        self.
_diff: typing.Optional[metric_module.MetricsSnapshot] = 
None 
  543    def baseline(self) -> metric_module.MetricsSnapshot:
 
  548    def baseline(self, value: metric_module.MetricsSnapshot) -> 
None:
 
  551            self.
_diff = _subtract_metrics_snapshots(
 
  556    def current(self) -> metric_module.MetricsSnapshot:
 
  557        assert self.
_current is not None, 
'Set self.current first' 
  561    def current(self, value: metric_module.MetricsSnapshot) -> 
None:
 
  563        assert self.
_baseline is not None, 
'Set self.baseline first' 
  564        self.
_diff = _subtract_metrics_snapshots(
 
  569    def diff(self) -> metric_module.MetricsSnapshot:
 
  570        assert self.
_diff is not None, 
'Set self.current first' 
  575            subpath: typing.Optional[str] = 
None,
 
  576            add_labels: typing.Optional[typing.Dict] = 
None,
 
  578            default: typing.Optional[float] = 
None,
 
  581        Returns a single metric value at the specified path, prepending 
  582        the path provided at construction. If a dict of labels is provided, 
  583        does en exact match of labels, prepending the labels provided 
  586        @param subpath Suffix of the metric path; the path provided 
  587        at construction is prepended 
  588        @param add_labels Labels that the metric must have in addition 
  589        to the labels provided at construction 
  590        @param default An optional default value in case the metric is missing 
  591        @throws AssertionError if not one metric by path 
  593        base_path = self._path 
or self._prefix
 
  594        if base_path 
and subpath:
 
  595            path = f
'{base_path}.{subpath}' 
  597            assert base_path 
or subpath, 
'No path provided' 
  598            path = base_path 
or subpath 
or '' 
  599        labels: typing.Optional[dict] = 
None 
  600        if self._labels 
is not None or add_labels 
is not None:
 
  601            labels = {**(self._labels 
or {}), **(add_labels 
or {})}
 
  602        return self.
diff.
value_at(path, labels, default=default)
 
 
  604    async def fetch(self) -> metric_module.MetricsSnapshot:
 
  606        Fetches metric values from the service. 
  608        return await self._client.metrics(
 
  609            path=self._path, prefix=self._prefix, labels=self._labels,
 
 
  612    async def __aenter__(self) -> 'MetricsDiffer':
 
  617    async def __aexit__(self, exc_type, exc, tb) -> None:
 
  624def _subtract_metrics_snapshots(
 
  625        current: metric_module.MetricsSnapshot,
 
  626        initial: metric_module.MetricsSnapshot,
 
  628) -> metric_module.MetricsSnapshot:
 
  629    return metric_module.MetricsSnapshot(
 
  632                _subtract_metrics(path, current_metric, initial, diff_gauge)
 
  633                for current_metric 
in current_group
 
  635            for path, current_group 
in current.items()
 
  640def _subtract_metrics(
 
  642        current_metric: metric_module.Metric,
 
  643        initial: metric_module.MetricsSnapshot,
 
  645) -> metric_module.Metric:
 
  646    assert diff_gauge, 
'diff_gauge=False is unimplemented' 
  648    initial_group = initial.get(path, 
None)
 
  649    if initial_group 
is None:
 
  650        return current_metric
 
  651    initial_metric = next(
 
  652        (x 
for x 
in initial_group 
if x.labels == current_metric.labels), 
None,
 
  654    if initial_metric 
is None:
 
  655        return current_metric
 
  657    return metric_module.Metric(
 
  658        labels=current_metric.labels,
 
  659        value=current_metric.value - initial_metric.value,
 
  663class AiohttpClient(service_client.AiohttpClient):
 
  664    PeriodicTaskFailed = PeriodicTaskFailed
 
  665    TestsuiteActionFailed = TestsuiteActionFailed
 
  666    TestsuiteTaskNotFound = TestsuiteTaskNotFound
 
  667    TestsuiteTaskConflict = TestsuiteTaskConflict
 
  668    TestsuiteTaskFailed = TestsuiteTaskFailed
 
  674            config: TestsuiteClientConfig,
 
  679            cache_invalidation_state,
 
  681            api_coverage_report=
None,
 
  682            periodic_tasks_state: typing.Optional[PeriodicTasksState] = 
None,
 
  685        super().__init__(base_url, span_id_header=span_id_header, **kwargs)
 
  686        self._config = config
 
  687        self._periodic_tasks = periodic_tasks_state
 
  688        self._testpoint = testpoint
 
  689        self._log_capture_fixture = log_capture_fixture
 
  691            mocked_time=mocked_time,
 
  692            testpoint=self._testpoint,
 
  693            testpoint_control=testpoint_control,
 
  694            invalidation_state=cache_invalidation_state,
 
  696        self._api_coverage_report = api_coverage_report
 
  698    async def run_periodic_task(self, name):
 
  699        response = await self._testsuite_action(
'run_periodic_task', name=name)
 
  700        if not response[
'status']:
 
  701            raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
 
  703    async def suspend_periodic_tasks(self, names: typing.List[str]) -> 
None:
 
  704        if not self._periodic_tasks:
 
  706        self._periodic_tasks.tasks_to_suspend.update(names)
 
  707        await self._suspend_periodic_tasks()
 
  709    async def resume_periodic_tasks(self, names: typing.List[str]) -> 
None:
 
  710        if not self._periodic_tasks:
 
  712        self._periodic_tasks.tasks_to_suspend.difference_update(names)
 
  713        await self._suspend_periodic_tasks()
 
  715    async def resume_all_periodic_tasks(self) -> None:
 
  716        if not self._periodic_tasks:
 
  718        self._periodic_tasks.tasks_to_suspend.clear()
 
  719        await self._suspend_periodic_tasks()
 
  721    async def write_cache_dumps(
 
  722            self, names: typing.List[str], *, testsuite_skip_prepare=
False,
 
  724        await self._testsuite_action(
 
  727            testsuite_skip_prepare=testsuite_skip_prepare,
 
  730    async def read_cache_dumps(
 
  731            self, names: typing.List[str], *, testsuite_skip_prepare=
False,
 
  733        await self._testsuite_action(
 
  736            testsuite_skip_prepare=testsuite_skip_prepare,
 
  739    async def run_distlock_task(self, name: str) -> 
None:
 
  740        await self.run_task(f
'distlock/{name}')
 
  742    async def reset_metrics(self) -> None:
 
  743        await self._testsuite_action(
'reset_metrics')
 
  745    async def metrics_portability(
 
  746            self, *, prefix: typing.Optional[str] = 
None,
 
  747    ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
 
  748        return await self._testsuite_action(
 
  749            'metrics_portability', prefix=prefix,
 
  752    async def list_tasks(self) -> typing.List[str]:
 
  753        response = await self._do_testsuite_action(
'tasks_list')
 
  755            response.raise_for_status()
 
  756            body = await response.json(content_type=
None)
 
  759    async def run_task(self, name: str) -> 
None:
 
  760        response = await self._do_testsuite_action(
 
  761            'task_run', json={
'name': name},
 
  763        await _task_check_response(name, response)
 
  765    @contextlib.asynccontextmanager 
  766    async def spawn_task(self, name: str):
 
  767        task_id = await self._task_spawn(name)
 
  771            await self._task_stop_spawned(task_id)
 
  773    async def _task_spawn(self, name: str) -> str:
 
  774        response = await self._do_testsuite_action(
 
  775            'task_spawn', json={
'name': name},
 
  777        data = await _task_check_response(name, response)
 
  778        return data[
'task_id']
 
  780    async def _task_stop_spawned(self, task_id: str) -> 
None:
 
  781        response = await self._do_testsuite_action(
 
  782            'task_stop', json={
'task_id': task_id},
 
  784        await _task_check_response(task_id, response)
 
  786    async def http_allowed_urls_extra(
 
  787            self, http_allowed_urls_extra: typing.List[str],
 
  789        await self._do_testsuite_action(
 
  790            'http_allowed_urls_extra',
 
  791            json={
'allowed_urls_extra': http_allowed_urls_extra},
 
  792            testsuite_skip_prepare=
True,
 
  795    @contextlib.asynccontextmanager 
  796    async def capture_logs(self):
 
  797        async with self._log_capture_fixture.start_capture() 
as capture:
 
  798            await self._testsuite_action(
 
  799                'log_capture', socket_logging_duplication=
True,
 
  804                await self._testsuite_action(
 
  805                    'log_capture', socket_logging_duplication=
False,
 
  808    async def invalidate_caches(
 
  811            clean_update: bool = 
True,
 
  812            cache_names: typing.Optional[typing.List[str]] = 
None,
 
  814        await self.tests_control(
 
  815            invalidate_caches=
True,
 
  816            clean_update=clean_update,
 
  817            cache_names=cache_names,
 
  820    async def tests_control(
 
  823            invalidate_caches: bool = 
True,
 
  824            clean_update: bool = 
True,
 
  825            cache_names: typing.Optional[typing.List[str]] = 
None,
 
  826            http_allowed_urls_extra=
None,
 
  827    ) -> typing.Dict[str, typing.Any]:
 
  830        ] = self._state_manager.get_pending_update()
 
  832        if 'invalidate_caches' in body 
and invalidate_caches:
 
  833            if not clean_update 
or cache_names:
 
  835                    'Manual cache invalidation leads to indirect initial ' 
  836                    'full cache invalidation',
 
  838                await self._prepare()
 
  841        if invalidate_caches:
 
  842            body[
'invalidate_caches'] = {
 
  843                'update_type': (
'full' if clean_update 
else 'incremental'),
 
  846                body[
'invalidate_caches'][
'names'] = cache_names
 
  848        if http_allowed_urls_extra 
is not None:
 
  849            await self.http_allowed_urls_extra(http_allowed_urls_extra)
 
  851        return await self._tests_control(body)
 
  853    async def update_server_state(self) -> None:
 
  854        await self._prepare()
 
  856    async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
 
  857        if not self._testpoint:
 
  859        if no_auto_cache_cleanup:
 
  860            await self._tests_control(
 
  861                {
'testpoints': sorted(self._testpoint.keys())},
 
  864            await self.update_server_state()
 
  866    async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
 
  867        with self._state_manager.updating_state(body):
 
  868            async with await self._do_testsuite_action(
 
  869                    'control', json=body, testsuite_skip_prepare=
True,
 
  871                if response.status == 404:
 
  873                        'It seems that testsuite support is not enabled ' 
  876                response.raise_for_status()
 
  877                return await response.json(content_type=
None)
 
  879    async def _suspend_periodic_tasks(self):
 
  881                self._periodic_tasks.tasks_to_suspend
 
  882                != self._periodic_tasks.suspended_tasks
 
  884            await self._testsuite_action(
 
  885                'suspend_periodic_tasks',
 
  886                names=sorted(self._periodic_tasks.tasks_to_suspend),
 
  888            self._periodic_tasks.suspended_tasks = set(
 
  889                self._periodic_tasks.tasks_to_suspend,
 
  892    def _do_testsuite_action(self, action, **kwargs):
 
  893        if not self._config.testsuite_action_path:
 
  895                'tests-control component is not properly configured',
 
  897        path = self._config.testsuite_action_path.format(action=action)
 
  898        return self.post(path, **kwargs)
 
  900    async def _testsuite_action(
 
  901            self, action, *, testsuite_skip_prepare=False, **kwargs,
 
  903        async with await self._do_testsuite_action(
 
  906                testsuite_skip_prepare=testsuite_skip_prepare,
 
  908            if response.status == 500:
 
  909                raise TestsuiteActionFailed
 
  910            response.raise_for_status()
 
  911            return await response.json(content_type=
None)
 
  913    async def _prepare(self) -> None:
 
  914        pending_update = self._state_manager.get_pending_update()
 
  916            await self._tests_control(pending_update)
 
  922            headers: typing.Optional[typing.Dict[str, str]] = 
None,
 
  923            bearer: typing.Optional[str] = 
None,
 
  924            x_real_ip: typing.Optional[str] = 
None,
 
  926            testsuite_skip_prepare: bool = 
False,
 
  928    ) -> aiohttp.ClientResponse:
 
  929        if not testsuite_skip_prepare:
 
  930            await self._prepare()
 
  932        response = await super()._request(
 
  933            http_method, path, headers, bearer, x_real_ip, **kwargs,
 
  935        if self._api_coverage_report:
 
  936            self._api_coverage_report.update_usage_stat(
 
  937                path, http_method, response.status, response.content_type,
 
 
  948    Asyncio userver client, typically retrieved from 
  949    @ref service_client "plugins.service_client.service_client" 
  952    Compatible with werkzeug interface. 
  954    @ingroup userver_testsuite 
  957    PeriodicTaskFailed = PeriodicTaskFailed
 
  958    TestsuiteActionFailed = TestsuiteActionFailed
 
  959    TestsuiteTaskNotFound = TestsuiteTaskNotFound
 
  960    TestsuiteTaskConflict = TestsuiteTaskConflict
 
  961    TestsuiteTaskFailed = TestsuiteTaskFailed
 
  963    def _wrap_client_response(
 
  964            self, response: aiohttp.ClientResponse,
 
  965    ) -> typing.Awaitable[http.ClientResponse]:
 
  966        return http.wrap_client_response(
 
  967            response, json_loads=approx.json_loads,
 
  971    async def run_periodic_task(self, name):
 
  972        await self.
_client.run_periodic_task(name)
 
  975    async def suspend_periodic_tasks(self, names: typing.List[str]) -> 
None:
 
  976        await self.
_client.suspend_periodic_tasks(names)
 
  979    async def resume_periodic_tasks(self, names: typing.List[str]) -> 
None:
 
  980        await self.
_client.resume_periodic_tasks(names)
 
  983    async def resume_all_periodic_tasks(self) -> None:
 
  984        await self.
_client.resume_all_periodic_tasks()
 
  987    async def write_cache_dumps(
 
  988            self, names: typing.List[str], *, testsuite_skip_prepare=
False,
 
  990        await self.
_client.write_cache_dumps(
 
  991            names=names, testsuite_skip_prepare=testsuite_skip_prepare,
 
  995    async def read_cache_dumps(
 
  996            self, names: typing.List[str], *, testsuite_skip_prepare=
False,
 
  998        await self.
_client.read_cache_dumps(
 
  999            names=names, testsuite_skip_prepare=testsuite_skip_prepare,
 
 1002    async def run_task(self, name: str) -> 
None:
 
 1003        await self.
_client.run_task(name)
 
 1005    async def run_distlock_task(self, name: str) -> 
None:
 
 1006        await self.
_client.run_distlock_task(name)
 
 1010        Calls `ResetMetric(metric);` for each metric that has such C++ function 
 
 1015            self, *, prefix: typing.Optional[str] = 
None,
 
 1016    ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
 
 1018        Reports metrics related issues that could be encountered on 
 1019        different monitoring systems. 
 1021        @sa @ref utils::statistics::GetPortabilityInfo 
 
 1025    def list_tasks(self) -> typing.List[str]:
 
 1026        return self.
_client.list_tasks()
 
 1028    def spawn_task(self, name: str):
 
 1029        return self.
_client.spawn_task(name)
 
 1031    def capture_logs(self):
 
 1032        return self.
_client.capture_logs()
 
 1038            clean_update: bool = 
True,
 
 1039            cache_names: typing.Optional[typing.List[str]] = 
None,
 
 1042        Send request to service to update caches. 
 1044        @param clean_update if False, service will do a faster incremental 
 1045               update of caches whenever possible. 
 1046        @param cache_names which caches specifically should be updated; 
 1050            clean_update=clean_update, cache_names=cache_names,
 
 
 1054    async def tests_control(
 
 1055            self, *args, **kwargs,
 
 1056    ) -> typing.Dict[str, typing.Any]:
 
 1057        return await self.
_client.tests_control(*args, **kwargs)
 
 1062        Update service-side state through http call to 'tests/control': 
 1063        - clear dirty (from other tests) caches 
 1064        - set service-side mocked time, 
 1065        - resume / suspend periodic tasks 
 1067        If service is up-to-date, does nothing. 
 
 1074        Send list of handled testpoint pats to service. For these paths service 
 1075        will no more skip http calls from TESTPOINT(...) macro. 
 1077        @param no_auto_cache_cleanup prevent automatic cache cleanup. 
 1078        When calling service client first time in scope of current test, client 
 1079        makes additional http call to `tests/control` to update caches, to get 
 1080        rid of data from previous test. 
 1085@dataclasses.dataclass