555 A helper class for computing metric differences.
557 @see ClientMonitor.metrics_diff
559 Example with @ref pytest_userver.client.ClientMonitor.metrics_diff "await monitor_client.metrics_diff()":
560 @snippet samples/testsuite-support/tests/test_metrics.py metrics diff
562 @ingroup userver_testsuite
568 _client: ClientMonitor,
571 _labels: dict[str, str] |
None,
574 self._client = _client
576 self._prefix = _prefix
577 self._labels = _labels
578 self._diff_gauge = _diff_gauge
586 def baseline(self) -> pytest_userver.metrics.MetricsSnapshot:
594 self.
_diff = _subtract_metrics_snapshots(
601 def current(self) -> pytest_userver.metrics.MetricsSnapshot:
602 assert self.
_current is not None,
'Set self.current first'
608 assert self.
_baseline is not None,
'Set self.baseline first'
609 self.
_diff = _subtract_metrics_snapshots(
616 def diff(self) -> pytest_userver.metrics.MetricsSnapshot:
617 assert self.
_diff is not None,
'Set self.current first'
622 subpath: str |
None =
None,
623 add_labels: dict[str, str] |
None =
None,
625 default: float |
None =
None,
626 ) -> pytest_userver.metrics.MetricValue:
628 Returns a single metric value at the specified path, prepending
629 the path provided at construction. If a dict of labels is provided,
630 does en exact match of labels, prepending the labels provided at construction.
632 @param subpath Suffix of the metric path; the path provided at construction is prepended
633 @param add_labels Labels that the metric must have in addition to the labels provided at construction
634 @param default An optional default value in case the metric is missing
635 @throws AssertionError if not one metric by path
637 base_path = self._path
or self._prefix
638 if base_path
and subpath:
639 path = f
'{base_path}.{subpath}'
641 assert base_path
or subpath,
'No path provided'
642 path = base_path
or subpath
or ''
643 labels: dict |
None =
None
644 if self._labels
is not None or add_labels
is not None:
645 labels = {**(self._labels
or {}), **(add_labels
or {})}
646 return self.
diff.
value_at(path, labels, default=default)
648 async def fetch(self) -> pytest_userver.metrics.MetricsSnapshot:
650 Fetches metric values from the service.
652 return await self._client.metrics(
658 async def __aenter__(self) -> MetricsDiffer:
663 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
670def _subtract_metrics_snapshots(
676 path: {_subtract_metrics(path, current_metric, initial, diff_gauge)
for current_metric
in current_group}
677 for path, current_group
in current.items()
681def _subtract_metrics(
687 initial_group = initial.get(path,
None)
688 if initial_group
is None:
689 return current_metric
690 initial_metric = next(
691 (x
for x
in initial_group
if x.labels == current_metric.labels),
694 if initial_metric
is None:
695 return current_metric
698 labels=current_metric.labels,
699 value=_subtract_metric_values(
700 current=current_metric,
701 initial=initial_metric,
702 diff_gauge=diff_gauge,
704 _type=current_metric.type(),
708def _subtract_metric_values(
712) -> pytest_userver.metrics.MetricValue:
713 assert current.type()
is not pytest_userver.metrics.MetricType.UNSPECIFIED
714 assert initial.type()
is not pytest_userver.metrics.MetricType.UNSPECIFIED
715 assert current.type() == initial.type()
719 return _subtract_metric_values_hist(current=current, initial=initial)
722 return _subtract_metric_values_num(
725 diff_gauge=diff_gauge,
729def _subtract_metric_values_num(
734 current_value = typing.cast(float, current.value)
735 initial_value = typing.cast(float, initial.value)
737 current.type()
is pytest_userver.metrics.MetricType.RATE
738 or initial.type()
is pytest_userver.metrics.MetricType.RATE
741 return current_value - initial_value
if should_diff
else current_value
744def _subtract_metric_values_hist(
750 assert current_value.bounds == initial_value.bounds
752 bounds=current_value.bounds,
753 buckets=[t[0] - t[1]
for t
in zip(current_value.buckets, initial_value.buckets, strict=
True)],
754 inf=current_value.inf - initial_value.inf,
758class AiohttpClient(service_client.AiohttpClient):
759 PeriodicTaskFailed = PeriodicTaskFailed
760 TestsuiteActionFailed = TestsuiteActionFailed
761 TestsuiteTaskNotFound = TestsuiteTaskNotFound
762 TestsuiteTaskConflict = TestsuiteTaskConflict
763 TestsuiteTaskFailed = TestsuiteTaskFailed
769 config: TestsuiteClientConfig,
771 log_capture_fixture: logcapture.CaptureServer,
774 cache_invalidation_state,
776 api_coverage_report=
None,
777 periodic_tasks_state: PeriodicTasksState |
None =
None,
778 allow_all_caches_invalidation: bool =
True,
783 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
784 self._config = config
785 self._periodic_tasks = periodic_tasks_state
786 self._testpoint = testpoint
787 self._log_capture_fixture = log_capture_fixture
789 mocked_time=mocked_time,
790 testpoint=self._testpoint,
791 testpoint_control=testpoint_control,
792 invalidation_state=cache_invalidation_state,
793 cache_control=cache_control,
795 self._api_coverage_report = api_coverage_report
796 self._allow_all_caches_invalidation = allow_all_caches_invalidation
797 self._asyncexc_check = asyncexc_check
799 async def run_periodic(self, name) -> None:
800 await self.run_task(f
'periodic/{name}')
802 async def run_periodic_task(self, name):
803 warnings.warn(userver_warnings.WARN_PERIODIC_DEPRECATION, DeprecationWarning)
804 response = await self._testsuite_action(
'run_periodic_task', name=name)
805 if not response[
'status']:
806 raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
808 async def suspend_periodic_tasks(self, names: list[str]) ->
None:
809 if not self._periodic_tasks:
811 self._periodic_tasks.tasks_to_suspend.update(names)
812 await self._suspend_periodic_tasks()
814 async def resume_periodic_tasks(self, names: list[str]) ->
None:
815 warnings.warn(userver_warnings.WARN_PERIODIC_DEPRECATION, DeprecationWarning)
816 if not self._periodic_tasks:
818 self._periodic_tasks.tasks_to_suspend.difference_update(names)
819 await self._suspend_periodic_tasks()
821 async def resume_all_periodic_tasks(self) -> None:
822 if not self._periodic_tasks:
824 self._periodic_tasks.tasks_to_suspend.clear()
825 await self._suspend_periodic_tasks()
827 async def write_cache_dumps(
831 testsuite_skip_prepare=
False,
833 await self._testsuite_action(
836 testsuite_skip_prepare=testsuite_skip_prepare,
839 async def read_cache_dumps(
843 testsuite_skip_prepare=
False,
845 await self._testsuite_action(
848 testsuite_skip_prepare=testsuite_skip_prepare,
851 async def run_distlock_task(self, name: str) ->
None:
852 await self.run_task(f
'distlock/{name}')
854 async def reset_metrics(self) -> None:
855 await self._testsuite_action(
'reset_metrics')
857 async def metrics_portability(
860 prefix: str |
None =
None,
861 ) -> dict[str, list[dict[str, str]]]:
862 return await self._testsuite_action(
863 'metrics_portability',
867 async def list_tasks(self) -> list[str]:
868 response = await self._do_testsuite_action(
'tasks_list')
870 response.raise_for_status()
871 body = await response.json(content_type=
None)
874 async def run_task(self, name: str) ->
None:
875 response = await self._do_testsuite_action(
879 await _task_check_response(name, response)
881 @contextlib.asynccontextmanager
882 async def spawn_task(self, name: str):
883 task_id = await self._task_spawn(name)
887 await self._task_stop_spawned(task_id)
889 async def _task_spawn(self, name: str) -> str:
890 response = await self._do_testsuite_action(
894 data = await _task_check_response(name, response)
895 return data[
'task_id']
897 async def _task_stop_spawned(self, task_id: str) ->
None:
898 response = await self._do_testsuite_action(
900 json={
'task_id': task_id},
902 await _task_check_response(task_id, response)
904 async def http_allowed_urls_extra(
906 http_allowed_urls_extra: list[str],
908 await self._do_testsuite_action(
909 'http_allowed_urls_extra',
910 json={
'allowed_urls_extra': http_allowed_urls_extra},
911 testsuite_skip_prepare=
True,
914 @contextlib.asynccontextmanager
915 async def capture_logs(
918 log_level: str =
'DEBUG',
919 testsuite_skip_prepare: bool =
False,
921 async with self._log_capture_fixture.capture(
922 log_level=logcapture.LogLevel.from_string(log_level),
924 logger.debug(
'Starting logcapture')
925 await self._testsuite_action(
928 socket_logging_duplication=
True,
929 testsuite_skip_prepare=testsuite_skip_prepare,
933 await self._log_capture_fixture.wait_for_client()
936 await self._testsuite_action(
938 log_level=self._log_capture_fixture.default_log_level.name,
939 socket_logging_duplication=
False,
940 testsuite_skip_prepare=testsuite_skip_prepare,
943 async def log_flush(self, logger_name: str |
None =
None):
944 await self._testsuite_action(
946 logger_name=logger_name,
947 testsuite_skip_prepare=
True,
950 async def invalidate_caches(
953 clean_update: bool =
True,
954 cache_names: list[str] |
None =
None,
955 testsuite_skip_prepare: bool =
False,
957 if cache_names
is None and clean_update:
958 if self._allow_all_caches_invalidation:
959 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning, stacklevel=2)
961 __tracebackhide__ =
True
962 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
964 if testsuite_skip_prepare:
965 await self._tests_control({
966 'invalidate_caches': {
967 'update_type': (
'full' if clean_update
else 'incremental'),
968 **({
'names': cache_names}
if cache_names
else {}),
972 await self.tests_control(
973 invalidate_caches=
True,
974 clean_update=clean_update,
975 cache_names=cache_names,
978 async def tests_control(
981 invalidate_caches: bool =
True,
982 clean_update: bool =
True,
983 cache_names: list[str] |
None =
None,
984 http_allowed_urls_extra: list[str] |
None =
None,
986 body: dict[str, Any] = self._state_manager.get_pending_update()
988 if 'invalidate_caches' in body
and invalidate_caches:
989 if not clean_update
or cache_names:
991 'Manual cache invalidation leads to indirect initial full cache invalidation',
993 await self._prepare()
996 if invalidate_caches:
997 body[
'invalidate_caches'] = {
998 'update_type': (
'full' if clean_update
else 'incremental'),
1001 body[
'invalidate_caches'][
'names'] = cache_names
1003 if http_allowed_urls_extra
is not None:
1004 await self.http_allowed_urls_extra(http_allowed_urls_extra)
1006 return await self._tests_control(body)
1008 async def update_server_state(self) -> None:
1009 await self._prepare()
1011 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
1012 if not self._testpoint:
1014 if no_auto_cache_cleanup:
1015 await self._tests_control({
1016 'testpoints': sorted(self._testpoint.keys()),
1019 await self.update_server_state()
1021 async def get_dynamic_config_defaults(
1023 ) -> dict[str, Any]:
1024 return await self._testsuite_action(
1025 'get_dynamic_config_defaults',
1026 testsuite_skip_prepare=
True,
1029 async def _tests_control(self, body: dict) -> dict[str, Any]:
1030 with self._state_manager.updating_state(body):
1031 async with await self._do_testsuite_action(
1034 testsuite_skip_prepare=
True,
1036 if response.status == 404:
1038 'It seems that testsuite support is not enabled for your service',
1040 response.raise_for_status()
1041 return await response.json(content_type=
None)
1043 async def _suspend_periodic_tasks(self):
1044 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1045 await self._testsuite_action(
1046 'suspend_periodic_tasks',
1047 names=sorted(self._periodic_tasks.tasks_to_suspend),
1049 self._periodic_tasks.suspended_tasks = set(
1050 self._periodic_tasks.tasks_to_suspend,
1053 def _do_testsuite_action(self, action, **kwargs):
1054 if not self._config.testsuite_action_path:
1056 'tests-control component is not properly configured',
1058 path = self._config.testsuite_action_path.format(action=action)
1059 return self.post(path, **kwargs)
1061 async def _testsuite_action(
1065 testsuite_skip_prepare=False,
1068 async with await self._do_testsuite_action(
1071 testsuite_skip_prepare=testsuite_skip_prepare,
1073 if response.status == 500:
1074 raise TestsuiteActionFailed
1075 response.raise_for_status()
1076 return await response.json(content_type=
None)
1078 async def _prepare(self) -> None:
1079 with self._state_manager.cache_control_update()
as pending_update:
1081 await self._tests_control(pending_update)
1087 headers: dict[str, str] |
None =
None,
1088 bearer: str |
None =
None,
1089 x_real_ip: str |
None =
None,
1091 testsuite_skip_prepare: bool =
False,
1093 ) -> aiohttp.ClientResponse:
1094 if self._asyncexc_check:
1096 self._asyncexc_check()
1098 if not testsuite_skip_prepare:
1099 await self._prepare()
1101 response = await super()._request(
1109 if self._api_coverage_report:
1110 self._api_coverage_report.update_usage_stat(
1114 response.content_type,
1117 if self._asyncexc_check:
1119 self._asyncexc_check()
1129 Asyncio userver client, typically retrieved from
1130 @ref service_client "plugins.service_client.service_client"
1133 Compatible with werkzeug interface.
1135 @ingroup userver_testsuite
1138 PeriodicTaskFailed = PeriodicTaskFailed
1139 TestsuiteActionFailed = TestsuiteActionFailed
1140 TestsuiteTaskNotFound = TestsuiteTaskNotFound
1141 TestsuiteTaskConflict = TestsuiteTaskConflict
1142 TestsuiteTaskFailed = TestsuiteTaskFailed
1144 def _wrap_client_response(self, response: aiohttp.ClientResponse) -> Awaitable[http.ClientResponse]:
1145 return http.wrap_client_response(
1147 json_loads=approx.json_loads,
1151 async def run_periodic(self, name) -> None:
1152 await self.
_client.run_periodic(name)
1155 async def run_periodic_task(self, name):
1156 warnings.warn(userver_warnings.WARN_PERIODIC_DEPRECATION, DeprecationWarning)
1157 await self.
_client.run_periodic_task(name)
1160 async def suspend_periodic_tasks(self, names: list[str]) ->
None:
1161 await self.
_client.suspend_periodic_tasks(names)
1164 async def resume_periodic_tasks(self, names: list[str]) ->
None:
1165 warnings.warn(userver_warnings.WARN_PERIODIC_DEPRECATION, DeprecationWarning)
1166 await self.
_client.resume_periodic_tasks(names)
1169 async def resume_all_periodic_tasks(self) -> None:
1170 await self.
_client.resume_all_periodic_tasks()
1173 async def write_cache_dumps(
1177 testsuite_skip_prepare=
False,
1179 await self.
_client.write_cache_dumps(
1181 testsuite_skip_prepare=testsuite_skip_prepare,
1185 async def read_cache_dumps(
1189 testsuite_skip_prepare=
False,
1191 await self.
_client.read_cache_dumps(
1193 testsuite_skip_prepare=testsuite_skip_prepare,
1196 async def run_task(self, name: str) ->
None:
1197 await self.
_client.run_task(name)
1199 async def run_distlock_task(self, name: str) ->
None:
1200 await self.
_client.run_distlock_task(name)
1204 Calls `ResetMetric(metric);` for each metric that has such C++ function.
1206 Note that using `reset_metrics()` is discouraged, prefer using a more reliable
1207 @ref pytest_userver.client.ClientMonitor.metrics_diff "await monitor_client.metrics_diff()".
1209 @snippet samples/testsuite-support/tests/test_metrics.py metrics reset
1216 prefix: str |
None =
None,
1217 ) -> dict[str, list[dict[str, str]]]:
1219 Reports metrics related issues that could be encountered on
1220 different monitoring systems.
1222 @sa @ref utils::statistics::GetPortabilityWarnings
1226 def list_tasks(self) -> list[str]:
1227 return self.
_client.list_tasks()
1229 def spawn_task(self, name: str):
1230 return self.
_client.spawn_task(name)
1235 log_level: str =
'DEBUG',
1236 testsuite_skip_prepare: bool =
False,
1239 Captures logs from the service.
1241 @param log_level Do not capture logs below this level.
1242 @param testsuite_skip_prepare An advanced parameter to skip auto-`update_server_state`.
1244 @see @ref testsuite_logs_capture
1247 log_level=log_level,
1248 testsuite_skip_prepare=testsuite_skip_prepare,
1261 clean_update: bool =
True,
1262 cache_names: list[str] |
None =
None,
1263 testsuite_skip_prepare: bool =
False,
1266 Send request to service to update caches.
1268 @param clean_update if False, service will do a faster incremental
1269 update of caches whenever possible.
1270 @param cache_names which caches specifically should be updated;
1272 @param testsuite_skip_prepare if False, service will automatically do
1273 update_server_state().
1275 __tracebackhide__ =
True
1277 clean_update=clean_update,
1278 cache_names=cache_names,
1279 testsuite_skip_prepare=testsuite_skip_prepare,
1283 async def tests_control(
1285 invalidate_caches: bool =
True,
1286 clean_update: bool =
True,
1287 cache_names: list[str] |
None =
None,
1288 http_allowed_urls_extra: list[str] |
None =
None,
1289 ) -> dict[str, Any]:
1290 return await self.
_client.tests_control(
1291 invalidate_caches=invalidate_caches,
1292 clean_update=clean_update,
1293 cache_names=cache_names,
1294 http_allowed_urls_extra=http_allowed_urls_extra,
1300 Update service-side state through http call to 'tests/control':
1301 - clear dirty (from other tests) caches
1302 - set service-side mocked time,
1303 - resume / suspend periodic tasks
1305 If service is up-to-date, does nothing.
1312 Send list of handled testpoint pats to service. For these paths service
1313 will no more skip http calls from TESTPOINT(...) macro.
1315 @param no_auto_cache_cleanup prevent automatic cache cleanup.
1316 When calling service client first time in scope of current test, client
1317 makes additional http call to `tests/control` to update caches, to get
1318 rid of data from previous test.
1323 async def get_dynamic_config_defaults(
1325 ) -> dict[str, Any]:
1326 return await self.
_client.get_dynamic_config_defaults()
1329@dataclasses.dataclass