554 A helper class for computing metric differences.
556 @see ClientMonitor.metrics_diff
558 Example with @ref pytest_userver.client.ClientMonitor.metrics_diff "await monitor_client.metrics_diff()":
559 @snippet samples/testsuite-support/tests/test_metrics.py metrics diff
561 @ingroup userver_testsuite
567 _client: ClientMonitor,
570 _labels: dict[str, str] |
None,
573 self._client = _client
575 self._prefix = _prefix
576 self._labels = _labels
577 self._diff_gauge = _diff_gauge
585 def baseline(self) -> pytest_userver.metrics.MetricsSnapshot:
593 self.
_diff = _subtract_metrics_snapshots(
600 def current(self) -> pytest_userver.metrics.MetricsSnapshot:
601 assert self.
_current is not None,
'Set self.current first'
607 assert self.
_baseline is not None,
'Set self.baseline first'
608 self.
_diff = _subtract_metrics_snapshots(
615 def diff(self) -> pytest_userver.metrics.MetricsSnapshot:
616 assert self.
_diff is not None,
'Set self.current first'
621 subpath: str |
None =
None,
622 add_labels: dict[str, str] |
None =
None,
624 default: float |
None =
None,
625 ) -> pytest_userver.metrics.MetricValue:
627 Returns a single metric value at the specified path, prepending
628 the path provided at construction. If a dict of labels is provided,
629 does an exact match of labels, prepending the labels provided at construction.
631 @param subpath Suffix of the metric path; the path provided at construction is prepended
632 @param add_labels Labels that the metric must have in addition to the labels provided at construction
633 @param default An optional default value in case the metric is missing
634 @throws AssertionError if there is not exactly one metric at the given path
636 base_path = self._path
or self._prefix
637 if base_path
and subpath:
638 path = f
'{base_path}.{subpath}'
640 assert base_path
or subpath,
'No path provided'
641 path = base_path
or subpath
or ''
642 labels: dict |
None =
None
643 if self._labels
is not None or add_labels
is not None:
644 labels = {**(self._labels
or {}), **(add_labels
or {})}
645 return self.
diff.
value_at(path, labels, default=default)
647 async def fetch(self) -> pytest_userver.metrics.MetricsSnapshot:
649 Fetches metric values from the service.
651 return await self._client.metrics(
657 async def __aenter__(self) -> MetricsDiffer:
662 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
669def _subtract_metrics_snapshots(
675 path: {_subtract_metrics(path, current_metric, initial, diff_gauge)
for current_metric
in current_group}
676 for path, current_group
in current.items()
680def _subtract_metrics(
686 initial_group = initial.get(path,
None)
687 if initial_group
is None:
688 return current_metric
689 initial_metric = next(
690 (x
for x
in initial_group
if x.labels == current_metric.labels),
693 if initial_metric
is None:
694 return current_metric
697 labels=current_metric.labels,
698 value=_subtract_metric_values(
699 current=current_metric,
700 initial=initial_metric,
701 diff_gauge=diff_gauge,
703 _type=current_metric.type(),
707def _subtract_metric_values(
711) -> pytest_userver.metrics.MetricValue:
712 assert current.type()
is not pytest_userver.metrics.MetricType.UNSPECIFIED
713 assert initial.type()
is not pytest_userver.metrics.MetricType.UNSPECIFIED
714 assert current.type() == initial.type()
718 return _subtract_metric_values_hist(current=current, initial=initial)
721 return _subtract_metric_values_num(
724 diff_gauge=diff_gauge,
728def _subtract_metric_values_num(
733 current_value = typing.cast(float, current.value)
734 initial_value = typing.cast(float, initial.value)
736 current.type()
is pytest_userver.metrics.MetricType.RATE
737 or initial.type()
is pytest_userver.metrics.MetricType.RATE
740 return current_value - initial_value
if should_diff
else current_value
743def _subtract_metric_values_hist(
749 assert current_value.bounds == initial_value.bounds
751 bounds=current_value.bounds,
752 buckets=[t[0] - t[1]
for t
in zip(current_value.buckets, initial_value.buckets, strict=
True)],
753 inf=current_value.inf - initial_value.inf,
757class AiohttpClient(service_client.AiohttpClient):
758 PeriodicTaskFailed = PeriodicTaskFailed
759 TestsuiteActionFailed = TestsuiteActionFailed
760 TestsuiteTaskNotFound = TestsuiteTaskNotFound
761 TestsuiteTaskConflict = TestsuiteTaskConflict
762 TestsuiteTaskFailed = TestsuiteTaskFailed
768 config: TestsuiteClientConfig,
770 log_capture_fixture: logcapture.CaptureServer,
773 cache_invalidation_state,
775 api_coverage_report=
None,
776 periodic_tasks_state: PeriodicTasksState |
None =
None,
777 allow_all_caches_invalidation: bool =
True,
782 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
783 self._config = config
784 self._periodic_tasks = periodic_tasks_state
785 self._testpoint = testpoint
786 self._log_capture_fixture = log_capture_fixture
788 mocked_time=mocked_time,
789 testpoint=self._testpoint,
790 testpoint_control=testpoint_control,
791 invalidation_state=cache_invalidation_state,
792 cache_control=cache_control,
794 self._api_coverage_report = api_coverage_report
795 self._allow_all_caches_invalidation = allow_all_caches_invalidation
796 self._asyncexc_check = asyncexc_check
async def run_periodic_task(self, name):
    """
    Runs a periodic task through the 'run_periodic_task' testsuite action.

    @param name Task name, forwarded to the action as the `name` argument
    @throws PeriodicTaskFailed if the `status` field of the reply is falsy
    """
    reply = await self._testsuite_action('run_periodic_task', name=name)
    if reply['status']:
        return
    # A falsy status is reported by the service for a failed run.
    raise self.PeriodicTaskFailed(f'Periodic task {name} failed')
803 async def suspend_periodic_tasks(self, names: list[str]) ->
None:
804 if not self._periodic_tasks:
806 self._periodic_tasks.tasks_to_suspend.update(names)
807 await self._suspend_periodic_tasks()
809 async def resume_periodic_tasks(self, names: list[str]) ->
None:
810 if not self._periodic_tasks:
812 self._periodic_tasks.tasks_to_suspend.difference_update(names)
813 await self._suspend_periodic_tasks()
815 async def resume_all_periodic_tasks(self) -> None:
816 if not self._periodic_tasks:
818 self._periodic_tasks.tasks_to_suspend.clear()
819 await self._suspend_periodic_tasks()
821 async def write_cache_dumps(
825 testsuite_skip_prepare=
False,
827 await self._testsuite_action(
830 testsuite_skip_prepare=testsuite_skip_prepare,
833 async def read_cache_dumps(
837 testsuite_skip_prepare=
False,
839 await self._testsuite_action(
842 testsuite_skip_prepare=testsuite_skip_prepare,
async def run_distlock_task(self, name: str) -> None:
    """
    Runs a task by delegating to run_task with the 'distlock/' prefix added.

    @param name Name that is appended after the 'distlock/' prefix
    """
    task_name = f'distlock/{name}'
    await self.run_task(task_name)
async def reset_metrics(self) -> None:
    """
    Invokes the 'reset_metrics' testsuite action; the reply is discarded.
    """
    action = 'reset_metrics'
    await self._testsuite_action(action)
851 async def metrics_portability(
854 prefix: str |
None =
None,
855 ) -> dict[str, list[dict[str, str]]]:
856 return await self._testsuite_action(
857 'metrics_portability',
861 async def list_tasks(self) -> list[str]:
862 response = await self._do_testsuite_action(
'tasks_list')
864 response.raise_for_status()
865 body = await response.json(content_type=
None)
868 async def run_task(self, name: str) ->
None:
869 response = await self._do_testsuite_action(
873 await _task_check_response(name, response)
875 @contextlib.asynccontextmanager
876 async def spawn_task(self, name: str):
877 task_id = await self._task_spawn(name)
881 await self._task_stop_spawned(task_id)
883 async def _task_spawn(self, name: str) -> str:
884 response = await self._do_testsuite_action(
888 data = await _task_check_response(name, response)
889 return data[
'task_id']
891 async def _task_stop_spawned(self, task_id: str) ->
None:
892 response = await self._do_testsuite_action(
894 json={
'task_id': task_id},
896 await _task_check_response(task_id, response)
898 async def http_allowed_urls_extra(
900 http_allowed_urls_extra: list[str],
902 await self._do_testsuite_action(
903 'http_allowed_urls_extra',
904 json={
'allowed_urls_extra': http_allowed_urls_extra},
905 testsuite_skip_prepare=
True,
908 @contextlib.asynccontextmanager
909 async def capture_logs(
912 log_level: str =
'DEBUG',
913 testsuite_skip_prepare: bool =
False,
915 async with self._log_capture_fixture.capture(
916 log_level=logcapture.LogLevel.from_string(log_level),
918 logger.debug(
'Starting logcapture')
919 await self._testsuite_action(
922 socket_logging_duplication=
True,
923 testsuite_skip_prepare=testsuite_skip_prepare,
927 await self._log_capture_fixture.wait_for_client()
930 await self._testsuite_action(
932 log_level=self._log_capture_fixture.default_log_level.name,
933 socket_logging_duplication=
False,
934 testsuite_skip_prepare=testsuite_skip_prepare,
937 async def log_flush(self, logger_name: str |
None =
None):
938 await self._testsuite_action(
940 logger_name=logger_name,
941 testsuite_skip_prepare=
True,
944 async def invalidate_caches(
947 clean_update: bool =
True,
948 cache_names: list[str] |
None =
None,
949 testsuite_skip_prepare: bool =
False,
951 if cache_names
is None and clean_update:
952 if self._allow_all_caches_invalidation:
953 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning, stacklevel=2)
955 __tracebackhide__ =
True
956 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
958 if testsuite_skip_prepare:
959 await self._tests_control({
960 'invalidate_caches': {
961 'update_type': (
'full' if clean_update
else 'incremental'),
962 **({
'names': cache_names}
if cache_names
else {}),
966 await self.tests_control(
967 invalidate_caches=
True,
968 clean_update=clean_update,
969 cache_names=cache_names,
972 async def tests_control(
975 invalidate_caches: bool =
True,
976 clean_update: bool =
True,
977 cache_names: list[str] |
None =
None,
978 http_allowed_urls_extra: list[str] |
None =
None,
980 body: dict[str, Any] = self._state_manager.get_pending_update()
982 if 'invalidate_caches' in body
and invalidate_caches:
983 if not clean_update
or cache_names:
985 'Manual cache invalidation leads to indirect initial full cache invalidation',
987 await self._prepare()
990 if invalidate_caches:
991 body[
'invalidate_caches'] = {
992 'update_type': (
'full' if clean_update
else 'incremental'),
995 body[
'invalidate_caches'][
'names'] = cache_names
997 if http_allowed_urls_extra
is not None:
998 await self.http_allowed_urls_extra(http_allowed_urls_extra)
1000 return await self._tests_control(body)
async def update_server_state(self) -> None:
    """
    Public wrapper that awaits self._prepare() and returns nothing.
    """
    prepare = self._prepare
    await prepare()
1005 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
1006 if not self._testpoint:
1008 if no_auto_cache_cleanup:
1009 await self._tests_control({
1010 'testpoints': sorted(self._testpoint.keys()),
1013 await self.update_server_state()
async def get_dynamic_config_defaults(self) -> dict[str, Any]:
    """
    Calls the 'get_dynamic_config_defaults' testsuite action and returns its result.

    The action is invoked with testsuite_skip_prepare=True.
    """
    defaults = await self._testsuite_action(
        'get_dynamic_config_defaults',
        testsuite_skip_prepare=True,
    )
    return defaults
1023 async def _tests_control(self, body: dict) -> dict[str, Any]:
1024 with self._state_manager.updating_state(body):
1025 async with await self._do_testsuite_action(
1028 testsuite_skip_prepare=
True,
1030 if response.status == 404:
1032 'It seems that testsuite support is not enabled for your service',
1034 response.raise_for_status()
1035 return await response.json(content_type=
None)
1037 async def _suspend_periodic_tasks(self):
1038 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1039 await self._testsuite_action(
1040 'suspend_periodic_tasks',
1041 names=sorted(self._periodic_tasks.tasks_to_suspend),
1043 self._periodic_tasks.suspended_tasks = set(
1044 self._periodic_tasks.tasks_to_suspend,
1047 def _do_testsuite_action(self, action, **kwargs):
1048 if not self._config.testsuite_action_path:
1050 'tests-control component is not properly configured',
1052 path = self._config.testsuite_action_path.format(action=action)
1053 return self.post(path, **kwargs)
1055 async def _testsuite_action(
1059 testsuite_skip_prepare=False,
1062 async with await self._do_testsuite_action(
1065 testsuite_skip_prepare=testsuite_skip_prepare,
1067 if response.status == 500:
1068 raise TestsuiteActionFailed
1069 response.raise_for_status()
1070 return await response.json(content_type=
None)
1072 async def _prepare(self) -> None:
1073 with self._state_manager.cache_control_update()
as pending_update:
1075 await self._tests_control(pending_update)
1081 headers: dict[str, str] |
None =
None,
1082 bearer: str |
None =
None,
1083 x_real_ip: str |
None =
None,
1085 testsuite_skip_prepare: bool =
False,
1087 ) -> aiohttp.ClientResponse:
1088 if self._asyncexc_check:
1090 self._asyncexc_check()
1092 if not testsuite_skip_prepare:
1093 await self._prepare()
1095 response = await super()._request(
1103 if self._api_coverage_report:
1104 self._api_coverage_report.update_usage_stat(
1108 response.content_type,
1111 if self._asyncexc_check:
1113 self._asyncexc_check()