97 Base asyncio userver client that implements HTTP requests to service.
99 Compatible with werkzeug interface.
101 @ingroup userver_testsuite
104 def __init__(self, client):
111 json: annotations.JsonAnyOptional =
None,
112 data: typing.Any =
None,
113 params: typing.Optional[typing.Dict[str, str]] =
None,
114 bearer: typing.Optional[str] =
None,
115 x_real_ip: typing.Optional[str] =
None,
116 headers: typing.Optional[typing.Dict[str, str]] =
None,
118 ) -> http.ClientResponse:
120 Make a HTTP POST request
138 json: annotations.JsonAnyOptional =
None,
139 data: typing.Any =
None,
140 params: typing.Optional[typing.Dict[str, str]] =
None,
141 bearer: typing.Optional[str] =
None,
142 x_real_ip: typing.Optional[str] =
None,
143 headers: typing.Optional[typing.Dict[str, str]] =
None,
145 ) -> http.ClientResponse:
147 Make a HTTP PUT request
165 json: annotations.JsonAnyOptional =
None,
166 data: typing.Any =
None,
167 params: typing.Optional[typing.Dict[str, str]] =
None,
168 bearer: typing.Optional[str] =
None,
169 x_real_ip: typing.Optional[str] =
None,
170 headers: typing.Optional[typing.Dict[str, str]] =
None,
172 ) -> http.ClientResponse:
174 Make a HTTP PATCH request
191 headers: typing.Optional[typing.Dict[str, str]] =
None,
192 bearer: typing.Optional[str] =
None,
193 x_real_ip: typing.Optional[str] =
None,
195 ) -> http.ClientResponse:
197 Make a HTTP GET request
211 headers: typing.Optional[typing.Dict[str, str]] =
None,
212 bearer: typing.Optional[str] =
None,
213 x_real_ip: typing.Optional[str] =
None,
215 ) -> http.ClientResponse:
217 Make a HTTP DELETE request
231 headers: typing.Optional[typing.Dict[str, str]] =
None,
232 bearer: typing.Optional[str] =
None,
233 x_real_ip: typing.Optional[str] =
None,
235 ) -> http.ClientResponse:
237 Make a HTTP OPTIONS request
249 self, http_method: str, path: str, **kwargs,
250 ) -> http.ClientResponse:
252 Make a HTTP request with the specified method
254 response = await self.
_client.
request(http_method, path, **kwargs)
257 def _wrap_client_response(
258 self, response: aiohttp.ClientResponse,
259 ) -> typing.Awaitable[http.ClientResponse]:
260 return http.wrap_client_response(response)
266def _wrap_client_error(func):
267 async def _wrapper(*args, **kwargs):
269 return await func(*args, **kwargs)
270 except aiohttp.client_exceptions.ClientResponseError
as exc:
271 raise http.HttpResponseError(
272 url=exc.request_info.url, status=exc.status,
278class AiohttpClientMonitor(service_client.AiohttpClient):
279 _config: TestsuiteClientConfig
281 def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
282 super().__init__(base_url, **kwargs)
283 self._config = config
285 async def get_metrics(self, prefix=None):
286 if not self._config.server_monitor_path:
288 'handler-server-monitor component is not configured',
290 params = {
'format':
'internal'}
291 if prefix
is not None:
292 params[
'prefix'] = prefix
293 response = await self.
get(
294 self._config.server_monitor_path, params=params,
297 response.raise_for_status()
298 return await response.json(content_type=
None)
300 async def get_metric(self, metric_name):
301 metrics = await self.get_metrics(metric_name)
302 assert metric_name
in metrics, (
303 f
'No metric with name {metric_name!r}. '
304 f
'Use "single_metric" function instead of "get_metric"'
306 return metrics[metric_name]
308 async def metrics_raw(
314 labels: typing.Optional[typing.Dict[str, str]] =
None,
316 if not self._config.server_monitor_path:
318 'handler-server-monitor component is not configured',
321 params = {
'format': output_format}
323 params[
'prefix'] = prefix
326 params[
'path'] = path
329 params[
'labels'] = json.dumps(labels)
331 response = await self.
get(
332 self._config.server_monitor_path, params=params,
335 response.raise_for_status()
336 return await response.text()
343 labels: typing.Optional[typing.Dict[str, str]] =
None,
344 ) -> metric_module.MetricsSnapshot:
345 response = await self.metrics_raw(
346 output_format=
'json', path=path, prefix=prefix, labels=labels,
348 return metric_module.MetricsSnapshot.from_json(str(response))
350 async def single_metric_optional(
354 labels: typing.Optional[typing.Dict[str, str]] =
None,
355 ) -> typing.Optional[Metric]:
356 response = await self.metrics(path=path, labels=labels)
357 metrics_list = response.get(path, [])
359 assert len(metrics_list) <= 1, (
360 f
'More than one metric found for path {path} and labels {labels}: '
367 return next(iter(metrics_list))
369 async def single_metric(
373 labels: typing.Optional[typing.Dict[str, str]] =
None,
375 value = await self.single_metric_optional(path, labels=labels)
376 assert value
is not None, (
377 f
'No metric was found for path {path} and labels {labels}',
533 A helper class for computing metric differences.
535 @see ClientMonitor.metrics_diff
536 @ingroup userver_testsuite
542 _client: ClientMonitor,
543 _path: typing.Optional[str],
544 _prefix: typing.Optional[str],
545 _labels: typing.Optional[typing.Dict[str, str]],
548 self._client = _client
550 self._prefix = _prefix
551 self._labels = _labels
553 self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] =
None
554 self.
_current: typing.Optional[metric_module.MetricsSnapshot] =
None
555 self.
_diff: typing.Optional[metric_module.MetricsSnapshot] =
None
560 def baseline(self) -> metric_module.MetricsSnapshot:
565 def baseline(self, value: metric_module.MetricsSnapshot) ->
None:
568 self.
_diff = _subtract_metrics_snapshots(
573 def current(self) -> metric_module.MetricsSnapshot:
574 assert self.
_current is not None,
'Set self.current first'
578 def current(self, value: metric_module.MetricsSnapshot) ->
None:
580 assert self.
_baseline is not None,
'Set self.baseline first'
581 self.
_diff = _subtract_metrics_snapshots(
586 def diff(self) -> metric_module.MetricsSnapshot:
587 assert self.
_diff is not None,
'Set self.current first'
592 subpath: typing.Optional[str] =
None,
593 add_labels: typing.Optional[typing.Dict] =
None,
595 default: typing.Optional[float] =
None,
596 ) -> metric_module.MetricValue:
598 Returns a single metric value at the specified path, prepending
599 the path provided at construction. If a dict of labels is provided,
600 does en exact match of labels, prepending the labels provided
603 @param subpath Suffix of the metric path; the path provided
604 at construction is prepended
605 @param add_labels Labels that the metric must have in addition
606 to the labels provided at construction
607 @param default An optional default value in case the metric is missing
608 @throws AssertionError if not one metric by path
610 base_path = self._path
or self._prefix
611 if base_path
and subpath:
612 path = f
'{base_path}.{subpath}'
614 assert base_path
or subpath,
'No path provided'
615 path = base_path
or subpath
or ''
616 labels: typing.Optional[dict] =
None
617 if self._labels
is not None or add_labels
is not None:
618 labels = {**(self._labels
or {}), **(add_labels
or {})}
619 return self.
diff.
value_at(path, labels, default=default)
621 async def fetch(self) -> metric_module.MetricsSnapshot:
623 Fetches metric values from the service.
625 return await self._client.metrics(
626 path=self._path, prefix=self._prefix, labels=self._labels,
629 async def __aenter__(self) -> 'MetricsDiffer':
634 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
641def _subtract_metrics_snapshots(
642 current: metric_module.MetricsSnapshot,
643 initial: metric_module.MetricsSnapshot,
645) -> metric_module.MetricsSnapshot:
646 return metric_module.MetricsSnapshot(
649 _subtract_metrics(path, current_metric, initial, diff_gauge)
650 for current_metric
in current_group
652 for path, current_group
in current.items()
657def _subtract_metrics(
659 current_metric: metric_module.Metric,
660 initial: metric_module.MetricsSnapshot,
662) -> metric_module.Metric:
663 initial_group = initial.get(path,
None)
664 if initial_group
is None:
665 return current_metric
666 initial_metric = next(
667 (x
for x
in initial_group
if x.labels == current_metric.labels),
None,
669 if initial_metric
is None:
670 return current_metric
672 return metric_module.Metric(
673 labels=current_metric.labels,
674 value=_subtract_metric_values(
675 current=current_metric,
676 initial=initial_metric,
677 diff_gauge=diff_gauge,
679 _type=current_metric.type(),
683def _subtract_metric_values(
684 current: metric_module.Metric,
685 initial: metric_module.Metric,
687) -> metric_module.MetricValue:
688 assert current.type()
is not metric_module.MetricType.UNSPECIFIED
689 assert initial.type()
is not metric_module.MetricType.UNSPECIFIED
690 assert current.type() == initial.type()
692 if isinstance(current.value, metric_module.Histogram):
693 assert isinstance(initial.value, metric_module.Histogram)
694 return _subtract_metric_values_hist(current=current, initial=initial)
696 assert not isinstance(initial.value, metric_module.Histogram)
697 return _subtract_metric_values_num(
698 current=current, initial=initial, diff_gauge=diff_gauge,
702def _subtract_metric_values_num(
703 current: metric_module.Metric,
704 initial: metric_module.Metric,
707 current_value = typing.cast(float, current.value)
708 initial_value = typing.cast(float, initial.value)
710 current.type()
is metric_module.MetricType.RATE
711 or initial.type()
is metric_module.MetricType.RATE
714 return current_value - initial_value
if should_diff
else current_value
717def _subtract_metric_values_hist(
718 current: metric_module.Metric, initial: metric_module.Metric,
719) -> metric_module.Histogram:
720 current_value = typing.cast(metric_module.Histogram, current.value)
721 initial_value = typing.cast(metric_module.Histogram, initial.value)
722 assert current_value.bounds == initial_value.bounds
723 return metric_module.Histogram(
724 bounds=current_value.bounds,
727 for t
in zip(current_value.buckets, initial_value.buckets)
729 inf=current_value.inf - initial_value.inf,
733class AiohttpClient(service_client.AiohttpClient):
734 PeriodicTaskFailed = PeriodicTaskFailed
735 TestsuiteActionFailed = TestsuiteActionFailed
736 TestsuiteTaskNotFound = TestsuiteTaskNotFound
737 TestsuiteTaskConflict = TestsuiteTaskConflict
738 TestsuiteTaskFailed = TestsuiteTaskFailed
744 config: TestsuiteClientConfig,
749 cache_invalidation_state,
751 api_coverage_report=
None,
752 periodic_tasks_state: typing.Optional[PeriodicTasksState] =
None,
753 allow_all_caches_invalidation: bool =
True,
754 cache_control: typing.Optional[caches.CacheControl] =
None,
757 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
758 self._config = config
759 self._periodic_tasks = periodic_tasks_state
760 self._testpoint = testpoint
761 self._log_capture_fixture = log_capture_fixture
763 mocked_time=mocked_time,
764 testpoint=self._testpoint,
765 testpoint_control=testpoint_control,
766 invalidation_state=cache_invalidation_state,
767 cache_control=cache_control,
769 self._api_coverage_report = api_coverage_report
770 self._allow_all_caches_invalidation = allow_all_caches_invalidation
772 async def run_periodic_task(self, name):
773 response = await self._testsuite_action(
'run_periodic_task', name=name)
774 if not response[
'status']:
775 raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
777 async def suspend_periodic_tasks(self, names: typing.List[str]) ->
None:
778 if not self._periodic_tasks:
780 self._periodic_tasks.tasks_to_suspend.update(names)
781 await self._suspend_periodic_tasks()
783 async def resume_periodic_tasks(self, names: typing.List[str]) ->
None:
784 if not self._periodic_tasks:
786 self._periodic_tasks.tasks_to_suspend.difference_update(names)
787 await self._suspend_periodic_tasks()
789 async def resume_all_periodic_tasks(self) -> None:
790 if not self._periodic_tasks:
792 self._periodic_tasks.tasks_to_suspend.clear()
793 await self._suspend_periodic_tasks()
795 async def write_cache_dumps(
796 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
798 await self._testsuite_action(
801 testsuite_skip_prepare=testsuite_skip_prepare,
804 async def read_cache_dumps(
805 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
807 await self._testsuite_action(
810 testsuite_skip_prepare=testsuite_skip_prepare,
813 async def run_distlock_task(self, name: str) ->
None:
814 await self.run_task(f
'distlock/{name}')
816 async def reset_metrics(self) -> None:
817 await self._testsuite_action(
'reset_metrics')
819 async def metrics_portability(
820 self, *, prefix: typing.Optional[str] =
None,
821 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
822 return await self._testsuite_action(
823 'metrics_portability', prefix=prefix,
826 async def list_tasks(self) -> typing.List[str]:
827 response = await self._do_testsuite_action(
'tasks_list')
829 response.raise_for_status()
830 body = await response.json(content_type=
None)
833 async def run_task(self, name: str) ->
None:
834 response = await self._do_testsuite_action(
835 'task_run', json={
'name': name},
837 await _task_check_response(name, response)
839 @contextlib.asynccontextmanager
840 async def spawn_task(self, name: str):
841 task_id = await self._task_spawn(name)
845 await self._task_stop_spawned(task_id)
847 async def _task_spawn(self, name: str) -> str:
848 response = await self._do_testsuite_action(
849 'task_spawn', json={
'name': name},
851 data = await _task_check_response(name, response)
852 return data[
'task_id']
854 async def _task_stop_spawned(self, task_id: str) ->
None:
855 response = await self._do_testsuite_action(
856 'task_stop', json={
'task_id': task_id},
858 await _task_check_response(task_id, response)
860 async def http_allowed_urls_extra(
861 self, http_allowed_urls_extra: typing.List[str],
863 await self._do_testsuite_action(
864 'http_allowed_urls_extra',
865 json={
'allowed_urls_extra': http_allowed_urls_extra},
866 testsuite_skip_prepare=
True,
869 @contextlib.asynccontextmanager
870 async def capture_logs(
873 log_level: str =
'DEBUG',
874 testsuite_skip_prepare: bool =
False,
876 async with self._log_capture_fixture.start_capture(
879 await self._testsuite_action(
882 socket_logging_duplication=
True,
883 testsuite_skip_prepare=testsuite_skip_prepare,
888 await self._testsuite_action(
890 log_level=self._log_capture_fixture.default_log_level,
891 socket_logging_duplication=
False,
892 testsuite_skip_prepare=testsuite_skip_prepare,
895 async def log_flush(self, logger_name: typing.Optional[str] =
None):
896 await self._testsuite_action(
897 'log_flush', logger_name=logger_name, testsuite_skip_prepare=
True,
900 async def invalidate_caches(
903 clean_update: bool =
True,
904 cache_names: typing.Optional[typing.List[str]] =
None,
905 testsuite_skip_prepare: bool =
False,
907 if cache_names
is None and clean_update:
908 if self._allow_all_caches_invalidation:
909 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
911 __tracebackhide__ =
True
912 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
914 if testsuite_skip_prepare:
915 await self._tests_control(
917 'invalidate_caches': {
919 'full' if clean_update
else 'incremental'
921 **({
'names': cache_names}
if cache_names
else {}),
926 await self.tests_control(
927 invalidate_caches=
True,
928 clean_update=clean_update,
929 cache_names=cache_names,
932 async def tests_control(
935 invalidate_caches: bool =
True,
936 clean_update: bool =
True,
937 cache_names: typing.Optional[typing.List[str]] =
None,
938 http_allowed_urls_extra=
None,
939 ) -> typing.Dict[str, typing.Any]:
942 ] = self._state_manager.get_pending_update()
944 if 'invalidate_caches' in body
and invalidate_caches:
945 if not clean_update
or cache_names:
947 'Manual cache invalidation leads to indirect initial '
948 'full cache invalidation',
950 await self._prepare()
953 if invalidate_caches:
954 body[
'invalidate_caches'] = {
955 'update_type': (
'full' if clean_update
else 'incremental'),
958 body[
'invalidate_caches'][
'names'] = cache_names
960 if http_allowed_urls_extra
is not None:
961 await self.http_allowed_urls_extra(http_allowed_urls_extra)
963 return await self._tests_control(body)
965 async def update_server_state(self) -> None:
966 await self._prepare()
968 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
969 if not self._testpoint:
971 if no_auto_cache_cleanup:
972 await self._tests_control(
973 {
'testpoints': sorted(self._testpoint.keys())},
976 await self.update_server_state()
978 async def get_dynamic_config_defaults(
980 ) -> typing.Dict[str, typing.Any]:
981 return await self._testsuite_action(
982 'get_dynamic_config_defaults', testsuite_skip_prepare=
True,
985 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
986 with self._state_manager.updating_state(body):
987 async with await self._do_testsuite_action(
988 'control', json=body, testsuite_skip_prepare=
True,
990 if response.status == 404:
992 'It seems that testsuite support is not enabled '
995 response.raise_for_status()
996 return await response.json(content_type=
None)
998 async def _suspend_periodic_tasks(self):
1000 self._periodic_tasks.tasks_to_suspend
1001 != self._periodic_tasks.suspended_tasks
1003 await self._testsuite_action(
1004 'suspend_periodic_tasks',
1005 names=sorted(self._periodic_tasks.tasks_to_suspend),
1007 self._periodic_tasks.suspended_tasks = set(
1008 self._periodic_tasks.tasks_to_suspend,
1011 def _do_testsuite_action(self, action, **kwargs):
1012 if not self._config.testsuite_action_path:
1014 'tests-control component is not properly configured',
1016 path = self._config.testsuite_action_path.format(action=action)
1017 return self.post(path, **kwargs)
1019 async def _testsuite_action(
1020 self, action, *, testsuite_skip_prepare=False, **kwargs,
1022 async with await self._do_testsuite_action(
1025 testsuite_skip_prepare=testsuite_skip_prepare,
1027 if response.status == 500:
1028 raise TestsuiteActionFailed
1029 response.raise_for_status()
1030 return await response.json(content_type=
None)
1032 async def _prepare(self) -> None:
1033 with self._state_manager.cache_control_update()
as pending_update:
1035 await self._tests_control(pending_update)
1041 headers: typing.Optional[typing.Dict[str, str]] =
None,
1042 bearer: typing.Optional[str] =
None,
1043 x_real_ip: typing.Optional[str] =
None,
1045 testsuite_skip_prepare: bool =
False,
1047 ) -> aiohttp.ClientResponse:
1048 if not testsuite_skip_prepare:
1049 await self._prepare()
1051 response = await super()._request(
1052 http_method, path, headers, bearer, x_real_ip, **kwargs,
1054 if self._api_coverage_report:
1055 self._api_coverage_report.update_usage_stat(
1056 path, http_method, response.status, response.content_type,