97 Base asyncio userver client that implements HTTP requests to service.
99 Compatible with werkzeug interface.
101 @ingroup userver_testsuite
104 def __init__(self, client):
111 json: annotations.JsonAnyOptional =
None,
112 data: typing.Any =
None,
113 params: typing.Optional[typing.Dict[str, str]] =
None,
114 bearer: typing.Optional[str] =
None,
115 x_real_ip: typing.Optional[str] =
None,
116 headers: typing.Optional[typing.Dict[str, str]] =
None,
118 ) -> http.ClientResponse:
120 Make a HTTP POST request
138 json: annotations.JsonAnyOptional =
None,
139 data: typing.Any =
None,
140 params: typing.Optional[typing.Dict[str, str]] =
None,
141 bearer: typing.Optional[str] =
None,
142 x_real_ip: typing.Optional[str] =
None,
143 headers: typing.Optional[typing.Dict[str, str]] =
None,
145 ) -> http.ClientResponse:
147 Make a HTTP PUT request
165 json: annotations.JsonAnyOptional =
None,
166 data: typing.Any =
None,
167 params: typing.Optional[typing.Dict[str, str]] =
None,
168 bearer: typing.Optional[str] =
None,
169 x_real_ip: typing.Optional[str] =
None,
170 headers: typing.Optional[typing.Dict[str, str]] =
None,
172 ) -> http.ClientResponse:
174 Make a HTTP PATCH request
191 headers: typing.Optional[typing.Dict[str, str]] =
None,
192 bearer: typing.Optional[str] =
None,
193 x_real_ip: typing.Optional[str] =
None,
195 ) -> http.ClientResponse:
197 Make a HTTP GET request
200 path, headers=headers, bearer=bearer, x_real_ip=x_real_ip, **kwargs,
207 headers: typing.Optional[typing.Dict[str, str]] =
None,
208 bearer: typing.Optional[str] =
None,
209 x_real_ip: typing.Optional[str] =
None,
211 ) -> http.ClientResponse:
213 Make a HTTP DELETE request
216 path, headers=headers, bearer=bearer, x_real_ip=x_real_ip, **kwargs,
223 headers: typing.Optional[typing.Dict[str, str]] =
None,
224 bearer: typing.Optional[str] =
None,
225 x_real_ip: typing.Optional[str] =
None,
227 ) -> http.ClientResponse:
229 Make a HTTP OPTIONS request
232 path, headers=headers, bearer=bearer, x_real_ip=x_real_ip, **kwargs,
237 self, http_method: str, path: str, **kwargs,
238 ) -> http.ClientResponse:
240 Make a HTTP request with the specified method
242 response = await self.
_client.
request(http_method, path, **kwargs)
245 def _wrap_client_response(
246 self, response: aiohttp.ClientResponse,
247 ) -> typing.Awaitable[http.ClientResponse]:
248 return http.wrap_client_response(response)
254def _wrap_client_error(func):
255 async def _wrapper(*args, **kwargs):
257 return await func(*args, **kwargs)
258 except aiohttp.client_exceptions.ClientResponseError
as exc:
259 raise http.HttpResponseError(
260 url=exc.request_info.url, status=exc.status,
266class AiohttpClientMonitor(service_client.AiohttpClient):
267 _config: TestsuiteClientConfig
269 def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
270 super().__init__(base_url, **kwargs)
271 self._config = config
273 async def get_metrics(self, prefix=None):
274 if not self._config.server_monitor_path:
276 'handler-server-monitor component is not configured',
278 params = {
'format':
'internal'}
279 if prefix
is not None:
280 params[
'prefix'] = prefix
281 response = await self.
get(
282 self._config.server_monitor_path, params=params,
285 response.raise_for_status()
286 return await response.json(content_type=
None)
288 async def get_metric(self, metric_name):
289 metrics = await self.get_metrics(metric_name)
290 assert metric_name
in metrics, (
291 f
'No metric with name {metric_name!r}. '
292 f
'Use "single_metric" function instead of "get_metric"'
294 return metrics[metric_name]
296 async def metrics_raw(
302 labels: typing.Optional[typing.Dict[str, str]] =
None,
304 if not self._config.server_monitor_path:
306 'handler-server-monitor component is not configured',
309 params = {
'format': output_format}
311 params[
'prefix'] = prefix
314 params[
'path'] = path
317 params[
'labels'] = json.dumps(labels)
319 response = await self.
get(
320 self._config.server_monitor_path, params=params,
323 response.raise_for_status()
324 return await response.text()
331 labels: typing.Optional[typing.Dict[str, str]] =
None,
332 ) -> metric_module.MetricsSnapshot:
333 response = await self.metrics_raw(
334 output_format=
'json', path=path, prefix=prefix, labels=labels,
336 return metric_module.MetricsSnapshot.from_json(str(response))
338 async def single_metric_optional(
342 labels: typing.Optional[typing.Dict[str, str]] =
None,
343 ) -> typing.Optional[Metric]:
344 response = await self.metrics(path=path, labels=labels)
345 metrics_list = response.get(path, [])
347 assert len(metrics_list) <= 1, (
348 f
'More than one metric found for path {path} and labels {labels}: '
355 return next(iter(metrics_list))
357 async def single_metric(
361 labels: typing.Optional[typing.Dict[str, str]] =
None,
363 value = await self.single_metric_optional(path, labels=labels)
364 assert value
is not None, (
365 f
'No metric was found for path {path} and labels {labels}',
521 A helper class for computing metric differences.
523 @see ClientMonitor.metrics_diff
524 @ingroup userver_testsuite
530 _client: ClientMonitor,
531 _path: typing.Optional[str],
532 _prefix: typing.Optional[str],
533 _labels: typing.Optional[typing.Dict[str, str]],
536 self._client = _client
538 self._prefix = _prefix
539 self._labels = _labels
541 self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] =
None
542 self.
_current: typing.Optional[metric_module.MetricsSnapshot] =
None
543 self.
_diff: typing.Optional[metric_module.MetricsSnapshot] =
None
548 def baseline(self) -> metric_module.MetricsSnapshot:
553 def baseline(self, value: metric_module.MetricsSnapshot) ->
None:
556 self.
_diff = _subtract_metrics_snapshots(
561 def current(self) -> metric_module.MetricsSnapshot:
562 assert self.
_current is not None,
'Set self.current first'
566 def current(self, value: metric_module.MetricsSnapshot) ->
None:
568 assert self.
_baseline is not None,
'Set self.baseline first'
569 self.
_diff = _subtract_metrics_snapshots(
574 def diff(self) -> metric_module.MetricsSnapshot:
575 assert self.
_diff is not None,
'Set self.current first'
580 subpath: typing.Optional[str] =
None,
581 add_labels: typing.Optional[typing.Dict] =
None,
583 default: typing.Optional[float] =
None,
584 ) -> metric_module.MetricValue:
586 Returns a single metric value at the specified path, prepending
587 the path provided at construction. If a dict of labels is provided,
588 does en exact match of labels, prepending the labels provided
591 @param subpath Suffix of the metric path; the path provided
592 at construction is prepended
593 @param add_labels Labels that the metric must have in addition
594 to the labels provided at construction
595 @param default An optional default value in case the metric is missing
596 @throws AssertionError if not one metric by path
598 base_path = self._path
or self._prefix
599 if base_path
and subpath:
600 path = f
'{base_path}.{subpath}'
602 assert base_path
or subpath,
'No path provided'
603 path = base_path
or subpath
or ''
604 labels: typing.Optional[dict] =
None
605 if self._labels
is not None or add_labels
is not None:
606 labels = {**(self._labels
or {}), **(add_labels
or {})}
607 return self.
diff.
value_at(path, labels, default=default)
609 async def fetch(self) -> metric_module.MetricsSnapshot:
611 Fetches metric values from the service.
613 return await self._client.metrics(
614 path=self._path, prefix=self._prefix, labels=self._labels,
617 async def __aenter__(self) -> 'MetricsDiffer':
622 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
629def _subtract_metrics_snapshots(
630 current: metric_module.MetricsSnapshot,
631 initial: metric_module.MetricsSnapshot,
633) -> metric_module.MetricsSnapshot:
634 return metric_module.MetricsSnapshot({
636 _subtract_metrics(path, current_metric, initial, diff_gauge)
637 for current_metric
in current_group
639 for path, current_group
in current.items()
643def _subtract_metrics(
645 current_metric: metric_module.Metric,
646 initial: metric_module.MetricsSnapshot,
648) -> metric_module.Metric:
649 initial_group = initial.get(path,
None)
650 if initial_group
is None:
651 return current_metric
652 initial_metric = next(
653 (x
for x
in initial_group
if x.labels == current_metric.labels),
None,
655 if initial_metric
is None:
656 return current_metric
658 return metric_module.Metric(
659 labels=current_metric.labels,
660 value=_subtract_metric_values(
661 current=current_metric,
662 initial=initial_metric,
663 diff_gauge=diff_gauge,
665 _type=current_metric.type(),
669def _subtract_metric_values(
670 current: metric_module.Metric,
671 initial: metric_module.Metric,
673) -> metric_module.MetricValue:
674 assert current.type()
is not metric_module.MetricType.UNSPECIFIED
675 assert initial.type()
is not metric_module.MetricType.UNSPECIFIED
676 assert current.type() == initial.type()
678 if isinstance(current.value, metric_module.Histogram):
679 assert isinstance(initial.value, metric_module.Histogram)
680 return _subtract_metric_values_hist(current=current, initial=initial)
682 assert not isinstance(initial.value, metric_module.Histogram)
683 return _subtract_metric_values_num(
684 current=current, initial=initial, diff_gauge=diff_gauge,
688def _subtract_metric_values_num(
689 current: metric_module.Metric,
690 initial: metric_module.Metric,
693 current_value = typing.cast(float, current.value)
694 initial_value = typing.cast(float, initial.value)
696 current.type()
is metric_module.MetricType.RATE
697 or initial.type()
is metric_module.MetricType.RATE
700 return current_value - initial_value
if should_diff
else current_value
703def _subtract_metric_values_hist(
704 current: metric_module.Metric, initial: metric_module.Metric,
705) -> metric_module.Histogram:
706 current_value = typing.cast(metric_module.Histogram, current.value)
707 initial_value = typing.cast(metric_module.Histogram, initial.value)
708 assert current_value.bounds == initial_value.bounds
709 return metric_module.Histogram(
710 bounds=current_value.bounds,
713 for t
in zip(current_value.buckets, initial_value.buckets)
715 inf=current_value.inf - initial_value.inf,
719class AiohttpClient(service_client.AiohttpClient):
720 PeriodicTaskFailed = PeriodicTaskFailed
721 TestsuiteActionFailed = TestsuiteActionFailed
722 TestsuiteTaskNotFound = TestsuiteTaskNotFound
723 TestsuiteTaskConflict = TestsuiteTaskConflict
724 TestsuiteTaskFailed = TestsuiteTaskFailed
730 config: TestsuiteClientConfig,
735 cache_invalidation_state,
737 api_coverage_report=
None,
738 periodic_tasks_state: typing.Optional[PeriodicTasksState] =
None,
739 allow_all_caches_invalidation: bool =
True,
740 cache_control: typing.Optional[caches.CacheControl] =
None,
744 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
745 self._config = config
746 self._periodic_tasks = periodic_tasks_state
747 self._testpoint = testpoint
748 self._log_capture_fixture = log_capture_fixture
750 mocked_time=mocked_time,
751 testpoint=self._testpoint,
752 testpoint_control=testpoint_control,
753 invalidation_state=cache_invalidation_state,
754 cache_control=cache_control,
756 self._api_coverage_report = api_coverage_report
757 self._allow_all_caches_invalidation = allow_all_caches_invalidation
758 self._asyncexc_check = asyncexc_check
760 async def run_periodic_task(self, name):
761 response = await self._testsuite_action(
'run_periodic_task', name=name)
762 if not response[
'status']:
763 raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
765 async def suspend_periodic_tasks(self, names: typing.List[str]) ->
None:
766 if not self._periodic_tasks:
768 self._periodic_tasks.tasks_to_suspend.update(names)
769 await self._suspend_periodic_tasks()
771 async def resume_periodic_tasks(self, names: typing.List[str]) ->
None:
772 if not self._periodic_tasks:
774 self._periodic_tasks.tasks_to_suspend.difference_update(names)
775 await self._suspend_periodic_tasks()
777 async def resume_all_periodic_tasks(self) -> None:
778 if not self._periodic_tasks:
780 self._periodic_tasks.tasks_to_suspend.clear()
781 await self._suspend_periodic_tasks()
783 async def write_cache_dumps(
784 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
786 await self._testsuite_action(
789 testsuite_skip_prepare=testsuite_skip_prepare,
792 async def read_cache_dumps(
793 self, names: typing.List[str], *, testsuite_skip_prepare=
False,
795 await self._testsuite_action(
798 testsuite_skip_prepare=testsuite_skip_prepare,
801 async def run_distlock_task(self, name: str) ->
None:
802 await self.run_task(f
'distlock/{name}')
804 async def reset_metrics(self) -> None:
805 await self._testsuite_action(
'reset_metrics')
807 async def metrics_portability(
808 self, *, prefix: typing.Optional[str] =
None,
809 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
810 return await self._testsuite_action(
811 'metrics_portability', prefix=prefix,
814 async def list_tasks(self) -> typing.List[str]:
815 response = await self._do_testsuite_action(
'tasks_list')
817 response.raise_for_status()
818 body = await response.json(content_type=
None)
821 async def run_task(self, name: str) ->
None:
822 response = await self._do_testsuite_action(
823 'task_run', json={
'name': name},
825 await _task_check_response(name, response)
827 @contextlib.asynccontextmanager
828 async def spawn_task(self, name: str):
829 task_id = await self._task_spawn(name)
833 await self._task_stop_spawned(task_id)
835 async def _task_spawn(self, name: str) -> str:
836 response = await self._do_testsuite_action(
837 'task_spawn', json={
'name': name},
839 data = await _task_check_response(name, response)
840 return data[
'task_id']
842 async def _task_stop_spawned(self, task_id: str) ->
None:
843 response = await self._do_testsuite_action(
844 'task_stop', json={
'task_id': task_id},
846 await _task_check_response(task_id, response)
848 async def http_allowed_urls_extra(
849 self, http_allowed_urls_extra: typing.List[str],
851 await self._do_testsuite_action(
852 'http_allowed_urls_extra',
853 json={
'allowed_urls_extra': http_allowed_urls_extra},
854 testsuite_skip_prepare=
True,
857 @contextlib.asynccontextmanager
858 async def capture_logs(
859 self, *, log_level: str =
'DEBUG', testsuite_skip_prepare: bool =
False,
861 async with self._log_capture_fixture.start_capture(
864 logger.debug(
'Starting logcapture')
865 await self._testsuite_action(
868 socket_logging_duplication=
True,
869 testsuite_skip_prepare=testsuite_skip_prepare,
873 await self._log_capture_fixture.wait_for_client()
876 logger.debug(
'Finishing logcapture')
877 await self._testsuite_action(
879 log_level=self._log_capture_fixture.default_log_level,
880 socket_logging_duplication=
False,
881 testsuite_skip_prepare=testsuite_skip_prepare,
884 async def log_flush(self, logger_name: typing.Optional[str] =
None):
885 await self._testsuite_action(
886 'log_flush', logger_name=logger_name, testsuite_skip_prepare=
True,
889 async def invalidate_caches(
892 clean_update: bool =
True,
893 cache_names: typing.Optional[typing.List[str]] =
None,
894 testsuite_skip_prepare: bool =
False,
896 if cache_names
is None and clean_update:
897 if self._allow_all_caches_invalidation:
898 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
900 __tracebackhide__ =
True
901 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
903 if testsuite_skip_prepare:
904 await self._tests_control({
905 'invalidate_caches': {
906 'update_type': (
'full' if clean_update
else 'incremental'),
907 **({
'names': cache_names}
if cache_names
else {}),
911 await self.tests_control(
912 invalidate_caches=
True,
913 clean_update=clean_update,
914 cache_names=cache_names,
917 async def tests_control(
920 invalidate_caches: bool =
True,
921 clean_update: bool =
True,
922 cache_names: typing.Optional[typing.List[str]] =
None,
923 http_allowed_urls_extra=
None,
924 ) -> typing.Dict[str, typing.Any]:
925 body: typing.Dict[str, typing.Any] = (
926 self._state_manager.get_pending_update()
929 if 'invalidate_caches' in body
and invalidate_caches:
930 if not clean_update
or cache_names:
932 'Manual cache invalidation leads to indirect initial '
933 'full cache invalidation',
935 await self._prepare()
938 if invalidate_caches:
939 body[
'invalidate_caches'] = {
940 'update_type': (
'full' if clean_update
else 'incremental'),
943 body[
'invalidate_caches'][
'names'] = cache_names
945 if http_allowed_urls_extra
is not None:
946 await self.http_allowed_urls_extra(http_allowed_urls_extra)
948 return await self._tests_control(body)
950 async def update_server_state(self) -> None:
951 await self._prepare()
953 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
954 if not self._testpoint:
956 if no_auto_cache_cleanup:
957 await self._tests_control({
958 'testpoints': sorted(self._testpoint.keys()),
961 await self.update_server_state()
963 async def get_dynamic_config_defaults(
965 ) -> typing.Dict[str, typing.Any]:
966 return await self._testsuite_action(
967 'get_dynamic_config_defaults', testsuite_skip_prepare=
True,
970 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
971 with self._state_manager.updating_state(body):
972 async with await self._do_testsuite_action(
973 'control', json=body, testsuite_skip_prepare=
True,
975 if response.status == 404:
977 'It seems that testsuite support is not enabled '
980 response.raise_for_status()
981 return await response.json(content_type=
None)
983 async def _suspend_periodic_tasks(self):
985 self._periodic_tasks.tasks_to_suspend
986 != self._periodic_tasks.suspended_tasks
988 await self._testsuite_action(
989 'suspend_periodic_tasks',
990 names=sorted(self._periodic_tasks.tasks_to_suspend),
992 self._periodic_tasks.suspended_tasks = set(
993 self._periodic_tasks.tasks_to_suspend,
996 def _do_testsuite_action(self, action, **kwargs):
997 if not self._config.testsuite_action_path:
999 'tests-control component is not properly configured',
1001 path = self._config.testsuite_action_path.format(action=action)
1002 return self.post(path, **kwargs)
1004 async def _testsuite_action(
1005 self, action, *, testsuite_skip_prepare=False, **kwargs,
1007 async with await self._do_testsuite_action(
1008 action, json=kwargs, testsuite_skip_prepare=testsuite_skip_prepare,
1010 if response.status == 500:
1011 raise TestsuiteActionFailed
1012 response.raise_for_status()
1013 return await response.json(content_type=
None)
1015 async def _prepare(self) -> None:
1016 with self._state_manager.cache_control_update()
as pending_update:
1018 await self._tests_control(pending_update)
1024 headers: typing.Optional[typing.Dict[str, str]] =
None,
1025 bearer: typing.Optional[str] =
None,
1026 x_real_ip: typing.Optional[str] =
None,
1028 testsuite_skip_prepare: bool =
False,
1030 ) -> aiohttp.ClientResponse:
1031 if self._asyncexc_check:
1033 self._asyncexc_check()
1035 if not testsuite_skip_prepare:
1036 await self._prepare()
1038 response = await super()._request(
1039 http_method, path, headers, bearer, x_real_ip, **kwargs,
1041 if self._api_coverage_report:
1042 self._api_coverage_report.update_usage_stat(
1043 path, http_method, response.status, response.content_type,
1046 if self._asyncexc_check:
1048 self._asyncexc_check()