97 Base asyncio userver client that implements HTTP requests to service.
99 Compatible with werkzeug interface.
101 @ingroup userver_testsuite
104 def __init__(self, client):
111 json: annotations.JsonAnyOptional =
None,
112 data: typing.Any =
None,
113 params: typing.Optional[typing.Dict[str, str]] =
None,
114 bearer: typing.Optional[str] =
None,
115 x_real_ip: typing.Optional[str] =
None,
116 headers: typing.Optional[typing.Dict[str, str]] =
None,
118 ) -> http.ClientResponse:
120 Make a HTTP POST request
138 json: annotations.JsonAnyOptional =
None,
139 data: typing.Any =
None,
140 params: typing.Optional[typing.Dict[str, str]] =
None,
141 bearer: typing.Optional[str] =
None,
142 x_real_ip: typing.Optional[str] =
None,
143 headers: typing.Optional[typing.Dict[str, str]] =
None,
145 ) -> http.ClientResponse:
147 Make a HTTP PUT request
165 json: annotations.JsonAnyOptional =
None,
166 data: typing.Any =
None,
167 params: typing.Optional[typing.Dict[str, str]] =
None,
168 bearer: typing.Optional[str] =
None,
169 x_real_ip: typing.Optional[str] =
None,
170 headers: typing.Optional[typing.Dict[str, str]] =
None,
172 ) -> http.ClientResponse:
174 Make a HTTP PATCH request
191 headers: typing.Optional[typing.Dict[str, str]] =
None,
192 bearer: typing.Optional[str] =
None,
193 x_real_ip: typing.Optional[str] =
None,
195 ) -> http.ClientResponse:
197 Make a HTTP GET request
211 headers: typing.Optional[typing.Dict[str, str]] =
None,
212 bearer: typing.Optional[str] =
None,
213 x_real_ip: typing.Optional[str] =
None,
215 ) -> http.ClientResponse:
217 Make a HTTP DELETE request
231 headers: typing.Optional[typing.Dict[str, str]] =
None,
232 bearer: typing.Optional[str] =
None,
233 x_real_ip: typing.Optional[str] =
None,
235 ) -> http.ClientResponse:
237 Make a HTTP OPTIONS request
253 ) -> http.ClientResponse:
255 Make a HTTP request with the specified method
257 response = await self.
_client.
request(http_method, path, **kwargs)
263 @deprecated Use pytest_userver.client.Client directly instead.
267 def _wrap_client_response(
269 response: aiohttp.ClientResponse,
270 ) -> typing.Awaitable[http.ClientResponse]:
271 return http.wrap_client_response(response)
277def _wrap_client_error(func):
278 async def _wrapper(*args, **kwargs):
280 return await func(*args, **kwargs)
281 except aiohttp.client_exceptions.ClientResponseError
as exc:
282 raise http.HttpResponseError(
283 url=exc.request_info.url,
290class AiohttpClientMonitor(service_client.AiohttpClient):
291 _config: TestsuiteClientConfig
293 def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
294 super().__init__(base_url, **kwargs)
295 self._config = config
297 async def get_metrics(self, prefix=None):
298 if not self._config.server_monitor_path:
300 'handler-server-monitor component is not configured',
302 params = {
'format':
'internal'}
303 if prefix
is not None:
304 params[
'prefix'] = prefix
305 response = await self.
get(
306 self._config.server_monitor_path,
310 response.raise_for_status()
311 return await response.json(content_type=
None)
313 async def get_metric(self, metric_name):
314 metrics = await self.get_metrics(metric_name)
315 assert metric_name
in metrics, (
316 f
'No metric with name {metric_name!r}. Use "single_metric" function instead of "get_metric"'
318 return metrics[metric_name]
320 async def metrics_raw(
326 labels: typing.Optional[typing.Dict[str, str]] =
None,
328 if not self._config.server_monitor_path:
330 'handler-server-monitor component is not configured',
333 params = {
'format': output_format}
335 params[
'prefix'] = prefix
338 params[
'path'] = path
341 params[
'labels'] = json.dumps(labels)
343 response = await self.
get(
344 self._config.server_monitor_path,
348 response.raise_for_status()
349 return await response.text()
356 labels: typing.Optional[typing.Dict[str, str]] =
None,
357 ) -> metric_module.MetricsSnapshot:
358 response = await self.metrics_raw(
359 output_format=
'json',
364 return metric_module.MetricsSnapshot.from_json(str(response))
366 async def single_metric_optional(
370 labels: typing.Optional[typing.Dict[str, str]] =
None,
371 ) -> typing.Optional[Metric]:
372 response = await self.metrics(path=path, labels=labels)
373 metrics_list = response.get(path, [])
375 assert len(metrics_list) <= 1, (f
'More than one metric found for path {path} and labels {labels}: {response}',)
380 return next(iter(metrics_list))
382 async def single_metric(
386 labels: typing.Optional[typing.Dict[str, str]] =
None,
388 value = await self.single_metric_optional(path, labels=labels)
389 assert value
is not None, (f
'No metric was found for path {path} and labels {labels}',)
546 A helper class for computing metric differences.
548 @see ClientMonitor.metrics_diff
549 @ingroup userver_testsuite
555 _client: ClientMonitor,
556 _path: typing.Optional[str],
557 _prefix: typing.Optional[str],
558 _labels: typing.Optional[typing.Dict[str, str]],
561 self._client = _client
563 self._prefix = _prefix
564 self._labels = _labels
566 self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] =
None
567 self.
_current: typing.Optional[metric_module.MetricsSnapshot] =
None
568 self.
_diff: typing.Optional[metric_module.MetricsSnapshot] =
None
573 def baseline(self) -> metric_module.MetricsSnapshot:
578 def baseline(self, value: metric_module.MetricsSnapshot) ->
None:
581 self.
_diff = _subtract_metrics_snapshots(
588 def current(self) -> metric_module.MetricsSnapshot:
589 assert self.
_current is not None,
'Set self.current first'
593 def current(self, value: metric_module.MetricsSnapshot) ->
None:
595 assert self.
_baseline is not None,
'Set self.baseline first'
596 self.
_diff = _subtract_metrics_snapshots(
603 def diff(self) -> metric_module.MetricsSnapshot:
604 assert self.
_diff is not None,
'Set self.current first'
609 subpath: typing.Optional[str] =
None,
610 add_labels: typing.Optional[typing.Dict] =
None,
612 default: typing.Optional[float] =
None,
613 ) -> metric_module.MetricValue:
615 Returns a single metric value at the specified path, prepending
616 the path provided at construction. If a dict of labels is provided,
617 does en exact match of labels, prepending the labels provided
620 @param subpath Suffix of the metric path; the path provided
621 at construction is prepended
622 @param add_labels Labels that the metric must have in addition
623 to the labels provided at construction
624 @param default An optional default value in case the metric is missing
625 @throws AssertionError if not one metric by path
627 base_path = self._path
or self._prefix
628 if base_path
and subpath:
629 path = f
'{base_path}.{subpath}'
631 assert base_path
or subpath,
'No path provided'
632 path = base_path
or subpath
or ''
633 labels: typing.Optional[dict] =
None
634 if self._labels
is not None or add_labels
is not None:
635 labels = {**(self._labels
or {}), **(add_labels
or {})}
636 return self.
diff.
value_at(path, labels, default=default)
638 async def fetch(self) -> metric_module.MetricsSnapshot:
640 Fetches metric values from the service.
642 return await self._client.metrics(
648 async def __aenter__(self) -> 'MetricsDiffer':
653 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
660def _subtract_metrics_snapshots(
661 current: metric_module.MetricsSnapshot,
662 initial: metric_module.MetricsSnapshot,
664) -> metric_module.MetricsSnapshot:
665 return metric_module.MetricsSnapshot({
666 path: {_subtract_metrics(path, current_metric, initial, diff_gauge)
for current_metric
in current_group}
667 for path, current_group
in current.items()
671def _subtract_metrics(
673 current_metric: metric_module.Metric,
674 initial: metric_module.MetricsSnapshot,
676) -> metric_module.Metric:
677 initial_group = initial.get(path,
None)
678 if initial_group
is None:
679 return current_metric
680 initial_metric = next(
681 (x
for x
in initial_group
if x.labels == current_metric.labels),
684 if initial_metric
is None:
685 return current_metric
687 return metric_module.Metric(
688 labels=current_metric.labels,
689 value=_subtract_metric_values(
690 current=current_metric,
691 initial=initial_metric,
692 diff_gauge=diff_gauge,
694 _type=current_metric.type(),
698def _subtract_metric_values(
699 current: metric_module.Metric,
700 initial: metric_module.Metric,
702) -> metric_module.MetricValue:
703 assert current.type()
is not metric_module.MetricType.UNSPECIFIED
704 assert initial.type()
is not metric_module.MetricType.UNSPECIFIED
705 assert current.type() == initial.type()
707 if isinstance(current.value, metric_module.Histogram):
708 assert isinstance(initial.value, metric_module.Histogram)
709 return _subtract_metric_values_hist(current=current, initial=initial)
711 assert not isinstance(initial.value, metric_module.Histogram)
712 return _subtract_metric_values_num(
715 diff_gauge=diff_gauge,
719def _subtract_metric_values_num(
720 current: metric_module.Metric,
721 initial: metric_module.Metric,
724 current_value = typing.cast(float, current.value)
725 initial_value = typing.cast(float, initial.value)
727 current.type()
is metric_module.MetricType.RATE
or initial.type()
is metric_module.MetricType.RATE
or diff_gauge
729 return current_value - initial_value
if should_diff
else current_value
732def _subtract_metric_values_hist(
733 current: metric_module.Metric,
734 initial: metric_module.Metric,
735) -> metric_module.Histogram:
736 current_value = typing.cast(metric_module.Histogram, current.value)
737 initial_value = typing.cast(metric_module.Histogram, initial.value)
738 assert current_value.bounds == initial_value.bounds
739 return metric_module.Histogram(
740 bounds=current_value.bounds,
741 buckets=[t[0] - t[1]
for t
in zip(current_value.buckets, initial_value.buckets)],
742 inf=current_value.inf - initial_value.inf,
746class AiohttpClient(service_client.AiohttpClient):
747 PeriodicTaskFailed = PeriodicTaskFailed
748 TestsuiteActionFailed = TestsuiteActionFailed
749 TestsuiteTaskNotFound = TestsuiteTaskNotFound
750 TestsuiteTaskConflict = TestsuiteTaskConflict
751 TestsuiteTaskFailed = TestsuiteTaskFailed
757 config: TestsuiteClientConfig,
762 cache_invalidation_state,
764 api_coverage_report=
None,
765 periodic_tasks_state: typing.Optional[PeriodicTasksState] =
None,
766 allow_all_caches_invalidation: bool =
True,
767 cache_control: typing.Optional[caches.CacheControl] =
None,
771 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
772 self._config = config
773 self._periodic_tasks = periodic_tasks_state
774 self._testpoint = testpoint
775 self._log_capture_fixture = log_capture_fixture
777 mocked_time=mocked_time,
778 testpoint=self._testpoint,
779 testpoint_control=testpoint_control,
780 invalidation_state=cache_invalidation_state,
781 cache_control=cache_control,
783 self._api_coverage_report = api_coverage_report
784 self._allow_all_caches_invalidation = allow_all_caches_invalidation
785 self._asyncexc_check = asyncexc_check
787 async def run_periodic_task(self, name):
788 response = await self._testsuite_action(
'run_periodic_task', name=name)
789 if not response[
'status']:
790 raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
792 async def suspend_periodic_tasks(self, names: typing.List[str]) ->
None:
793 if not self._periodic_tasks:
795 self._periodic_tasks.tasks_to_suspend.update(names)
796 await self._suspend_periodic_tasks()
798 async def resume_periodic_tasks(self, names: typing.List[str]) ->
None:
799 if not self._periodic_tasks:
801 self._periodic_tasks.tasks_to_suspend.difference_update(names)
802 await self._suspend_periodic_tasks()
804 async def resume_all_periodic_tasks(self) -> None:
805 if not self._periodic_tasks:
807 self._periodic_tasks.tasks_to_suspend.clear()
808 await self._suspend_periodic_tasks()
810 async def write_cache_dumps(
812 names: typing.List[str],
814 testsuite_skip_prepare=
False,
816 await self._testsuite_action(
819 testsuite_skip_prepare=testsuite_skip_prepare,
822 async def read_cache_dumps(
824 names: typing.List[str],
826 testsuite_skip_prepare=
False,
828 await self._testsuite_action(
831 testsuite_skip_prepare=testsuite_skip_prepare,
834 async def run_distlock_task(self, name: str) ->
None:
835 await self.run_task(f
'distlock/{name}')
837 async def reset_metrics(self) -> None:
838 await self._testsuite_action(
'reset_metrics')
840 async def metrics_portability(
843 prefix: typing.Optional[str] =
None,
844 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
845 return await self._testsuite_action(
846 'metrics_portability',
850 async def list_tasks(self) -> typing.List[str]:
851 response = await self._do_testsuite_action(
'tasks_list')
853 response.raise_for_status()
854 body = await response.json(content_type=
None)
857 async def run_task(self, name: str) ->
None:
858 response = await self._do_testsuite_action(
862 await _task_check_response(name, response)
864 @contextlib.asynccontextmanager
865 async def spawn_task(self, name: str):
866 task_id = await self._task_spawn(name)
870 await self._task_stop_spawned(task_id)
872 async def _task_spawn(self, name: str) -> str:
873 response = await self._do_testsuite_action(
877 data = await _task_check_response(name, response)
878 return data[
'task_id']
880 async def _task_stop_spawned(self, task_id: str) ->
None:
881 response = await self._do_testsuite_action(
883 json={
'task_id': task_id},
885 await _task_check_response(task_id, response)
887 async def http_allowed_urls_extra(
889 http_allowed_urls_extra: typing.List[str],
891 await self._do_testsuite_action(
892 'http_allowed_urls_extra',
893 json={
'allowed_urls_extra': http_allowed_urls_extra},
894 testsuite_skip_prepare=
True,
897 @contextlib.asynccontextmanager
898 async def capture_logs(
901 log_level: str =
'DEBUG',
902 testsuite_skip_prepare: bool =
False,
904 async with self._log_capture_fixture.start_capture(
907 logger.debug(
'Starting logcapture')
908 await self._testsuite_action(
911 socket_logging_duplication=
True,
912 testsuite_skip_prepare=testsuite_skip_prepare,
916 await self._log_capture_fixture.wait_for_client()
919 logger.debug(
'Finishing logcapture')
920 await self._testsuite_action(
922 log_level=self._log_capture_fixture.default_log_level,
923 socket_logging_duplication=
False,
924 testsuite_skip_prepare=testsuite_skip_prepare,
927 async def log_flush(self, logger_name: typing.Optional[str] =
None):
928 await self._testsuite_action(
930 logger_name=logger_name,
931 testsuite_skip_prepare=
True,
934 async def invalidate_caches(
937 clean_update: bool =
True,
938 cache_names: typing.Optional[typing.List[str]] =
None,
939 testsuite_skip_prepare: bool =
False,
941 if cache_names
is None and clean_update:
942 if self._allow_all_caches_invalidation:
943 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
945 __tracebackhide__ =
True
946 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
948 if testsuite_skip_prepare:
949 await self._tests_control({
950 'invalidate_caches': {
951 'update_type': (
'full' if clean_update
else 'incremental'),
952 **({
'names': cache_names}
if cache_names
else {}),
956 await self.tests_control(
957 invalidate_caches=
True,
958 clean_update=clean_update,
959 cache_names=cache_names,
962 async def tests_control(
965 invalidate_caches: bool =
True,
966 clean_update: bool =
True,
967 cache_names: typing.Optional[typing.List[str]] =
None,
968 http_allowed_urls_extra=
None,
969 ) -> typing.Dict[str, typing.Any]:
970 body: typing.Dict[str, typing.Any] = self._state_manager.get_pending_update()
972 if 'invalidate_caches' in body
and invalidate_caches:
973 if not clean_update
or cache_names:
975 'Manual cache invalidation leads to indirect initial full cache invalidation',
977 await self._prepare()
980 if invalidate_caches:
981 body[
'invalidate_caches'] = {
982 'update_type': (
'full' if clean_update
else 'incremental'),
985 body[
'invalidate_caches'][
'names'] = cache_names
987 if http_allowed_urls_extra
is not None:
988 await self.http_allowed_urls_extra(http_allowed_urls_extra)
990 return await self._tests_control(body)
992 async def update_server_state(self) -> None:
993 await self._prepare()
995 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
996 if not self._testpoint:
998 if no_auto_cache_cleanup:
999 await self._tests_control({
1000 'testpoints': sorted(self._testpoint.keys()),
1003 await self.update_server_state()
1005 async def get_dynamic_config_defaults(
1007 ) -> typing.Dict[str, typing.Any]:
1008 return await self._testsuite_action(
1009 'get_dynamic_config_defaults',
1010 testsuite_skip_prepare=
True,
1013 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
1014 with self._state_manager.updating_state(body):
1015 async with await self._do_testsuite_action(
1018 testsuite_skip_prepare=
True,
1020 if response.status == 404:
1022 'It seems that testsuite support is not enabled for your service',
1024 response.raise_for_status()
1025 return await response.json(content_type=
None)
1027 async def _suspend_periodic_tasks(self):
1028 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1029 await self._testsuite_action(
1030 'suspend_periodic_tasks',
1031 names=sorted(self._periodic_tasks.tasks_to_suspend),
1033 self._periodic_tasks.suspended_tasks = set(
1034 self._periodic_tasks.tasks_to_suspend,
1037 def _do_testsuite_action(self, action, **kwargs):
1038 if not self._config.testsuite_action_path:
1040 'tests-control component is not properly configured',
1042 path = self._config.testsuite_action_path.format(action=action)
1043 return self.post(path, **kwargs)
1045 async def _testsuite_action(
1049 testsuite_skip_prepare=False,
1052 async with await self._do_testsuite_action(
1055 testsuite_skip_prepare=testsuite_skip_prepare,
1057 if response.status == 500:
1058 raise TestsuiteActionFailed
1059 response.raise_for_status()
1060 return await response.json(content_type=
None)
1062 async def _prepare(self) -> None:
1063 with self._state_manager.cache_control_update()
as pending_update:
1065 await self._tests_control(pending_update)
1071 headers: typing.Optional[typing.Dict[str, str]] =
None,
1072 bearer: typing.Optional[str] =
None,
1073 x_real_ip: typing.Optional[str] =
None,
1075 testsuite_skip_prepare: bool =
False,
1077 ) -> aiohttp.ClientResponse:
1078 if self._asyncexc_check:
1080 self._asyncexc_check()
1082 if not testsuite_skip_prepare:
1083 await self._prepare()
1085 response = await super()._request(
1093 if self._api_coverage_report:
1094 self._api_coverage_report.update_usage_stat(
1098 response.content_type,
1101 if self._asyncexc_check:
1103 self._asyncexc_check()