88 Base asyncio userver client that implements HTTP requests to service.
90 Compatible with werkzeug interface.
92 @ingroup userver_testsuite
95 def __init__(self, client):
101 json: annotations.JsonAnyOptional =
None,
102 data: typing.Any =
None,
103 params: typing.Optional[typing.Dict[str, str]] =
None,
104 bearer: typing.Optional[str] =
None,
105 x_real_ip: typing.Optional[str] =
None,
106 headers: typing.Optional[typing.Dict[str, str]] =
None,
108 ) -> http.ClientResponse:
110 Make a HTTP POST request
127 json: annotations.JsonAnyOptional =
None,
128 data: typing.Any =
None,
129 params: typing.Optional[typing.Dict[str, str]] =
None,
130 bearer: typing.Optional[str] =
None,
131 x_real_ip: typing.Optional[str] =
None,
132 headers: typing.Optional[typing.Dict[str, str]] =
None,
134 ) -> http.ClientResponse:
136 Make a HTTP PUT request
153 json: annotations.JsonAnyOptional =
None,
154 data: typing.Any =
None,
155 params: typing.Optional[typing.Dict[str, str]] =
None,
156 bearer: typing.Optional[str] =
None,
157 x_real_ip: typing.Optional[str] =
None,
158 headers: typing.Optional[typing.Dict[str, str]] =
None,
160 ) -> http.ClientResponse:
162 Make a HTTP PATCH request
179 headers: typing.Optional[typing.Dict[str, str]] =
None,
180 bearer: typing.Optional[str] =
None,
181 x_real_ip: typing.Optional[str] =
None,
183 ) -> http.ClientResponse:
185 Make a HTTP GET request
199 headers: typing.Optional[typing.Dict[str, str]] =
None,
200 bearer: typing.Optional[str] =
None,
201 x_real_ip: typing.Optional[str] =
None,
203 ) -> http.ClientResponse:
205 Make a HTTP DELETE request
219 headers: typing.Optional[typing.Dict[str, str]] =
None,
220 bearer: typing.Optional[str] =
None,
221 x_real_ip: typing.Optional[str] =
None,
223 ) -> http.ClientResponse:
225 Make a HTTP OPTIONS request
237 self, http_method: str, path: str, **kwargs,
238 ) -> http.ClientResponse:
240 Make a HTTP request with the specified method
242 response = await self.
_client.
request(http_method, path, **kwargs)
def _wrap_client_response(
    self, response: aiohttp.ClientResponse,
) -> typing.Awaitable[http.ClientResponse]:
    """Pass a raw aiohttp response through ``http.wrap_client_response``.

    The helper's result is returned as is; it is not awaited here.
    """
    wrapped = http.wrap_client_response(response)
    return wrapped
def _wrap_client_error(func):
    """Decorate a coroutine function to translate aiohttp response errors.

    An ``aiohttp.client_exceptions.ClientResponseError`` raised by ``func``
    is re-raised as ``http.HttpResponseError`` carrying the request URL and
    the response status. Any other outcome of ``func`` is passed through.

    @param func coroutine function to wrap
    @return coroutine function with the same name and docstring as ``func``
    """
    # Local import keeps this block self-contained.
    import functools

    # functools.wraps preserves __name__/__doc__ of the decorated coroutine,
    # so tracebacks and introspection show the real method.
    @functools.wraps(func)
    async def _wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except aiohttp.client_exceptions.ClientResponseError as exc:
            # Chain explicitly so the original aiohttp error is kept as the
            # cause of the translated one.
            raise http.HttpResponseError(
                url=exc.request_info.url, status=exc.status,
            ) from exc

    return _wrapper
266class AiohttpClientMonitor(service_client.AiohttpClient):
267 _config: TestsuiteClientConfig
def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
    """Initialise the parent client and remember the testsuite config.

    @param base_url forwarded to the parent constructor
    @param config stored as ``self._config``
    @param kwargs forwarded unchanged to the parent constructor
    """
    super().__init__(base_url, **kwargs)
    # Stored after the parent initialiser has run.
    self._config = config
273 async def get_metrics(self, prefix=None):
274 if not self._config.server_monitor_path:
276 'handler-server-monitor component is not configured',
278 if prefix
is not None:
279 params = {
'prefix': prefix}
282 response = await self.
get(
283 self._config.server_monitor_path, params=params,
286 response.raise_for_status()
287 return await response.json(content_type=
None)
async def get_metric(self, metric_name):
    """Return the entry named ``metric_name`` from ``get_metrics``.

    ``metric_name`` is passed to ``get_metrics`` as its argument, and the
    result is then indexed by the same name.

    @throws AssertionError if the result has no key ``metric_name``
    """
    available = await self.get_metrics(metric_name)
    missing_hint = (
        f'No metric with name {metric_name!r}. '
        f'Use "single_metric" function instead of "get_metric"'
    )
    assert metric_name in available, missing_hint
    return available[metric_name]
297 async def metrics_raw(
303 labels: typing.Optional[typing.Dict[str, str]] =
None,
305 if not self._config.server_monitor_path:
307 'handler-server-monitor component is not configured',
310 params = {
'format': output_format}
312 params[
'prefix'] = prefix
315 params[
'path'] = path
318 params[
'labels'] = json.dumps(labels)
320 response = await self.
get(
321 self._config.server_monitor_path, params=params,
324 response.raise_for_status()
325 return await response.text()
332 labels: typing.Optional[typing.Dict[str, str]] =
None,
333 ) -> metric_module.MetricsSnapshot:
334 response = await self.metrics_raw(
335 output_format=
'json', path=path, prefix=prefix, labels=labels,
337 return metric_module.MetricsSnapshot.from_json(str(response))
339 async def single_metric_optional(
343 labels: typing.Optional[typing.Dict[str, str]] =
None,
344 ) -> typing.Optional[Metric]:
345 response = await self.metrics(path=path, labels=labels)
346 metrics_list = response.get(path, [])
348 assert len(metrics_list) <= 1, (
349 f
'More than one metric found for path {path} and labels {labels}: '
356 return next(iter(metrics_list))
358 async def single_metric(
362 labels: typing.Optional[typing.Dict[str, str]] =
None,
364 value = await self.single_metric_optional(path, labels=labels)
365 assert value
is not None, (
366 f
'No metric was found for path {path} and labels {labels}',
516 A helper class for computing metric differences.
518 @see ClientMonitor.metrics_diff
519 @ingroup userver_testsuite
525 _client: ClientMonitor,
526 _path: typing.Optional[str],
527 _prefix: typing.Optional[str],
528 _labels: typing.Optional[typing.Dict[str, str]],
531 self._client = _client
533 self._prefix = _prefix
534 self._labels = _labels
536 self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] =
None
537 self.
_current: typing.Optional[metric_module.MetricsSnapshot] =
None
538 self.
_diff: typing.Optional[metric_module.MetricsSnapshot] =
None
543 def baseline(self) -> metric_module.MetricsSnapshot:
548 def baseline(self, value: metric_module.MetricsSnapshot) ->
None:
551 self.
_diff = _subtract_metrics_snapshots(
556 def current(self) -> metric_module.MetricsSnapshot:
557 assert self.
_current is not None,
'Set self.current first'
561 def current(self, value: metric_module.MetricsSnapshot) ->
None:
563 assert self.
_baseline is not None,
'Set self.baseline first'
564 self.
_diff = _subtract_metrics_snapshots(
569 def diff(self) -> metric_module.MetricsSnapshot:
570 assert self.
_diff is not None,
'Set self.current first'
575 subpath: typing.Optional[str] =
None,
576 add_labels: typing.Optional[typing.Dict] =
None,
578 default: typing.Optional[float] =
None,
581 Returns a single metric value at the specified path, prepending
582 the path provided at construction. If a dict of labels is provided,
583 does an exact match of labels, prepending the labels provided
586 @param subpath Suffix of the metric path; the path provided
587 at construction is prepended
588 @param add_labels Labels that the metric must have in addition
589 to the labels provided at construction
590 @param default An optional default value in case the metric is missing
591 @throws AssertionError if there is not exactly one metric at the path
593 base_path = self._path
or self._prefix
594 if base_path
and subpath:
595 path = f
'{base_path}.{subpath}'
597 assert base_path
or subpath,
'No path provided'
598 path = base_path
or subpath
or ''
599 labels: typing.Optional[dict] =
None
600 if self._labels
is not None or add_labels
is not None:
601 labels = {**(self._labels
or {}), **(add_labels
or {})}
602 return self.
diff.
value_at(path, labels, default=default)
async def fetch(self) -> metric_module.MetricsSnapshot:
    """
    Requests metrics from the client, filtered by the path, prefix and
    labels stored on this object.
    """
    query = {
        'path': self._path,
        'prefix': self._prefix,
        'labels': self._labels,
    }
    return await self._client.metrics(**query)
612 async def __aenter__(self) -> 'MetricsDiffer':
617 async def __aexit__(self, exc_type, exc, tb) -> None:
624def _subtract_metrics_snapshots(
625 current: metric_module.MetricsSnapshot,
626 initial: metric_module.MetricsSnapshot,
628) -> metric_module.MetricsSnapshot:
629 return metric_module.MetricsSnapshot(
632 _subtract_metrics(path, current_metric, initial, diff_gauge)
633 for current_metric
in current_group
635 for path, current_group
in current.items()
def _subtract_metrics(
    path: str,
    current_metric: metric_module.Metric,
    initial: metric_module.MetricsSnapshot,
    diff_gauge: bool,
) -> metric_module.Metric:
    """Return ``current_metric`` minus its counterpart in ``initial``.

    The counterpart is the first metric under ``path`` in ``initial`` whose
    labels equal those of ``current_metric``. If ``initial`` has no group for
    ``path``, or no metric with equal labels, ``current_metric`` is returned
    unchanged.

    NOTE(review): the ``path: str`` and ``diff_gauge: bool`` annotations are
    assumed - confirm against the full file.
    """
    assert diff_gauge, 'diff_gauge=False is unimplemented'

    baseline_group = initial.get(path, None)
    if baseline_group is None:
        return current_metric

    for candidate in baseline_group:
        if candidate.labels == current_metric.labels:
            # Only the first match is used.
            return metric_module.Metric(
                labels=current_metric.labels,
                value=current_metric.value - candidate.value,
            )
    return current_metric
663class AiohttpClient(service_client.AiohttpClient):
664 PeriodicTaskFailed = PeriodicTaskFailed
665 TestsuiteActionFailed = TestsuiteActionFailed
666 TestsuiteTaskNotFound = TestsuiteTaskNotFound
667 TestsuiteTaskConflict = TestsuiteTaskConflict
668 TestsuiteTaskFailed = TestsuiteTaskFailed
674 config: TestsuiteClientConfig,
679 cache_invalidation_state,
681 api_coverage_report=
None,
682 periodic_tasks_state: typing.Optional[PeriodicTasksState] =
None,
685 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
686 self._config = config
687 self._periodic_tasks = periodic_tasks_state
688 self._testpoint = testpoint
689 self._log_capture_fixture = log_capture_fixture
691 mocked_time=mocked_time,
692 testpoint=self._testpoint,
693 testpoint_control=testpoint_control,
694 invalidation_state=cache_invalidation_state,
696 self._api_coverage_report = api_coverage_report
async def run_periodic_task(self, name):
    """Invoke the 'run_periodic_task' testsuite action for ``name``.

    @throws PeriodicTaskFailed if the 'status' field of the reply is falsy
    """
    reply = await self._testsuite_action('run_periodic_task', name=name)
    if reply['status']:
        return
    raise self.PeriodicTaskFailed(f'Periodic task {name} failed')
703 async def suspend_periodic_tasks(self, names: typing.List[str]) ->
None:
704 if not self._periodic_tasks:
706 self._periodic_tasks.tasks_to_suspend.update(names)
707 await self._suspend_periodic_tasks()
709 async def resume_periodic_tasks(self, names: typing.List[str]) ->
None:
710 if not self._periodic_tasks:
712 self._periodic_tasks.tasks_to_suspend.difference_update(names)
713 await self._suspend_periodic_tasks()
715 async def resume_all_periodic_tasks(self) -> None:
716 if not self._periodic_tasks:
718 self._periodic_tasks.tasks_to_suspend.clear()
719 await self._suspend_periodic_tasks()
721 async def write_cache_dumps(
722 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
724 await self._testsuite_action(
727 testsuite_skip_prepare=testsuite_skip_prepare,
730 async def read_cache_dumps(
731 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
733 await self._testsuite_action(
736 testsuite_skip_prepare=testsuite_skip_prepare,
async def run_distlock_task(self, name: str) -> None:
    """Run the task named ``distlock/<name>`` through ``run_task``."""
    task_name = f'distlock/{name}'
    await self.run_task(task_name)
async def reset_metrics(self) -> None:
    """Invoke the 'reset_metrics' testsuite action; the reply is discarded."""
    action = 'reset_metrics'
    await self._testsuite_action(action)
async def metrics_portability(
    self, *, prefix: typing.Optional[str] = None,
) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
    """Invoke the 'metrics_portability' testsuite action.

    @param prefix passed to the action as the ``prefix`` keyword
    @return the value produced by the testsuite action, unchanged
    """
    report = await self._testsuite_action('metrics_portability', prefix=prefix)
    return report
752 async def list_tasks(self) -> typing.List[str]:
753 response = await self._do_testsuite_action(
'tasks_list')
755 response.raise_for_status()
756 body = await response.json(content_type=
None)
async def run_task(self, name: str) -> None:
    """Post the 'task_run' testsuite action for ``name`` and check the reply.

    The response is handed to ``_task_check_response`` together with the
    task name; its result is discarded.
    """
    payload = {'name': name}
    response = await self._do_testsuite_action('task_run', json=payload)
    await _task_check_response(name, response)
765 @contextlib.asynccontextmanager
766 async def spawn_task(self, name: str):
767 task_id = await self._task_spawn(name)
771 await self._task_stop_spawned(task_id)
async def _task_spawn(self, name: str) -> str:
    """Post the 'task_spawn' testsuite action and return the new task id.

    @return the 'task_id' field of the data produced by
    ``_task_check_response`` for the response
    """
    payload = {'name': name}
    response = await self._do_testsuite_action('task_spawn', json=payload)
    checked = await _task_check_response(name, response)
    return checked['task_id']
async def _task_stop_spawned(self, task_id: str) -> None:
    """Post the 'task_stop' testsuite action for ``task_id``.

    The response is validated by ``_task_check_response``; note that the
    task id is passed where the other task helpers pass a task name.
    """
    payload = {'task_id': task_id}
    response = await self._do_testsuite_action('task_stop', json=payload)
    await _task_check_response(task_id, response)
786 async def http_allowed_urls_extra(
787 self, http_allowed_urls_extra: typing.List[str],
789 await self._do_testsuite_action(
790 'http_allowed_urls_extra',
791 json={
'allowed_urls_extra': http_allowed_urls_extra},
792 testsuite_skip_prepare=
True,
795 @contextlib.asynccontextmanager
796 async def capture_logs(self):
797 async with self._log_capture_fixture.start_capture()
as capture:
798 await self._testsuite_action(
799 'log_capture', socket_logging_duplication=
True,
804 await self._testsuite_action(
805 'log_capture', socket_logging_duplication=
False,
808 async def invalidate_caches(
811 clean_update: bool =
True,
812 cache_names: typing.Optional[typing.List[str]] =
None,
814 await self.tests_control(
815 invalidate_caches=
True,
816 clean_update=clean_update,
817 cache_names=cache_names,
820 async def tests_control(
823 invalidate_caches: bool =
True,
824 clean_update: bool =
True,
825 cache_names: typing.Optional[typing.List[str]] =
None,
826 http_allowed_urls_extra=
None,
827 ) -> typing.Dict[str, typing.Any]:
830 ] = self._state_manager.get_pending_update()
832 if 'invalidate_caches' in body
and invalidate_caches:
833 if not clean_update
or cache_names:
835 'Manual cache invalidation leads to indirect initial '
836 'full cache invalidation',
838 await self._prepare()
841 if invalidate_caches:
842 body[
'invalidate_caches'] = {
843 'update_type': (
'full' if clean_update
else 'incremental'),
846 body[
'invalidate_caches'][
'names'] = cache_names
848 if http_allowed_urls_extra
is not None:
849 await self.http_allowed_urls_extra(http_allowed_urls_extra)
851 return await self._tests_control(body)
async def update_server_state(self) -> None:
    """Public entry point that awaits the private ``_prepare`` step."""
    pending = self._prepare()
    await pending
856 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
857 if not self._testpoint:
859 if no_auto_cache_cleanup:
860 await self._tests_control(
861 {
'testpoints': sorted(self._testpoint.keys())},
864 await self.update_server_state()
866 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
867 with self._state_manager.updating_state(body):
868 async with await self._do_testsuite_action(
869 'control', json=body, testsuite_skip_prepare=
True,
871 if response.status == 404:
873 'It seems that testsuite support is not enabled '
876 response.raise_for_status()
877 return await response.json(content_type=
None)
879 async def _suspend_periodic_tasks(self):
881 self._periodic_tasks.tasks_to_suspend
882 != self._periodic_tasks.suspended_tasks
884 await self._testsuite_action(
885 'suspend_periodic_tasks',
886 names=sorted(self._periodic_tasks.tasks_to_suspend),
888 self._periodic_tasks.suspended_tasks = set(
889 self._periodic_tasks.tasks_to_suspend,
892 def _do_testsuite_action(self, action, **kwargs):
893 if not self._config.testsuite_action_path:
895 'tests-control component is not properly configured',
897 path = self._config.testsuite_action_path.format(action=action)
898 return self.post(path, **kwargs)
900 async def _testsuite_action(
901 self, action, *, testsuite_skip_prepare=False, **kwargs,
903 async with await self._do_testsuite_action(
906 testsuite_skip_prepare=testsuite_skip_prepare,
908 if response.status == 500:
909 raise TestsuiteActionFailed
910 response.raise_for_status()
911 return await response.json(content_type=
None)
913 async def _prepare(self) -> None:
914 pending_update = self._state_manager.get_pending_update()
916 await self._tests_control(pending_update)
922 headers: typing.Optional[typing.Dict[str, str]] =
None,
923 bearer: typing.Optional[str] =
None,
924 x_real_ip: typing.Optional[str] =
None,
926 testsuite_skip_prepare: bool =
False,
928 ) -> aiohttp.ClientResponse:
929 if not testsuite_skip_prepare:
930 await self._prepare()
932 response = await super()._request(
933 http_method, path, headers, bearer, x_real_ip, **kwargs,
935 if self._api_coverage_report:
936 self._api_coverage_report.update_usage_stat(
937 path, http_method, response.status, response.content_type,
948 Asyncio userver client, typically retrieved from
949 @ref service_client "plugins.service_client.service_client"
952 Compatible with werkzeug interface.
954 @ingroup userver_testsuite
957 PeriodicTaskFailed = PeriodicTaskFailed
958 TestsuiteActionFailed = TestsuiteActionFailed
959 TestsuiteTaskNotFound = TestsuiteTaskNotFound
960 TestsuiteTaskConflict = TestsuiteTaskConflict
961 TestsuiteTaskFailed = TestsuiteTaskFailed
def _wrap_client_response(
    self, response: aiohttp.ClientResponse,
) -> typing.Awaitable[http.ClientResponse]:
    """Wrap a raw aiohttp response, decoding JSON with ``approx.json_loads``.

    The awaitable from ``http.wrap_client_response`` is returned un-awaited.
    """
    wrapped = http.wrap_client_response(
        response, json_loads=approx.json_loads,
    )
    return wrapped
async def run_periodic_task(self, name):
    """Forward to ``run_periodic_task`` of the wrapped ``self._client``."""
    backend = self._client
    await backend.run_periodic_task(name)
async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
    """Forward ``names`` to ``suspend_periodic_tasks`` of ``self._client``."""
    backend = self._client
    await backend.suspend_periodic_tasks(names)
async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
    """Forward ``names`` to ``resume_periodic_tasks`` of ``self._client``."""
    backend = self._client
    await backend.resume_periodic_tasks(names)
async def resume_all_periodic_tasks(self) -> None:
    """Forward to ``resume_all_periodic_tasks`` of ``self._client``."""
    backend = self._client
    await backend.resume_all_periodic_tasks()
987 async def write_cache_dumps(
988 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
990 await self.
_client.write_cache_dumps(
991 names=names, testsuite_skip_prepare=testsuite_skip_prepare,
995 async def read_cache_dumps(
996 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
998 await self.
_client.read_cache_dumps(
999 names=names, testsuite_skip_prepare=testsuite_skip_prepare,
async def run_task(self, name: str) -> None:
    """Forward ``name`` to ``run_task`` of the wrapped ``self._client``."""
    backend = self._client
    await backend.run_task(name)
async def run_distlock_task(self, name: str) -> None:
    """Forward ``name`` to ``run_distlock_task`` of ``self._client``."""
    backend = self._client
    await backend.run_distlock_task(name)
1010 Calls `ResetMetric(metric);` for each metric that has such C++ function
1015 self, *, prefix: typing.Optional[str] =
None,
1016 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
1018 Reports metrics related issues that could be encountered on
1019 different monitoring systems.
1021 @sa @ref utils::statistics::GetPortabilityInfo
def list_tasks(self) -> typing.List[str]:
    """Return whatever ``self._client.list_tasks()`` returns, un-awaited."""
    backend = self._client
    return backend.list_tasks()
def spawn_task(self, name: str):
    """Return the result of ``self._client.spawn_task(name)`` unchanged."""
    backend = self._client
    return backend.spawn_task(name)
def capture_logs(self):
    """Return the result of ``self._client.capture_logs()`` unchanged."""
    backend = self._client
    return backend.capture_logs()
1038 clean_update: bool =
True,
1039 cache_names: typing.Optional[typing.List[str]] =
None,
1042 Send request to service to update caches.
1044 @param clean_update if False, service will do a faster incremental
1045 update of caches whenever possible.
1046 @param cache_names which caches specifically should be updated;
1050 clean_update=clean_update, cache_names=cache_names,
1054 async def tests_control(
1055 self, *args, **kwargs,
1056 ) -> typing.Dict[str, typing.Any]:
1057 return await self.
_client.tests_control(*args, **kwargs)
1062 Update service-side state through http call to 'tests/control':
1063 - clear dirty (from other tests) caches
1064 - set service-side mocked time,
1065 - resume / suspend periodic tasks
1067 If service is up-to-date, does nothing.
1074 Send list of handled testpoint paths to service. For these paths service
1075 will no longer skip http calls from TESTPOINT(...) macro.
1077 @param no_auto_cache_cleanup prevent automatic cache cleanup.
1078 When calling service client first time in scope of current test, client
1079 makes additional http call to `tests/control` to update caches, to get
1080 rid of data from previous test.
1085@dataclasses.dataclass