551 A helper class for computing metric differences.
553 @see ClientMonitor.metrics_diff
554 @ingroup userver_testsuite
560 _client: ClientMonitor,
561 _path: typing.Optional[str],
562 _prefix: typing.Optional[str],
563 _labels: typing.Optional[typing.Dict[str, str]],
566 self._client = _client
568 self._prefix = _prefix
569 self._labels = _labels
570 self._diff_gauge = _diff_gauge
571 self.
_baseline: typing.Optional[metric_module.MetricsSnapshot] =
None
572 self.
_current: typing.Optional[metric_module.MetricsSnapshot] =
None
573 self.
_diff: typing.Optional[metric_module.MetricsSnapshot] =
None
578 def baseline(self) -> metric_module.MetricsSnapshot:
583 def baseline(self, value: metric_module.MetricsSnapshot) ->
None:
586 self.
_diff = _subtract_metrics_snapshots(
593 def current(self) -> metric_module.MetricsSnapshot:
594 assert self.
_current is not None,
'Set self.current first'
598 def current(self, value: metric_module.MetricsSnapshot) ->
None:
600 assert self.
_baseline is not None,
'Set self.baseline first'
601 self.
_diff = _subtract_metrics_snapshots(
608 def diff(self) -> metric_module.MetricsSnapshot:
609 assert self.
_diff is not None,
'Set self.current first'
614 subpath: typing.Optional[str] =
None,
615 add_labels: typing.Optional[typing.Dict] =
None,
617 default: typing.Optional[float] =
None,
618 ) -> metric_module.MetricValue:
620 Returns a single metric value at the specified path, prepending
621 the path provided at construction. If a dict of labels is provided,
622 does an exact match of labels, prepending the labels provided
625 @param subpath Suffix of the metric path; the path provided
626 at construction is prepended
627 @param add_labels Labels that the metric must have in addition
628 to the labels provided at construction
629 @param default An optional default value in case the metric is missing
630 @throws AssertionError if the path does not match exactly one metric
632 base_path = self._path
or self._prefix
633 if base_path
and subpath:
634 path = f
'{base_path}.{subpath}'
636 assert base_path
or subpath,
'No path provided'
637 path = base_path
or subpath
or ''
638 labels: typing.Optional[dict] =
None
639 if self._labels
is not None or add_labels
is not None:
640 labels = {**(self._labels
or {}), **(add_labels
or {})}
641 return self.
diff.
value_at(path, labels, default=default)
643 async def fetch(self) -> metric_module.MetricsSnapshot:
645 Fetches metric values from the service.
647 return await self._client.metrics(
643 async def fetch(self) -> metric_module.MetricsSnapshot:
…
653 async def __aenter__(self) -> 'MetricsDiffer':
658 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
def _subtract_metrics_snapshots(
    current: metric_module.MetricsSnapshot,
    initial: metric_module.MetricsSnapshot,
    diff_gauge: bool,
) -> metric_module.MetricsSnapshot:
    """
    Builds a snapshot of differences between two metrics snapshots.

    Every path of `current` is mapped to the set of its metrics after
    passing each of them through `_subtract_metrics`. Only paths of
    `current` are iterated, so paths that exist solely in `initial`
    do not appear in the result.

    @param current The snapshot to subtract from
    @param initial The snapshot being subtracted
    @param diff_gauge Forwarded as-is to `_subtract_metrics`
    """
    diffed_groups = {}
    for path, current_group in current.items():
        diffed_groups[path] = {
            _subtract_metrics(path, metric, initial, diff_gauge)
            for metric in current_group
        }
    return metric_module.MetricsSnapshot(diffed_groups)
def _subtract_metrics(
    path: str,
    current_metric: metric_module.Metric,
    initial: metric_module.MetricsSnapshot,
    diff_gauge: bool,
) -> metric_module.Metric:
    """
    Subtracts the matching metric of `initial` from `current_metric`.

    The match is the first metric stored in `initial` at `path` whose
    labels are equal to the labels of `current_metric`. When `initial`
    has nothing at `path`, or no metric there has equal labels,
    `current_metric` is returned unchanged.

    @param path Path of the metric in both snapshots
    @param current_metric The metric to subtract from
    @param initial The snapshot to look the baseline metric up in
    @param diff_gauge Forwarded as-is to `_subtract_metric_values`
    @returns A new metric with the labels and type of `current_metric`
    and the subtracted value, or `current_metric` itself if there is
    no baseline for it
    """
    initial_group = initial.get(path, None)
    if initial_group is None:
        return current_metric

    for candidate in initial_group:
        if candidate.labels == current_metric.labels:
            initial_metric = candidate
            break
    else:
        # No metric with the same labels in the baseline.
        return current_metric

    return metric_module.Metric(
        labels=current_metric.labels,
        value=_subtract_metric_values(
            current=current_metric,
            initial=initial_metric,
            diff_gauge=diff_gauge,
        ),
        _type=current_metric.type(),
    )
def _subtract_metric_values(
    current: metric_module.Metric,
    initial: metric_module.Metric,
    diff_gauge: bool,
) -> metric_module.MetricValue:
    """
    Computes the value difference of two metrics of the same kind.

    Histogram values are handled by `_subtract_metric_values_hist`,
    all other values by `_subtract_metric_values_num`.

    @param current The metric to subtract from
    @param initial The metric being subtracted
    @param diff_gauge Forwarded to `_subtract_metric_values_num`
    @throws AssertionError if either metric type is UNSPECIFIED, if the
    types differ, or if only one of the two values is a histogram
    """
    unspecified = metric_module.MetricType.UNSPECIFIED
    assert current.type() is not unspecified
    assert initial.type() is not unspecified
    assert current.type() == initial.type()

    histogram_cls = metric_module.Histogram
    current_is_hist = isinstance(current.value, histogram_cls)
    initial_is_hist = isinstance(initial.value, histogram_cls)
    # Both values must be histograms, or neither of them.
    assert current_is_hist == initial_is_hist

    if not current_is_hist:
        return _subtract_metric_values_num(
            current=current,
            initial=initial,
            diff_gauge=diff_gauge,
        )
    return _subtract_metric_values_hist(current=current, initial=initial)
def _subtract_metric_values_num(
    current: metric_module.Metric,
    initial: metric_module.Metric,
    diff_gauge: bool,
) -> float:
    """
    Computes the difference of two numeric metric values.

    The subtraction is performed when either metric has the RATE type
    or when `diff_gauge` is truthy. Otherwise the value of `current`
    is returned as is.

    @param current The metric to subtract from
    @param initial The metric being subtracted
    @param diff_gauge Whether to subtract even when neither metric is RATE
    """
    rate = metric_module.MetricType.RATE
    should_subtract = (
        current.type() is rate or initial.type() is rate or diff_gauge
    )
    latest = typing.cast(float, current.value)
    if not should_subtract:
        return latest
    return latest - typing.cast(float, initial.value)
def _subtract_metric_values_hist(
    current: metric_module.Metric,
    initial: metric_module.Metric,
) -> metric_module.Histogram:
    """
    Computes the difference of two histogram metric values.

    Buckets are subtracted pairwise and so is the `inf` counter; the
    bounds are taken from `current`.

    @param current The metric to subtract from
    @param initial The metric being subtracted
    @throws AssertionError if the bounds of the two histograms differ
    """
    current_hist = typing.cast(metric_module.Histogram, current.value)
    initial_hist = typing.cast(metric_module.Histogram, initial.value)
    assert current_hist.bounds == initial_hist.bounds

    bucket_deltas = [
        after - before
        for after, before in zip(current_hist.buckets, initial_hist.buckets)
    ]
    return metric_module.Histogram(
        bounds=current_hist.bounds,
        buckets=bucket_deltas,
        inf=current_hist.inf - initial_hist.inf,
    )
751class AiohttpClient(service_client.AiohttpClient):
752 PeriodicTaskFailed = PeriodicTaskFailed
753 TestsuiteActionFailed = TestsuiteActionFailed
754 TestsuiteTaskNotFound = TestsuiteTaskNotFound
755 TestsuiteTaskConflict = TestsuiteTaskConflict
756 TestsuiteTaskFailed = TestsuiteTaskFailed
762 config: TestsuiteClientConfig,
764 log_capture_fixture: logcapture.CaptureServer,
767 cache_invalidation_state,
769 api_coverage_report=
None,
770 periodic_tasks_state: typing.Optional[PeriodicTasksState] =
None,
771 allow_all_caches_invalidation: bool =
True,
776 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
777 self._config = config
778 self._periodic_tasks = periodic_tasks_state
779 self._testpoint = testpoint
780 self._log_capture_fixture = log_capture_fixture
782 mocked_time=mocked_time,
783 testpoint=self._testpoint,
784 testpoint_control=testpoint_control,
785 invalidation_state=cache_invalidation_state,
786 cache_control=cache_control,
788 self._api_coverage_report = api_coverage_report
789 self._allow_all_caches_invalidation = allow_all_caches_invalidation
790 self._asyncexc_check = asyncexc_check
792 async def run_periodic_task(self, name):
793 response = await self._testsuite_action(
'run_periodic_task', name=name)
794 if not response[
'status']:
795 raise self.PeriodicTaskFailed(f
'Periodic task {name} failed')
797 async def suspend_periodic_tasks(self, names: typing.List[str]) ->
None:
798 if not self._periodic_tasks:
800 self._periodic_tasks.tasks_to_suspend.update(names)
801 await self._suspend_periodic_tasks()
803 async def resume_periodic_tasks(self, names: typing.List[str]) ->
None:
804 if not self._periodic_tasks:
806 self._periodic_tasks.tasks_to_suspend.difference_update(names)
807 await self._suspend_periodic_tasks()
809 async def resume_all_periodic_tasks(self) -> None:
810 if not self._periodic_tasks:
812 self._periodic_tasks.tasks_to_suspend.clear()
813 await self._suspend_periodic_tasks()
815 async def write_cache_dumps(
817 names: typing.List[str],
819 testsuite_skip_prepare=
False,
821 await self._testsuite_action(
824 testsuite_skip_prepare=testsuite_skip_prepare,
827 async def read_cache_dumps(
829 names: typing.List[str],
831 testsuite_skip_prepare=
False,
833 await self._testsuite_action(
836 testsuite_skip_prepare=testsuite_skip_prepare,
839 async def run_distlock_task(self, name: str) ->
None:
840 await self.run_task(f
'distlock/{name}')
842 async def reset_metrics(self) -> None:
843 await self._testsuite_action(
'reset_metrics')
845 async def metrics_portability(
848 prefix: typing.Optional[str] =
None,
849 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
850 return await self._testsuite_action(
851 'metrics_portability',
855 async def list_tasks(self) -> typing.List[str]:
856 response = await self._do_testsuite_action(
'tasks_list')
858 response.raise_for_status()
859 body = await response.json(content_type=
None)
862 async def run_task(self, name: str) ->
None:
863 response = await self._do_testsuite_action(
867 await _task_check_response(name, response)
869 @contextlib.asynccontextmanager
870 async def spawn_task(self, name: str):
871 task_id = await self._task_spawn(name)
875 await self._task_stop_spawned(task_id)
877 async def _task_spawn(self, name: str) -> str:
878 response = await self._do_testsuite_action(
882 data = await _task_check_response(name, response)
883 return data[
'task_id']
885 async def _task_stop_spawned(self, task_id: str) ->
None:
886 response = await self._do_testsuite_action(
888 json={
'task_id': task_id},
890 await _task_check_response(task_id, response)
892 async def http_allowed_urls_extra(
894 http_allowed_urls_extra: typing.List[str],
896 await self._do_testsuite_action(
897 'http_allowed_urls_extra',
898 json={
'allowed_urls_extra': http_allowed_urls_extra},
899 testsuite_skip_prepare=
True,
902 @contextlib.asynccontextmanager
903 async def capture_logs(
906 log_level: str =
'DEBUG',
907 testsuite_skip_prepare: bool =
False,
909 async with self._log_capture_fixture.capture(
910 log_level=logcapture.LogLevel.from_string(log_level),
912 logger.debug(
'Starting logcapture')
913 await self._testsuite_action(
916 socket_logging_duplication=
True,
917 testsuite_skip_prepare=testsuite_skip_prepare,
921 await self._log_capture_fixture.wait_for_client()
924 await self._testsuite_action(
926 log_level=self._log_capture_fixture.default_log_level.name,
927 socket_logging_duplication=
False,
928 testsuite_skip_prepare=testsuite_skip_prepare,
931 async def log_flush(self, logger_name: typing.Optional[str] =
None):
932 await self._testsuite_action(
934 logger_name=logger_name,
935 testsuite_skip_prepare=
True,
938 async def invalidate_caches(
941 clean_update: bool =
True,
942 cache_names: typing.Optional[typing.List[str]] =
None,
943 testsuite_skip_prepare: bool =
False,
945 if cache_names
is None and clean_update:
946 if self._allow_all_caches_invalidation:
947 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
949 __tracebackhide__ =
True
950 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
952 if testsuite_skip_prepare:
953 await self._tests_control({
954 'invalidate_caches': {
955 'update_type': (
'full' if clean_update
else 'incremental'),
956 **({
'names': cache_names}
if cache_names
else {}),
960 await self.tests_control(
961 invalidate_caches=
True,
962 clean_update=clean_update,
963 cache_names=cache_names,
966 async def tests_control(
969 invalidate_caches: bool =
True,
970 clean_update: bool =
True,
971 cache_names: typing.Optional[typing.List[str]] =
None,
972 http_allowed_urls_extra: typing.Optional[typing.List[str]] =
None,
973 ) -> typing.Dict[str, typing.Any]:
974 body: typing.Dict[str, typing.Any] = self._state_manager.get_pending_update()
976 if 'invalidate_caches' in body
and invalidate_caches:
977 if not clean_update
or cache_names:
979 'Manual cache invalidation leads to indirect initial full cache invalidation',
981 await self._prepare()
984 if invalidate_caches:
985 body[
'invalidate_caches'] = {
986 'update_type': (
'full' if clean_update
else 'incremental'),
989 body[
'invalidate_caches'][
'names'] = cache_names
991 if http_allowed_urls_extra
is not None:
992 await self.http_allowed_urls_extra(http_allowed_urls_extra)
994 return await self._tests_control(body)
996 async def update_server_state(self) -> None:
997 await self._prepare()
999 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
1000 if not self._testpoint:
1002 if no_auto_cache_cleanup:
1003 await self._tests_control({
1004 'testpoints': sorted(self._testpoint.keys()),
1007 await self.update_server_state()
1009 async def get_dynamic_config_defaults(
1011 ) -> typing.Dict[str, typing.Any]:
1012 return await self._testsuite_action(
1013 'get_dynamic_config_defaults',
1014 testsuite_skip_prepare=
True,
1017 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
1018 with self._state_manager.updating_state(body):
1019 async with await self._do_testsuite_action(
1022 testsuite_skip_prepare=
True,
1024 if response.status == 404:
1026 'It seems that testsuite support is not enabled for your service',
1028 response.raise_for_status()
1029 return await response.json(content_type=
None)
1031 async def _suspend_periodic_tasks(self):
1032 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1033 await self._testsuite_action(
1034 'suspend_periodic_tasks',
1035 names=sorted(self._periodic_tasks.tasks_to_suspend),
1037 self._periodic_tasks.suspended_tasks = set(
1038 self._periodic_tasks.tasks_to_suspend,
1041 def _do_testsuite_action(self, action, **kwargs):
1042 if not self._config.testsuite_action_path:
1044 'tests-control component is not properly configured',
1046 path = self._config.testsuite_action_path.format(action=action)
1047 return self.post(path, **kwargs)
1049 async def _testsuite_action(
1053 testsuite_skip_prepare=False,
1056 async with await self._do_testsuite_action(
1059 testsuite_skip_prepare=testsuite_skip_prepare,
1061 if response.status == 500:
1062 raise TestsuiteActionFailed
1063 response.raise_for_status()
1064 return await response.json(content_type=
None)
1066 async def _prepare(self) -> None:
1067 with self._state_manager.cache_control_update()
as pending_update:
1069 await self._tests_control(pending_update)
1075 headers: typing.Optional[typing.Dict[str, str]] =
None,
1076 bearer: typing.Optional[str] =
None,
1077 x_real_ip: typing.Optional[str] =
None,
1079 testsuite_skip_prepare: bool =
False,
1081 ) -> aiohttp.ClientResponse:
1082 if self._asyncexc_check:
1084 self._asyncexc_check()
1086 if not testsuite_skip_prepare:
1087 await self._prepare()
1089 response = await super()._request(
1097 if self._api_coverage_report:
1098 self._api_coverage_report.update_usage_stat(
1102 response.content_type,
1105 if self._asyncexc_check:
1107 self._asyncexc_check()