552 A helper class for computing metric differences.
554 @see ClientMonitor.metrics_diff
555 @ingroup userver_testsuite
561 _client: ClientMonitor,
564 _labels: dict[str, str] |
None,
567 self._client = _client
569 self._prefix = _prefix
570 self._labels = _labels
571 self._diff_gauge = _diff_gauge
572 self.
_baseline: metric_module.MetricsSnapshot |
None =
None
573 self.
_current: metric_module.MetricsSnapshot |
None =
None
574 self.
_diff: metric_module.MetricsSnapshot |
None =
None
579 def baseline(self) -> metric_module.MetricsSnapshot:
584 def baseline(self, value: metric_module.MetricsSnapshot) ->
None:
587 self.
_diff = _subtract_metrics_snapshots(
594 def current(self) -> metric_module.MetricsSnapshot:
595 assert self.
_current is not None,
'Set self.current first'
599 def current(self, value: metric_module.MetricsSnapshot) ->
None:
601 assert self.
_baseline is not None,
'Set self.baseline first'
602 self.
_diff = _subtract_metrics_snapshots(
609 def diff(self) -> metric_module.MetricsSnapshot:
610 assert self.
_diff is not None,
'Set self.current first'
615 subpath: str |
None =
None,
616 add_labels: dict[str, str] |
None =
None,
618 default: float |
None =
None,
619 ) -> metric_module.MetricValue:
621 Returns a single metric value at the specified path, prepending
622 the path provided at construction. If a dict of labels is provided,
623 does en exact match of labels, prepending the labels provided
626 @param subpath Suffix of the metric path; the path provided
627 at construction is prepended
628 @param add_labels Labels that the metric must have in addition
629 to the labels provided at construction
630 @param default An optional default value in case the metric is missing
631 @throws AssertionError if not one metric by path
633 base_path = self._path
or self._prefix
634 if base_path
and subpath:
635 path = f
'{base_path}.{subpath}'
637 assert base_path
or subpath,
'No path provided'
638 path = base_path
or subpath
or ''
639 labels: dict |
None =
None
640 if self._labels
is not None or add_labels
is not None:
641 labels = {**(self._labels
or {}), **(add_labels
or {})}
642 return self.
diff.
value_at(path, labels, default=default)
644 async def fetch(self) -> metric_module.MetricsSnapshot:
646 Fetches metric values from the service.
648 return await self._client.metrics(
654 async def __aenter__(self) -> MetricsDiffer:
659 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
666def _subtract_metrics_snapshots(
667 current: metric_module.MetricsSnapshot,
668 initial: metric_module.MetricsSnapshot,
670) -> metric_module.MetricsSnapshot:
671 return metric_module.MetricsSnapshot({
672 path: {_subtract_metrics(path, current_metric, initial, diff_gauge)
for current_metric
in current_group}
673 for path, current_group
in current.items()
677def _subtract_metrics(
679 current_metric: metric_module.Metric,
680 initial: metric_module.MetricsSnapshot,
682) -> metric_module.Metric:
683 initial_group = initial.get(path,
None)
684 if initial_group
is None:
685 return current_metric
686 initial_metric = next(
687 (x
for x
in initial_group
if x.labels == current_metric.labels),
690 if initial_metric
is None:
691 return current_metric
693 return metric_module.Metric(
694 labels=current_metric.labels,
695 value=_subtract_metric_values(
696 current=current_metric,
697 initial=initial_metric,
698 diff_gauge=diff_gauge,
700 _type=current_metric.type(),
704def _subtract_metric_values(
705 current: metric_module.Metric,
706 initial: metric_module.Metric,
708) -> metric_module.MetricValue:
709 assert current.type()
is not metric_module.MetricType.UNSPECIFIED
710 assert initial.type()
is not metric_module.MetricType.UNSPECIFIED
711 assert current.type() == initial.type()
713 if isinstance(current.value, metric_module.Histogram):
714 assert isinstance(initial.value, metric_module.Histogram)
715 return _subtract_metric_values_hist(current=current, initial=initial)
717 assert not isinstance(initial.value, metric_module.Histogram)
718 return _subtract_metric_values_num(
721 diff_gauge=diff_gauge,
725def _subtract_metric_values_num(
726 current: metric_module.Metric,
727 initial: metric_module.Metric,
730 current_value = typing.cast(float, current.value)
731 initial_value = typing.cast(float, initial.value)
733 current.type()
is metric_module.MetricType.RATE
or initial.type()
is metric_module.MetricType.RATE
or diff_gauge
735 return current_value - initial_value
if should_diff
else current_value
738def _subtract_metric_values_hist(
739 current: metric_module.Metric,
740 initial: metric_module.Metric,
741) -> metric_module.Histogram:
742 current_value = typing.cast(metric_module.Histogram, current.value)
743 initial_value = typing.cast(metric_module.Histogram, initial.value)
744 assert current_value.bounds == initial_value.bounds
745 return metric_module.Histogram(
746 bounds=current_value.bounds,
747 buckets=[t[0] - t[1]
for t
in zip(current_value.buckets, initial_value.buckets, strict=
True)],
748 inf=current_value.inf - initial_value.inf,
752class AiohttpClient(service_client.AiohttpClient):
753 PeriodicTaskFailed = PeriodicTaskFailed
754 TestsuiteActionFailed = TestsuiteActionFailed
755 TestsuiteTaskNotFound = TestsuiteTaskNotFound
756 TestsuiteTaskConflict = TestsuiteTaskConflict
757 TestsuiteTaskFailed = TestsuiteTaskFailed
763 config: TestsuiteClientConfig,
765 log_capture_fixture: logcapture.CaptureServer,
768 cache_invalidation_state,
770 api_coverage_report=
None,
771 periodic_tasks_state: PeriodicTasksState |
None =
None,
772 allow_all_caches_invalidation: bool =
True,
777 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
778 self._config = config
779 self._periodic_tasks = periodic_tasks_state
780 self._testpoint = testpoint
781 self._log_capture_fixture = log_capture_fixture
783 mocked_time=mocked_time,
784 testpoint=self._testpoint,
785 testpoint_control=testpoint_control,
786 invalidation_state=cache_invalidation_state,
787 cache_control=cache_control,
789 self._api_coverage_report = api_coverage_report
790 self._allow_all_caches_invalidation = allow_all_caches_invalidation
791 self._asyncexc_check = asyncexc_check
793 async def run_periodic_task(self, name):
794 response = await self._testsuite_action(
'run_periodic_task', name=name)
795 if not response[
'status']:
796 raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
798 async def suspend_periodic_tasks(self, names: list[str]) ->
None:
799 if not self._periodic_tasks:
801 self._periodic_tasks.tasks_to_suspend.update(names)
802 await self._suspend_periodic_tasks()
804 async def resume_periodic_tasks(self, names: list[str]) ->
None:
805 if not self._periodic_tasks:
807 self._periodic_tasks.tasks_to_suspend.difference_update(names)
808 await self._suspend_periodic_tasks()
810 async def resume_all_periodic_tasks(self) -> None:
811 if not self._periodic_tasks:
813 self._periodic_tasks.tasks_to_suspend.clear()
814 await self._suspend_periodic_tasks()
816 async def write_cache_dumps(
820 testsuite_skip_prepare=
False,
822 await self._testsuite_action(
825 testsuite_skip_prepare=testsuite_skip_prepare,
828 async def read_cache_dumps(
832 testsuite_skip_prepare=
False,
834 await self._testsuite_action(
837 testsuite_skip_prepare=testsuite_skip_prepare,
840 async def run_distlock_task(self, name: str) ->
None:
841 await self.run_task(f
'distlock/{name}')
843 async def reset_metrics(self) -> None:
844 await self._testsuite_action(
'reset_metrics')
846 async def metrics_portability(
849 prefix: str |
None =
None,
850 ) -> dict[str, list[dict[str, str]]]:
851 return await self._testsuite_action(
852 'metrics_portability',
856 async def list_tasks(self) -> list[str]:
857 response = await self._do_testsuite_action(
'tasks_list')
859 response.raise_for_status()
860 body = await response.json(content_type=
None)
863 async def run_task(self, name: str) ->
None:
864 response = await self._do_testsuite_action(
868 await _task_check_response(name, response)
870 @contextlib.asynccontextmanager
871 async def spawn_task(self, name: str):
872 task_id = await self._task_spawn(name)
876 await self._task_stop_spawned(task_id)
878 async def _task_spawn(self, name: str) -> str:
879 response = await self._do_testsuite_action(
883 data = await _task_check_response(name, response)
884 return data[
'task_id']
886 async def _task_stop_spawned(self, task_id: str) ->
None:
887 response = await self._do_testsuite_action(
889 json={
'task_id': task_id},
891 await _task_check_response(task_id, response)
893 async def http_allowed_urls_extra(
895 http_allowed_urls_extra: list[str],
897 await self._do_testsuite_action(
898 'http_allowed_urls_extra',
899 json={
'allowed_urls_extra': http_allowed_urls_extra},
900 testsuite_skip_prepare=
True,
903 @contextlib.asynccontextmanager
904 async def capture_logs(
907 log_level: str =
'DEBUG',
908 testsuite_skip_prepare: bool =
False,
910 async with self._log_capture_fixture.capture(
911 log_level=logcapture.LogLevel.from_string(log_level),
913 logger.debug(
'Starting logcapture')
914 await self._testsuite_action(
917 socket_logging_duplication=
True,
918 testsuite_skip_prepare=testsuite_skip_prepare,
922 await self._log_capture_fixture.wait_for_client()
925 await self._testsuite_action(
927 log_level=self._log_capture_fixture.default_log_level.name,
928 socket_logging_duplication=
False,
929 testsuite_skip_prepare=testsuite_skip_prepare,
932 async def log_flush(self, logger_name: str |
None =
None):
933 await self._testsuite_action(
935 logger_name=logger_name,
936 testsuite_skip_prepare=
True,
939 async def invalidate_caches(
942 clean_update: bool =
True,
943 cache_names: list[str] |
None =
None,
944 testsuite_skip_prepare: bool =
False,
946 if cache_names
is None and clean_update:
947 if self._allow_all_caches_invalidation:
948 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning, stacklevel=2)
950 __tracebackhide__ =
True
951 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
953 if testsuite_skip_prepare:
954 await self._tests_control({
955 'invalidate_caches': {
956 'update_type': (
'full' if clean_update
else 'incremental'),
957 **({
'names': cache_names}
if cache_names
else {}),
961 await self.tests_control(
962 invalidate_caches=
True,
963 clean_update=clean_update,
964 cache_names=cache_names,
967 async def tests_control(
970 invalidate_caches: bool =
True,
971 clean_update: bool =
True,
972 cache_names: list[str] |
None =
None,
973 http_allowed_urls_extra: list[str] |
None =
None,
975 body: dict[str, Any] = self._state_manager.get_pending_update()
977 if 'invalidate_caches' in body
and invalidate_caches:
978 if not clean_update
or cache_names:
980 'Manual cache invalidation leads to indirect initial full cache invalidation',
982 await self._prepare()
985 if invalidate_caches:
986 body[
'invalidate_caches'] = {
987 'update_type': (
'full' if clean_update
else 'incremental'),
990 body[
'invalidate_caches'][
'names'] = cache_names
992 if http_allowed_urls_extra
is not None:
993 await self.http_allowed_urls_extra(http_allowed_urls_extra)
995 return await self._tests_control(body)
997 async def update_server_state(self) -> None:
998 await self._prepare()
1000 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
1001 if not self._testpoint:
1003 if no_auto_cache_cleanup:
1004 await self._tests_control({
1005 'testpoints': sorted(self._testpoint.keys()),
1008 await self.update_server_state()
1010 async def get_dynamic_config_defaults(
1012 ) -> dict[str, Any]:
1013 return await self._testsuite_action(
1014 'get_dynamic_config_defaults',
1015 testsuite_skip_prepare=
True,
1018 async def _tests_control(self, body: dict) -> dict[str, Any]:
1019 with self._state_manager.updating_state(body):
1020 async with await self._do_testsuite_action(
1023 testsuite_skip_prepare=
True,
1025 if response.status == 404:
1027 'It seems that testsuite support is not enabled for your service',
1029 response.raise_for_status()
1030 return await response.json(content_type=
None)
1032 async def _suspend_periodic_tasks(self):
1033 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1034 await self._testsuite_action(
1035 'suspend_periodic_tasks',
1036 names=sorted(self._periodic_tasks.tasks_to_suspend),
1038 self._periodic_tasks.suspended_tasks = set(
1039 self._periodic_tasks.tasks_to_suspend,
1042 def _do_testsuite_action(self, action, **kwargs):
1043 if not self._config.testsuite_action_path:
1045 'tests-control component is not properly configured',
1047 path = self._config.testsuite_action_path.format(action=action)
1048 return self.post(path, **kwargs)
1050 async def _testsuite_action(
1054 testsuite_skip_prepare=False,
1057 async with await self._do_testsuite_action(
1060 testsuite_skip_prepare=testsuite_skip_prepare,
1062 if response.status == 500:
1063 raise TestsuiteActionFailed
1064 response.raise_for_status()
1065 return await response.json(content_type=
None)
1067 async def _prepare(self) -> None:
1068 with self._state_manager.cache_control_update()
as pending_update:
1070 await self._tests_control(pending_update)
1076 headers: dict[str, str] |
None =
None,
1077 bearer: str |
None =
None,
1078 x_real_ip: str |
None =
None,
1080 testsuite_skip_prepare: bool =
False,
1082 ) -> aiohttp.ClientResponse:
1083 if self._asyncexc_check:
1085 self._asyncexc_check()
1087 if not testsuite_skip_prepare:
1088 await self._prepare()
1090 response = await super()._request(
1098 if self._api_coverage_report:
1099 self._api_coverage_report.update_usage_stat(
1103 response.content_type,
1106 if self._asyncexc_check:
1108 self._asyncexc_check()