userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Python module that provides clients for functional tests with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9# pylint: disable=too-many-lines
10
11from __future__ import annotations
12
13from collections.abc import Awaitable
14from collections.abc import Iterator
15import contextlib
16import copy
17import dataclasses
18import json
19import logging
20import typing
21from typing import Any
22from typing import TypeAlias
23import warnings
24
25import aiohttp
26
27from testsuite import logcapture
28from testsuite import utils
29from testsuite.daemons import service_client
30from testsuite.utils import approx
31from testsuite.utils import http
32
33import pytest_userver.metrics # pylint: disable=import-error
34from pytest_userver.plugins import caches
35
36# @cond
37logger = logging.getLogger(__name__)
38# @endcond
39
40JsonAny: TypeAlias = int | float | str | list | dict
41JsonAnyOptional: TypeAlias = JsonAny | None
42
43_UNKNOWN_STATE = '__UNKNOWN__'
44
45CACHE_INVALIDATION_MESSAGE = (
46 'Direct cache invalidation is deprecated.\n'
47 '\n'
48 ' - Use client.update_server_state() to synchronize service state\n'
49 ' - Explicitly pass cache names to invalidate, e.g.: '
50 'invalidate_caches(cache_names=[...]).'
51)
52
53
class BaseError(Exception):
    """
    Common ancestor for the exceptions defined in this module.
    """
56
57
59 pass
60
61
63 pass
64
65
75 pass
76
77
79 pass
80
81
83 def __init__(self) -> None:
84 self.suspended_tasks: set[str] = set()
85 self.tasks_to_suspend: set[str] = set()
86
87
89 def __init__(self, name, reason):
90 self.name = name
91 self.reason = reason
92 super().__init__(f'Testsuite task {name!r} failed: {reason}')
93
94
95@dataclasses.dataclass(frozen=True)
97 testsuite_action_path: str | None = None
98 server_monitor_path: str | None = None
99
100
101Metric: TypeAlias = pytest_userver.metrics.Metric
102
103
105 """
106 Base asyncio userver client that implements HTTP requests to service.
107
108 Compatible with werkzeug interface.
109
110 @ingroup userver_testsuite
111 """
112
def __init__(self, client):
    """
    Remember the low-level client that every request method delegates to.

    @param client Object stored as `self._client`
    """
    self._client = client
115
async def post(
    self,
    path: str,
    # pylint: disable=redefined-outer-name
    json: JsonAnyOptional = None,
    data: Any = None,
    params: dict[str, str] | None = None,
    bearer: str | None = None,
    x_real_ip: str | None = None,
    headers: dict[str, str] | None = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP POST request via the underlying client.

    All arguments are forwarded to `self._client.post`; the raw response is
    then passed through `self._wrap_client_response`.

    @param path Request path
    @param json Optional JSON body
    @param data Optional raw body
    @param params Optional query parameters
    @param bearer Forwarded as the `bearer` keyword
    @param x_real_ip Forwarded as the `x_real_ip` keyword
    @param headers Optional request headers
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    raw_response = await self._client.post(
        path,
        json=json,
        data=data,
        params=params,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
142
async def put(
    self,
    path: str,
    # pylint: disable=redefined-outer-name
    json: JsonAnyOptional = None,
    data: Any = None,
    params: dict[str, str] | None = None,
    bearer: str | None = None,
    x_real_ip: str | None = None,
    headers: dict[str, str] | None = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Make a HTTP PUT request

    All arguments are forwarded to `self._client.put`; the raw response is
    then passed through `self._wrap_client_response`.

    @param path Request path
    @param json Optional JSON body
    @param data Optional raw body
    @param params Optional query parameters
    @param bearer Forwarded as the `bearer` keyword
    @param x_real_ip Forwarded as the `x_real_ip` keyword
    @param headers Optional request headers
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    response = await self._client.put(
        path,
        json=json,
        data=data,
        params=params,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    return await self._wrap_client_response(response)
169
async def patch(
    self,
    path: str,
    # pylint: disable=redefined-outer-name
    json: JsonAnyOptional = None,
    data: Any = None,
    params: dict[str, str] | None = None,
    bearer: str | None = None,
    x_real_ip: str | None = None,
    headers: dict[str, str] | None = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Make a HTTP PATCH request

    All arguments are forwarded to `self._client.patch`; the raw response is
    then passed through `self._wrap_client_response`.

    @param path Request path
    @param json Optional JSON body
    @param data Optional raw body
    @param params Optional query parameters
    @param bearer Forwarded as the `bearer` keyword
    @param x_real_ip Forwarded as the `x_real_ip` keyword
    @param headers Optional request headers
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    response = await self._client.patch(
        path,
        json=json,
        data=data,
        params=params,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    return await self._wrap_client_response(response)
196
async def get(
    self,
    path: str,
    headers: dict[str, str] | None = None,
    bearer: str | None = None,
    x_real_ip: str | None = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP GET request via the underlying client.

    @param path Request path
    @param headers Optional request headers
    @param bearer Forwarded as the `bearer` keyword
    @param x_real_ip Forwarded as the `x_real_ip` keyword
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    raw_response = await self._client.get(
        path,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
216
async def delete(
    self,
    path: str,
    headers: dict[str, str] | None = None,
    bearer: str | None = None,
    x_real_ip: str | None = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP DELETE request via the underlying client.

    @param path Request path
    @param headers Optional request headers
    @param bearer Forwarded as the `bearer` keyword
    @param x_real_ip Forwarded as the `x_real_ip` keyword
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    raw_response = await self._client.delete(
        path,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
236
async def options(
    self,
    path: str,
    headers: dict[str, str] | None = None,
    bearer: str | None = None,
    x_real_ip: str | None = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP OPTIONS request via the underlying client.

    @param path Request path
    @param headers Optional request headers
    @param bearer Forwarded as the `bearer` keyword
    @param x_real_ip Forwarded as the `x_real_ip` keyword
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    raw_response = await self._client.options(
        path,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
256
async def request(
    self,
    http_method: str,
    path: str,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP request using an explicitly given method.

    @param http_method HTTP method name passed to the underlying client
    @param path Request path
    @param kwargs Extra keyword arguments forwarded unchanged
    """
    raw_response = await self._client.request(http_method, path, **kwargs)
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
268
269 @property
271 """
272 @deprecated Use pytest_userver.client.Client directly instead.
273 """
274 return self._client
275
def _wrap_client_response(self, response: aiohttp.ClientResponse) -> Awaitable[http.ClientResponse]:
    """
    Hand the raw response to `http.wrap_client_response` and return the
    awaitable it produces.
    """
    wrapped = http.wrap_client_response(response)
    return wrapped
278
279
280# @cond
281
282
def _wrap_client_error(func):
    """
    Decorator for coroutine functions that converts aiohttp response errors
    into `http.HttpResponseError`.

    The wrapper keeps the metadata (`__name__`, `__doc__`, ...) of the wrapped
    coroutine function and chains the original aiohttp exception as the cause
    of the raised error.

    @param func Coroutine function to wrap
    @returns Coroutine function with the same call signature
    """
    # Local import keeps this block self-contained.
    import functools

    @functools.wraps(func)
    async def _wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except aiohttp.client_exceptions.ClientResponseError as exc:
            raise http.HttpResponseError(
                url=exc.request_info.url,
                status=exc.status,
            ) from exc

    return _wrapper
294
295
296class AiohttpClientMonitor(service_client.AiohttpClient):
297 _config: TestsuiteClientConfig
298
def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
    """
    Initialise the parent client and keep the testsuite client config.

    @param base_url Passed to the parent constructor
    @param config Stored as `self._config`
    @param kwargs Extra keyword arguments passed to the parent constructor
    """
    super().__init__(base_url, **kwargs)
    self._config = config
302
async def get_metrics(self, prefix=None):
    """
    Fetch metrics in the `internal` format from the server monitor handler.

    @param prefix Optional value sent as the `prefix` query parameter
    @returns Decoded JSON body of the response
    @throws ConfigurationError if `server_monitor_path` is not configured
    """
    if not self._config.server_monitor_path:
        raise ConfigurationError(
            'handler-server-monitor component is not configured',
        )
    query = {'format': 'internal'}
    if prefix is not None:
        query.update(prefix=prefix)
    monitor_response = await self.get(
        self._config.server_monitor_path,
        params=query,
    )
    async with monitor_response:
        monitor_response.raise_for_status()
        # content_type=None: accept any Content-Type when decoding JSON
        decoded = await monitor_response.json(content_type=None)
        return decoded
318
async def get_metric(self, metric_name):
    """
    Fetch metrics using `metric_name` as the prefix and return the entry
    stored under exactly that name.

    @param metric_name Key expected in the `get_metrics` result
    @throws AssertionError if the key is absent
    """
    found = await self.get_metrics(metric_name)
    assert metric_name in found, (
        f'No metric with name {metric_name!r}. Use "single_metric" function instead of "get_metric"'
    )
    return found[metric_name]
325
async def metrics_raw(
    self,
    output_format,
    *,
    path: str | None = None,
    prefix: str | None = None,
    labels: dict[str, str] | None = None,
) -> str:
    """
    Fetch metrics from the server monitor handler as raw text.

    @param output_format Value sent as the `format` query parameter
    @param path Optional value sent as the `path` query parameter
    @param prefix Optional value sent as the `prefix` query parameter
    @param labels Optional labels, sent JSON-encoded as the `labels` query parameter
    @returns Response body text
    @throws ConfigurationError if `server_monitor_path` is not configured
    """
    if not self._config.server_monitor_path:
        raise ConfigurationError(
            'handler-server-monitor component is not configured',
        )

    params = {'format': output_format}
    # Falsy values (None, '', {}) are omitted from the query.
    if prefix:
        params['prefix'] = prefix

    if path:
        params['path'] = path

    if labels:
        params['labels'] = json.dumps(labels)

    response = await self.get(
        self._config.server_monitor_path,
        params=params,
    )
    async with response:
        response.raise_for_status()
        return await response.text()
356
357 async def metrics(
358 self,
359 *,
360 path: str = None,
361 prefix: str = None,
362 labels: dict[str, str] | None = None,
364 response = await self.metrics_raw(
365 output_format='json',
366 path=path,
367 prefix=prefix,
368 labels=labels,
369 )
371
372 async def single_metric_optional(
373 self,
374 path: str,
375 *,
376 labels: dict[str, str] | None = None,
378 response = await self.metrics(path=path, labels=labels)
379 metrics_list = response.get(path, [])
380
381 assert len(metrics_list) <= 1, (f'More than one metric found for path {path} and labels {labels}: {response}',)
382
383 if not metrics_list:
384 return None
385
386 return next(iter(metrics_list))
387
388 async def single_metric(
389 self,
390 path: str,
391 *,
392 labels: dict[str, str] | None = None,
394 value = await self.single_metric_optional(path, labels=labels)
395 assert value is not None, (f'No metric was found for path {path} and labels {labels}',)
396 return value
397
398
399# @endcond
400
401
403 """
404 Asyncio userver client for monitor listeners, typically retrieved from
405 plugins.service_client.monitor_client fixture.
406
407 Compatible with werkzeug interface.
408
409 @ingroup userver_testsuite
410 """
411
413 self,
414 *,
415 path: str | None = None,
416 prefix: str | None = None,
417 labels: dict[str, str] | None = None,
418 diff_gauge: bool = False,
419 ) -> MetricsDiffer:
420 """
421 Creates a `MetricsDiffer` that fetches metrics using this client.
422 It's recommended to use this method over `metrics` to make sure
423 the tests don't affect each other.
424
425 With `diff_gauge` off, only `RATE` metrics are differentiated.
426 With `diff_gauge` on, `GAUGE` metrics are differentiated as well,
427 which may lead to nonsensical results for those.
428
429 @param path Optional full metric path
430 @param prefix Optional prefix on which the metric paths should start
431 @param labels Optional dictionary of labels that must be in the metric
432 @param diff_gauge Whether to differentiate GAUGE metrics
433
434 @snippet samples/testsuite-support/tests/test_metrics.py metrics diff
435 """
436 return MetricsDiffer(
437 _client=self,
438 _path=path,
439 _prefix=prefix,
440 _labels=labels,
441 _diff_gauge=diff_gauge,
442 )
443
444 @_wrap_client_error
445 async def metrics(
446 self,
447 *,
448 path: str | None = None,
449 prefix: str | None = None,
450 labels: dict[str, str] | None = None,
452 """
453 Returns a dict of metric names to Metric.
454
455 @param path Optional full metric path
456 @param prefix Optional prefix on which the metric paths should start
457 @param labels Optional dictionary of labels that must be in the metric
458
459 @snippet samples/testsuite-support/tests/test_metrics.py metrics metrics
460 """
461 return await self._client.metrics(
462 path=path,
463 prefix=prefix,
464 labels=labels,
465 )
466
467 @_wrap_client_error
469 self,
470 path: str,
471 *,
472 labels: dict[str, str] | None = None,
474 """
475 Either return a pytest_userver.metrics.Metric or None if there's no such metric.
476
477 @param path Full metric path
478 @param labels Optional dictionary of labels that must be in the metric
479
480 @throws AssertionError if more than one metric returned
481
482 @snippet samples/testsuite-support/tests/test_metrics.py metrics single_metric_optional
483 """
484 return await self._client.single_metric_optional(path, labels=labels)
485
486 @_wrap_client_error
487 async def single_metric(
488 self,
489 path: str,
490 *,
491 labels: dict[str, str] | None = None,
493 """
494 Returns the pytest_userver.metrics.Metric.
495
496 @param path Full metric path
497 @param labels Optional dictionary of labels that must be in the metric
498
499 @throws AssertionError if more than one metric or no metric found
500
501 @snippet samples/testsuite-support/tests/test_metrics.py metrics single_metric
502 """
503 return await self._client.single_metric(path, labels=labels)
504
@_wrap_client_error
async def metrics_raw(
    self,
    output_format: str,
    *,
    path: str | None = None,
    prefix: str | None = None,
    labels: dict[str, str] | None = None,
) -> str:
    """
    Low level function that returns metrics in a specific format.
    Use `metrics` and `single_metric` instead if possible.

    @param output_format pytest_userver.metrics.Metric output format. See
    @ref server::handlers::ServerMonitor for a list of supported formats.
    @param path Optional full metric path
    @param prefix Optional prefix on which the metric paths should start
    @param labels Optional dictionary of labels that must be in the metric
    @returns Whatever the underlying client's `metrics_raw` returns (the raw
    response text for the low-level monitor client in this module)
    """
    return await self._client.metrics_raw(
        output_format=output_format,
        path=path,
        prefix=prefix,
        labels=labels,
    )
530
@_wrap_client_error
async def get_metrics(self, prefix=None):
    """
    @deprecated Use metrics() or single_metric() instead

    Delegates to the underlying client's `get_metrics`.
    """
    legacy_metrics = await self._client.get_metrics(prefix=prefix)
    return legacy_metrics
537
@_wrap_client_error
async def get_metric(self, metric_name):
    """
    @deprecated Use metrics() or single_metric() instead

    Delegates to the underlying client's `get_metric`.
    """
    legacy_metric = await self._client.get_metric(metric_name)
    return legacy_metric
544
@_wrap_client_error
async def fired_alerts(self):
    """
    Request `/service/fired-alerts` and return the `alerts` entry of the
    JSON response.

    @throws AssertionError if the response status is not 200
    """
    response = await self._client.get('/service/fired-alerts')
    assert response.status == 200
    payload = await response.json()
    return payload['alerts']
550
551
553 """
554 A helper class for computing metric differences.
555
556 @see ClientMonitor.metrics_diff
557
558 Example with @ref pytest_userver.client.ClientMonitor.metrics_diff "await monitor_client.metrics_diff()":
559 @snippet samples/testsuite-support/tests/test_metrics.py metrics diff
560
561 @ingroup userver_testsuite
562 """
563
564 # @cond
565 def __init__(
566 self,
567 _client: ClientMonitor,
568 _path: str | None,
569 _prefix: str | None,
570 _labels: dict[str, str] | None,
571 _diff_gauge: bool,
572 ):
573 self._client = _client
574 self._path = _path
575 self._prefix = _prefix
576 self._labels = _labels
577 self._diff_gauge = _diff_gauge
581
582 # @endcond
583
@property
def baseline(self) -> pytest_userver.metrics.MetricsSnapshot:
    """
    The baseline snapshot; asserts that it has been set.
    """
    snapshot = self._baseline
    assert snapshot is not None
    return snapshot

@baseline.setter
def baseline(self, value: pytest_userver.metrics.MetricsSnapshot) -> None:
    """
    Replace the baseline snapshot and, when a current snapshot is already
    present, recompute the stored diff.
    """
    self._baseline = value
    if self._current is None:
        return
    self._diff = _subtract_metrics_snapshots(
        self._current,
        self._baseline,
        self._diff_gauge,
    )
598
@property
def current(self) -> pytest_userver.metrics.MetricsSnapshot:
    """
    The current snapshot; asserts that it has been set.
    """
    snapshot = self._current
    assert snapshot is not None, 'Set self.current first'
    return snapshot

@current.setter
def current(self, value: pytest_userver.metrics.MetricsSnapshot) -> None:
    """
    Store the current snapshot and recompute the diff against the baseline.
    The baseline must already be set.
    """
    self._current = value
    baseline_snapshot = self._baseline
    assert baseline_snapshot is not None, 'Set self.baseline first'
    self._diff = _subtract_metrics_snapshots(
        self._current,
        baseline_snapshot,
        self._diff_gauge,
    )
613
@property
def diff(self) -> pytest_userver.metrics.MetricsSnapshot:
    """
    The stored difference snapshot; asserts that it has been computed.
    """
    computed = self._diff
    assert computed is not None, 'Set self.current first'
    return computed
618
620 self,
621 subpath: str | None = None,
622 add_labels: dict[str, str] | None = None,
623 *,
624 default: float | None = None,
625 ) -> pytest_userver.metrics.MetricValue:
626 """
627 Returns a single metric value at the specified path, prepending
628 the path provided at construction. If a dict of labels is provided,
629 does an exact match of labels, prepending the labels provided at construction.
630
631 @param subpath Suffix of the metric path; the path provided at construction is prepended
632 @param add_labels Labels that the metric must have in addition to the labels provided at construction
633 @param default An optional default value in case the metric is missing
634 @throws AssertionError if not one metric by path
635 """
636 base_path = self._path or self._prefix
637 if base_path and subpath:
638 path = f'{base_path}.{subpath}'
639 else:
640 assert base_path or subpath, 'No path provided'
641 path = base_path or subpath or ''
642 labels: dict | None = None
643 if self._labels is not None or add_labels is not None:
644 labels = {**(self._labels or {}), **(add_labels or {})}
645 return self.diff.value_at(path, labels, default=default)
646
async def fetch(self) -> pytest_userver.metrics.MetricsSnapshot:
    """
    Request metrics from the client using the path, prefix and labels
    given at construction.
    """
    query = {
        'path': self._path,
        'prefix': self._prefix,
        'labels': self._labels,
    }
    return await self._client.metrics(**query)
656
async def __aenter__(self) -> MetricsDiffer:
    """
    Fetch the baseline snapshot, clear the current one and return `self`.
    """
    fetched = await self.fetch()
    self._baseline = fetched
    self._current = None
    return self
661
async def __aexit__(self, exc_type, exc, exc_tb) -> None:
    """
    Fetch a fresh snapshot and assign it to `self.current`.
    """
    fetched = await self.fetch()
    self.current = fetched
664
665
666# @cond
667
668
669def _subtract_metrics_snapshots(
670 current: pytest_userver.metrics.MetricsSnapshot,
672 diff_gauge: bool,
675 path: {_subtract_metrics(path, current_metric, initial, diff_gauge) for current_metric in current_group}
676 for path, current_group in current.items()
677 })
678
679
680def _subtract_metrics(
681 path: str,
682 current_metric: pytest_userver.metrics.Metric,
684 diff_gauge: bool,
686 initial_group = initial.get(path, None)
687 if initial_group is None:
688 return current_metric
689 initial_metric = next(
690 (x for x in initial_group if x.labels == current_metric.labels),
691 None,
692 )
693 if initial_metric is None:
694 return current_metric
695
697 labels=current_metric.labels,
698 value=_subtract_metric_values(
699 current=current_metric,
700 initial=initial_metric,
701 diff_gauge=diff_gauge,
702 ),
703 _type=current_metric.type(),
704 )
705
706
707def _subtract_metric_values(
708 current: pytest_userver.metrics.Metric,
710 diff_gauge: bool,
711) -> pytest_userver.metrics.MetricValue:
712 assert current.type() is not pytest_userver.metrics.MetricType.UNSPECIFIED
713 assert initial.type() is not pytest_userver.metrics.MetricType.UNSPECIFIED
714 assert current.type() == initial.type()
715
716 if isinstance(current.value, pytest_userver.metrics.Histogram):
717 assert isinstance(initial.value, pytest_userver.metrics.Histogram)
718 return _subtract_metric_values_hist(current=current, initial=initial)
719 else:
720 assert not isinstance(initial.value, pytest_userver.metrics.Histogram)
721 return _subtract_metric_values_num(
722 current=current,
723 initial=initial,
724 diff_gauge=diff_gauge,
725 )
726
727
728def _subtract_metric_values_num(
729 current: pytest_userver.metrics.Metric,
731 diff_gauge: bool,
732) -> float:
733 current_value = typing.cast(float, current.value)
734 initial_value = typing.cast(float, initial.value)
735 should_diff = (
736 current.type() is pytest_userver.metrics.MetricType.RATE
737 or initial.type() is pytest_userver.metrics.MetricType.RATE
738 or diff_gauge
739 )
740 return current_value - initial_value if should_diff else current_value
741
742
743def _subtract_metric_values_hist(
744 current: pytest_userver.metrics.Metric,
747 current_value = typing.cast(pytest_userver.metrics.Histogram, current.value)
748 initial_value = typing.cast(pytest_userver.metrics.Histogram, initial.value)
749 assert current_value.bounds == initial_value.bounds
751 bounds=current_value.bounds,
752 buckets=[t[0] - t[1] for t in zip(current_value.buckets, initial_value.buckets, strict=True)],
753 inf=current_value.inf - initial_value.inf,
754 )
755
756
class AiohttpClient(service_client.AiohttpClient):
    """
    Low-level service client with testsuite control helpers.

    Unless `testsuite_skip_prepare` is passed, every request first pushes
    the pending state update to the service (see `_request` and `_prepare`).
    """

    PeriodicTaskFailed = PeriodicTaskFailed
    TestsuiteActionFailed = TestsuiteActionFailed
    TestsuiteTaskNotFound = TestsuiteTaskNotFound
    TestsuiteTaskConflict = TestsuiteTaskConflict
    TestsuiteTaskFailed = TestsuiteTaskFailed

    def __init__(
        self,
        base_url: str,
        *,
        config: TestsuiteClientConfig,
        mocked_time,
        log_capture_fixture: logcapture.CaptureServer,
        testpoint,
        testpoint_control,
        cache_invalidation_state,
        span_id_header=None,
        api_coverage_report=None,
        periodic_tasks_state: PeriodicTasksState | None = None,
        allow_all_caches_invalidation: bool = True,
        cache_control: caches.CacheControl | None = None,
        asyncexc_check=None,
        **kwargs,
    ):
        """
        Store the collaborators and build the `_StateManager` that tracks
        pending state updates.
        """
        super().__init__(base_url, span_id_header=span_id_header, **kwargs)
        self._config = config
        self._periodic_tasks = periodic_tasks_state
        self._testpoint = testpoint
        self._log_capture_fixture = log_capture_fixture
        self._state_manager = _StateManager(
            mocked_time=mocked_time,
            testpoint=self._testpoint,
            testpoint_control=testpoint_control,
            invalidation_state=cache_invalidation_state,
            cache_control=cache_control,
        )
        self._api_coverage_report = api_coverage_report
        self._allow_all_caches_invalidation = allow_all_caches_invalidation
        self._asyncexc_check = asyncexc_check

    async def run_periodic_task(self, name):
        """
        Run the `run_periodic_task` action; raise `PeriodicTaskFailed` when
        the response `status` field is falsy.
        """
        response = await self._testsuite_action('run_periodic_task', name=name)
        if not response['status']:
            raise self.PeriodicTaskFailed(f'Periodic task {name} failed')

    async def suspend_periodic_tasks(self, names: list[str]) -> None:
        """Add `names` to the set of tasks to suspend and sync the service."""
        if not self._periodic_tasks:
            raise ConfigurationError('No periodic_tasks_state given')
        self._periodic_tasks.tasks_to_suspend.update(names)
        await self._suspend_periodic_tasks()

    async def resume_periodic_tasks(self, names: list[str]) -> None:
        """Remove `names` from the set of tasks to suspend and sync the service."""
        if not self._periodic_tasks:
            raise ConfigurationError('No periodic_tasks_state given')
        self._periodic_tasks.tasks_to_suspend.difference_update(names)
        await self._suspend_periodic_tasks()

    async def resume_all_periodic_tasks(self) -> None:
        """Clear the set of tasks to suspend and sync the service."""
        if not self._periodic_tasks:
            raise ConfigurationError('No periodic_tasks_state given')
        self._periodic_tasks.tasks_to_suspend.clear()
        await self._suspend_periodic_tasks()

    async def write_cache_dumps(
        self,
        names: list[str],
        *,
        testsuite_skip_prepare=False,
    ) -> None:
        """Run the `write_cache_dumps` action for the given names."""
        await self._testsuite_action(
            'write_cache_dumps',
            names=names,
            testsuite_skip_prepare=testsuite_skip_prepare,
        )

    async def read_cache_dumps(
        self,
        names: list[str],
        *,
        testsuite_skip_prepare=False,
    ) -> None:
        """Run the `read_cache_dumps` action for the given names."""
        await self._testsuite_action(
            'read_cache_dumps',
            names=names,
            testsuite_skip_prepare=testsuite_skip_prepare,
        )

    async def run_distlock_task(self, name: str) -> None:
        """Run the testsuite task named `distlock/{name}`."""
        await self.run_task(f'distlock/{name}')

    async def reset_metrics(self) -> None:
        """Run the `reset_metrics` action."""
        await self._testsuite_action('reset_metrics')

    async def metrics_portability(
        self,
        *,
        prefix: str | None = None,
    ) -> dict[str, list[dict[str, str]]]:
        """Run the `metrics_portability` action and return its JSON result."""
        return await self._testsuite_action(
            'metrics_portability',
            prefix=prefix,
        )

    async def list_tasks(self) -> list[str]:
        """Run the `tasks_list` action and return the `tasks` field."""
        response = await self._do_testsuite_action('tasks_list')
        async with response:
            response.raise_for_status()
            body = await response.json(content_type=None)
            return body['tasks']

    async def run_task(self, name: str) -> None:
        """Run the `task_run` action and validate the response."""
        response = await self._do_testsuite_action(
            'task_run',
            json={'name': name},
        )
        await _task_check_response(name, response)

    @contextlib.asynccontextmanager
    async def spawn_task(self, name: str):
        """
        Async context manager: spawn the task on entry and stop it on exit,
        also when the body raises.
        """
        task_id = await self._task_spawn(name)
        try:
            yield
        finally:
            await self._task_stop_spawned(task_id)

    async def _task_spawn(self, name: str) -> str:
        """Run the `task_spawn` action and return the reported `task_id`."""
        response = await self._do_testsuite_action(
            'task_spawn',
            json={'name': name},
        )
        data = await _task_check_response(name, response)
        return data['task_id']

    async def _task_stop_spawned(self, task_id: str) -> None:
        """Run the `task_stop` action for a previously spawned task."""
        response = await self._do_testsuite_action(
            'task_stop',
            json={'task_id': task_id},
        )
        await _task_check_response(task_id, response)

    async def http_allowed_urls_extra(
        self,
        http_allowed_urls_extra: list[str],
    ) -> None:
        """
        Run the `http_allowed_urls_extra` action with the given URL list.

        The response is released and its status checked, like the other
        testsuite actions of this class.
        """
        response = await self._do_testsuite_action(
            'http_allowed_urls_extra',
            json={'allowed_urls_extra': http_allowed_urls_extra},
            testsuite_skip_prepare=True,
        )
        async with response:
            response.raise_for_status()

    @contextlib.asynccontextmanager
    async def capture_logs(
        self,
        *,
        log_level: str = 'DEBUG',
        testsuite_skip_prepare: bool = False,
    ):
        """
        Async context manager yielding a log capture object.

        Enables socket log duplication on the service for the duration of
        the block and restores the fixture's default log level afterwards.
        """
        async with self._log_capture_fixture.capture(
            log_level=logcapture.LogLevel.from_string(log_level),
        ) as capture:
            logger.debug('Starting logcapture')
            await self._testsuite_action(
                'log_capture',
                log_level=log_level,
                socket_logging_duplication=True,
                testsuite_skip_prepare=testsuite_skip_prepare,
            )

            try:
                await self._log_capture_fixture.wait_for_client()
                yield capture
            finally:
                await self._testsuite_action(
                    'log_capture',
                    log_level=self._log_capture_fixture.default_log_level.name,
                    socket_logging_duplication=False,
                    testsuite_skip_prepare=testsuite_skip_prepare,
                )

    async def log_flush(self, logger_name: str | None = None):
        """Run the `log_flush` action, skipping the state update."""
        await self._testsuite_action(
            'log_flush',
            logger_name=logger_name,
            testsuite_skip_prepare=True,
        )

    async def invalidate_caches(
        self,
        *,
        clean_update: bool = True,
        cache_names: list[str] | None = None,
        testsuite_skip_prepare: bool = False,
    ) -> None:
        """
        Invalidate service caches.

        A full invalidation without explicit `cache_names` is deprecated: it
        emits a `DeprecationWarning`, or raises `RuntimeError` when
        invalidating all caches is not allowed for this client.
        """
        if cache_names is None and clean_update:
            if self._allow_all_caches_invalidation:
                warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning, stacklevel=2)
            else:
                __tracebackhide__ = True
                raise RuntimeError(CACHE_INVALIDATION_MESSAGE)

        if testsuite_skip_prepare:
            # Send only the invalidation request, without the pending update.
            await self._tests_control({
                'invalidate_caches': {
                    'update_type': ('full' if clean_update else 'incremental'),
                    **({'names': cache_names} if cache_names else {}),
                },
            })
        else:
            await self.tests_control(
                invalidate_caches=True,
                clean_update=clean_update,
                cache_names=cache_names,
            )

    async def tests_control(
        self,
        *,
        invalidate_caches: bool = True,
        clean_update: bool = True,
        cache_names: list[str] | None = None,
        http_allowed_urls_extra: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Send the pending state update, optionally together with a cache
        invalidation request, and return the decoded JSON response.
        """
        body: dict[str, Any] = self._state_manager.get_pending_update()

        if 'invalidate_caches' in body and invalidate_caches:
            if not clean_update or cache_names:
                logger.warning(
                    'Manual cache invalidation leads to indirect initial full cache invalidation',
                )
                # Apply the pending update first, then start from an empty body.
                await self._prepare()
                body = {}

        if invalidate_caches:
            body['invalidate_caches'] = {
                'update_type': ('full' if clean_update else 'incremental'),
            }
            if cache_names:
                body['invalidate_caches']['names'] = cache_names

        if http_allowed_urls_extra is not None:
            await self.http_allowed_urls_extra(http_allowed_urls_extra)

        return await self._tests_control(body)

    async def update_server_state(self) -> None:
        """Push the pending state update to the service, if there is one."""
        await self._prepare()

    async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
        """
        Register testpoints on the service; does nothing when no testpoint
        is set. With `no_auto_cache_cleanup` only the sorted testpoint names
        are sent, otherwise the full state update is performed.
        """
        if not self._testpoint:
            return
        if no_auto_cache_cleanup:
            await self._tests_control({
                'testpoints': sorted(self._testpoint.keys()),
            })
        else:
            await self.update_server_state()

    async def get_dynamic_config_defaults(
        self,
    ) -> dict[str, Any]:
        """Run the `get_dynamic_config_defaults` action, skipping the state update."""
        return await self._testsuite_action(
            'get_dynamic_config_defaults',
            testsuite_skip_prepare=True,
        )

    async def _tests_control(self, body: dict) -> dict[str, Any]:
        """
        Post `body` to the `control` action inside the state manager's
        `updating_state` context and return the decoded JSON response.

        @throws ConfigurationError on HTTP 404
        """
        with self._state_manager.updating_state(body):
            async with await self._do_testsuite_action(
                'control',
                json=body,
                testsuite_skip_prepare=True,
            ) as response:
                if response.status == 404:
                    raise ConfigurationError(
                        'It seems that testsuite support is not enabled for your service',
                    )
                response.raise_for_status()
                return await response.json(content_type=None)

    async def _suspend_periodic_tasks(self):
        """Send the set of tasks to suspend when it differs from the last one sent."""
        if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
            await self._testsuite_action(
                'suspend_periodic_tasks',
                names=sorted(self._periodic_tasks.tasks_to_suspend),
            )
            # Keep a copy so later in-place edits of tasks_to_suspend are detected.
            self._periodic_tasks.suspended_tasks = set(
                self._periodic_tasks.tasks_to_suspend,
            )

    def _do_testsuite_action(self, action, **kwargs):
        """
        Build the action path from the configured template and return the
        awaitable produced by `self.post`.

        @throws ConfigurationError if `testsuite_action_path` is not configured
        """
        if not self._config.testsuite_action_path:
            raise ConfigurationError(
                'tests-control component is not properly configured',
            )
        path = self._config.testsuite_action_path.format(action=action)
        return self.post(path, **kwargs)

    async def _testsuite_action(
        self,
        action,
        *,
        testsuite_skip_prepare=False,
        **kwargs,
    ):
        """
        Post `kwargs` as the JSON body of the action and return the decoded
        JSON response.

        @throws TestsuiteActionFailed on HTTP 500
        """
        async with await self._do_testsuite_action(
            action,
            json=kwargs,
            testsuite_skip_prepare=testsuite_skip_prepare,
        ) as response:
            if response.status == 500:
                raise TestsuiteActionFailed
            response.raise_for_status()
            return await response.json(content_type=None)

    async def _prepare(self) -> None:
        """Send the pending update reported by the state manager, if any."""
        with self._state_manager.cache_control_update() as pending_update:
            if pending_update:
                await self._tests_control(pending_update)

    async def _request(  # pylint: disable=arguments-differ
        self,
        http_method: str,
        path: str,
        headers: dict[str, str] | None = None,
        bearer: str | None = None,
        x_real_ip: str | None = None,
        *,
        testsuite_skip_prepare: bool = False,
        **kwargs,
    ) -> aiohttp.ClientResponse:
        """
        Perform the request via the parent class, pushing the pending state
        update first unless `testsuite_skip_prepare` is set, and record API
        coverage when a report object is configured.
        """
        if self._asyncexc_check:
            # Check for pending background exceptions before call.
            self._asyncexc_check()

        if not testsuite_skip_prepare:
            await self._prepare()

        response = await super()._request(
            http_method,
            path,
            headers,
            bearer,
            x_real_ip,
            **kwargs,
        )
        if self._api_coverage_report:
            self._api_coverage_report.update_usage_stat(
                path,
                http_method,
                response.status,
                response.content_type,
            )

        if self._asyncexc_check:
            # Check for pending background exceptions after call.
            self._asyncexc_check()

        return response
1116
1117
1118# @endcond
1119
1120
1122 """
1123 Asyncio userver client, typically retrieved from
1124 @ref service_client "plugins.service_client.service_client"
1125 fixture.
1126
1127 Compatible with werkzeug interface.
1128
1129 @ingroup userver_testsuite
1130 """
1131
1132 PeriodicTaskFailed = PeriodicTaskFailed
1133 TestsuiteActionFailed = TestsuiteActionFailed
1134 TestsuiteTaskNotFound = TestsuiteTaskNotFound
1135 TestsuiteTaskConflict = TestsuiteTaskConflict
1136 TestsuiteTaskFailed = TestsuiteTaskFailed
1137
def _wrap_client_response(self, response: aiohttp.ClientResponse) -> Awaitable[http.ClientResponse]:
    """
    Hand the raw response to `http.wrap_client_response`, using
    `approx.json_loads` as the JSON decoder.
    """
    wrapped = http.wrap_client_response(response, json_loads=approx.json_loads)
    return wrapped
1143
@_wrap_client_error
async def run_periodic_task(self, name):
    """
    Delegate to the underlying client's `run_periodic_task`.

    @param name Periodic task name
    """
    await self._client.run_periodic_task(name)
1147
@_wrap_client_error
async def suspend_periodic_tasks(self, names: list[str]) -> None:
    """
    Delegate to the underlying client's `suspend_periodic_tasks`.

    @param names Periodic task names
    """
    await self._client.suspend_periodic_tasks(names)
1151
@_wrap_client_error
async def resume_periodic_tasks(self, names: list[str]) -> None:
    """
    Delegate to the underlying client's `resume_periodic_tasks`.

    @param names Periodic task names
    """
    await self._client.resume_periodic_tasks(names)
1155
@_wrap_client_error
async def resume_all_periodic_tasks(self) -> None:
    """
    Delegate to the underlying client's `resume_all_periodic_tasks`.
    """
    await self._client.resume_all_periodic_tasks()
1159
@_wrap_client_error
async def write_cache_dumps(
    self,
    names: list[str],
    *,
    testsuite_skip_prepare=False,
) -> None:
    """
    Delegate to the underlying client's `write_cache_dumps`.

    @param names Names forwarded as the `names` keyword
    @param testsuite_skip_prepare Forwarded unchanged
    """
    forwarded = {
        'names': names,
        'testsuite_skip_prepare': testsuite_skip_prepare,
    }
    await self._client.write_cache_dumps(**forwarded)
1171
@_wrap_client_error
async def read_cache_dumps(
    self,
    names: list[str],
    *,
    testsuite_skip_prepare=False,
) -> None:
    """
    Delegate to the underlying client's `read_cache_dumps`.

    @param names Names forwarded as the `names` keyword
    @param testsuite_skip_prepare Forwarded unchanged
    """
    forwarded = {
        'names': names,
        'testsuite_skip_prepare': testsuite_skip_prepare,
    }
    await self._client.read_cache_dumps(**forwarded)
1183
async def run_task(self, name: str) -> None:
    """Run the testsuite task `name` through the underlying client and wait for it."""
    pending_call = self._client.run_task(name)
    await pending_call
1186
async def run_distlock_task(self, name: str) -> None:
    """Run the distlock task `name` through the underlying client and wait for it."""
    pending_call = self._client.run_distlock_task(name)
    await pending_call
1189
async def reset_metrics(self) -> None:
    """
    Calls `ResetMetric(metric);` for every metric that provides such a C++ function.

    Using `reset_metrics()` is discouraged; prefer the more reliable
    @ref pytest_userver.client.ClientMonitor.metrics_diff "await monitor_client.metrics_diff()".

    @snippet samples/testsuite-support/tests/test_metrics.py metrics reset
    """
    pending_call = self._client.reset_metrics()
    await pending_call
1200
1202 self,
1203 *,
1204 prefix: str | None = None,
1205 ) -> dict[str, list[dict[str, str]]]:
1206 """
1207 Reports metrics related issues that could be encountered on
1208 different monitoring systems.
1209
1210 @sa @ref utils::statistics::GetPortabilityWarnings
1211 """
1212 return await self._client.metrics_portability(prefix=prefix)
1213
def list_tasks(self) -> list[str]:
    """Return the result of the underlying client's `list_tasks()` call."""
    task_names = self._client.list_tasks()
    return task_names
1216
def spawn_task(self, name: str):
    """Return the result of the underlying client's `spawn_task(name)` call."""
    spawned = self._client.spawn_task(name)
    return spawned
1219
def capture_logs(
    self,
    *,
    log_level: str = 'DEBUG',
    testsuite_skip_prepare: bool = False,
):
    """
    Captures logs from the service.

    @param log_level Do not capture logs below this level.
    @param testsuite_skip_prepare An advanced parameter to skip auto-`update_server_state`.
    @returns whatever the underlying client's `capture_logs` returns.

    @see @ref testsuite_logs_capture
    """
    # The declaration line was missing from the listing; it is restored as a
    # plain `def` because the body returns the delegate's result without
    # awaiting it.
    return self._client.capture_logs(
        log_level=log_level,
        testsuite_skip_prepare=testsuite_skip_prepare,
    )
1238
1239 def log_flush(self, logger_name: str | None = None):
1240 """
1241 Flush service logs.
1242 """
1243 return self._client.log_flush(logger_name=logger_name)
1244
@_wrap_client_error
async def invalidate_caches(
    self,
    *,
    clean_update: bool = True,
    cache_names: list[str] | None = None,
    testsuite_skip_prepare: bool = False,
) -> None:
    """
    Send request to service to update caches.

    @param clean_update if False, service will do a faster incremental
           update of caches whenever possible.
    @param cache_names which caches specifically should be updated;
           update all if None.
    @param testsuite_skip_prepare if False, service will automatically do
           update_server_state().
    """
    # The declaration line was missing from the listing; it is restored to
    # match the delegated call, like the other wrappers of this client.
    __tracebackhide__ = True
    await self._client.invalidate_caches(
        clean_update=clean_update,
        cache_names=cache_names,
        testsuite_skip_prepare=testsuite_skip_prepare,
    )
1269
@_wrap_client_error
async def tests_control(
    self,
    invalidate_caches: bool = True,
    clean_update: bool = True,
    cache_names: list[str] | None = None,
    http_allowed_urls_extra: list[str] | None = None,
) -> dict[str, Any]:
    """
    Forward a `tests_control` request to the underlying client.

    All arguments are passed through unchanged as keyword arguments.

    @returns whatever the underlying client's `tests_control` returns.
    """
    request_kwargs = {
        'invalidate_caches': invalidate_caches,
        'clean_update': clean_update,
        'cache_names': cache_names,
        'http_allowed_urls_extra': http_allowed_urls_extra,
    }
    return await self._client.tests_control(**request_kwargs)
1284
@_wrap_client_error
async def update_server_state(self) -> None:
    """
    Bring the service-side state up to date via an http call to
    'tests/control':
    - clear caches left dirty by other tests
    - set the service-side mocked time
    - resume / suspend periodic tasks
    - enable testpoints
    Does nothing when the service is already up-to-date.
    """
    backend = self._client
    await backend.update_server_state()
1296
@_wrap_client_error
async def enable_testpoints(self, no_auto_cache_cleanup: bool = False) -> None:
    """
    Send the list of handled testpoint paths to the service. For these
    paths the service will no longer skip http calls from the
    TESTPOINT(...) macro.

    @param no_auto_cache_cleanup prevent automatic cache cleanup.
    When the service client is called for the first time within the current
    test, the client makes an additional http call to `tests/control` to
    update caches and get rid of data from the previous test.
    """
    backend = self._client
    await backend.enable_testpoints(no_auto_cache_cleanup=no_auto_cache_cleanup)
1309
@_wrap_client_error
async def get_dynamic_config_defaults(
    self,
) -> dict[str, Any]:
    """Return the result of the underlying client's `get_dynamic_config_defaults()` call."""
    defaults = await self._client.get_dynamic_config_defaults()
    return defaults
1315
1316
@dataclasses.dataclass
class _State:
    """Reflects the (supposed) current service state."""

    # The class declaration line was missing from the listing; the name is
    # taken from the `_State(...)` construction in the state manager.

    # Cache invalidation bookkeeping; required, has no default.
    invalidation_state: caches.InvalidationState
    # Mocked time last sent to the service; starts as the unknown-state marker.
    now: str | None = _UNKNOWN_STATE
    # Testpoints last sent to the service; starts as the unknown-state marker.
    testpoints: frozenset[str] = frozenset([_UNKNOWN_STATE])
1324
1325
1327 """
1328 Used for computing the requests that we need to automatically align
1329 the service state with the test fixtures state.
1330 """
1331
def __init__(
    self,
    *,
    mocked_time,
    testpoint,
    testpoint_control,
    invalidation_state: caches.InvalidationState,
    cache_control: caches.CacheControl | None,
):
    """
    Store the fixtures used to compute state updates.

    The tracked state starts from a deep copy of `invalidation_state`, so
    it does not alias the live object kept in `_invalidation_state`.
    """
    initial_invalidation = copy.deepcopy(invalidation_state)
    self._state = _State(invalidation_state=initial_invalidation)

    self._mocked_time = mocked_time
    self._testpoint = testpoint
    self._testpoint_control = testpoint_control
    self._invalidation_state = invalidation_state
    self._cache_control = cache_control
1349
@contextlib.contextmanager
def updating_state(self, body: dict[str, Any]):
    """
    Keep `_state` in sync with the (supposed) service state whenever the
    `tests_control` handler is invoked, either by the client itself during
    `prepare` or manually by the user.

    The new state is decoded from the request `body` and applied before the
    managed block runs. If an `Exception` escapes, the previous state is
    restored, re-applied, and the exception is re-raised.
    """
    snapshot = copy.deepcopy(self._state)
    try:
        self._update_state(body)
        self._apply_new_state()
        yield
    except Exception:  # noqa
        # Roll back to the state captured before the update.
        self._state = snapshot
        self._apply_new_state()
        raise
1367
def get_pending_update(self) -> dict[str, Any]:
    """
    Build the `tests_control` request body needed to fully synchronize the
    service state with the state of the test fixtures.

    @returns a dict with any of the keys `invalidate_caches`, `testpoints`
    and `mock_now`; empty when nothing differs.
    """
    pending: dict[str, Any] = {}

    invalidation = self._invalidation_state
    if invalidation.has_caches_to_update:
        pending['invalidate_caches'] = {'update_type': 'full'}
        if not invalidation.should_update_all_caches:
            # Only a subset is dirty: name the caches explicitly.
            pending['invalidate_caches']['names'] = list(invalidation.caches_to_update)

    wanted_testpoints = self._testpoint.keys()
    if self._state.testpoints != frozenset(wanted_testpoints):
        pending['testpoints'] = sorted(wanted_testpoints)

    wanted_now = self._get_desired_now()
    if self._state.now != wanted_now:
        pending['mock_now'] = wanted_now

    return pending
1391
@contextlib.contextmanager
def cache_control_update(self) -> Iterator[dict[Any, Any]]:
    """
    Yield the pending `tests_control` body, adjusted by cache control.

    When the body has an `invalidate_caches` section and a cache control
    object is configured, its actions are applied to that section before
    yielding, and the staged result is committed after the managed block
    completes without raising. Otherwise the body is yielded unchanged.
    """
    pending = self.get_pending_update()
    cache_request = pending.get('invalidate_caches')

    if not cache_request or not self._cache_control:
        yield pending
        return

    requested_names = cache_request.get('names')
    staged, actions = self._cache_control.query_caches(requested_names)
    self._apply_cache_control_actions(cache_request, actions)
    yield pending
    self._cache_control.commit_staged(staged)
1404
@staticmethod
def _apply_cache_control_actions(
    invalidate_caches: dict[Any, Any],
    actions: list[tuple[str, caches.CacheControlAction]],
) -> None:
    """
    Adjust the `invalidate_caches` request section in place per cache action.

    `exclude_names` and `force_incremental_names` lists are always present
    in the section afterwards. INCREMENTAL caches are appended to
    `force_incremental_names`. EXCLUDE caches are removed from `names` when
    that list is present, otherwise appended to `exclude_names`. FULL caches
    need no adjustment.
    """
    requested = invalidate_caches.get('names')
    excluded = invalidate_caches.setdefault('exclude_names', [])
    incremental = invalidate_caches.setdefault('force_incremental_names', [])

    for cache_name, action in actions:
        if action == caches.CacheControlAction.FULL:
            continue
        if action == caches.CacheControlAction.INCREMENTAL:
            incremental.append(cache_name)
        elif action == caches.CacheControlAction.EXCLUDE:
            if requested is None:
                excluded.append(cache_name)
            else:
                requested.remove(cache_name)
1427
def _update_state(self, body: dict[str, Any]) -> None:
    """
    Fold a `tests_control` request `body` into the tracked `_state`.

    Only the keys `invalidate_caches`, `mock_now` and `testpoints` are read;
    keys absent from `body` leave the corresponding state untouched.
    """
    cache_request = body.get('invalidate_caches', {})
    is_full_update = cache_request.get('update_type', 'full') == 'full'
    updated_names = cache_request.get('names', None)
    # An incremental update is considered insufficient to bring a cache
    # to a known state.
    if cache_request and is_full_update:
        tracker = self._state.invalidation_state
        if updated_names is None:
            tracker.on_all_caches_updated()
        else:
            tracker.on_caches_updated(updated_names)

    if 'mock_now' in body:
        self._state.now = body['mock_now']

    new_testpoints: list[str] | None = body.get('testpoints')
    if new_testpoints is not None:
        self._state.testpoints = frozenset(new_testpoints)
1448
def _apply_new_state(self) -> None:
    """Apply new state to related components."""
    # The declaration line was missing from the listing; the signature
    # follows the argument-less `self._apply_new_state()` calls.
    self._testpoint_control.enabled_testpoints = self._state.testpoints
    self._invalidation_state.assign_copy(self._state.invalidation_state)
1453
1454 def _get_desired_now(self) -> str | None:
1455 if self._mocked_time.is_enabled:
1456 return utils.timestring(self._mocked_time.now())
1457 return None
1458
1459
async def _task_check_response(name: str, response) -> dict:
    """
    Validate a testsuite task response and return its decoded JSON body.

    @param name task name, used in error messages.
    @param response response object; used as an async context manager and
           read through `status` and `json()`.
    @returns the decoded JSON body.

    Raises TestsuiteTaskNotFound on 404, TestsuiteTaskConflict on 409 and
    TestsuiteTaskFailed when the body carries a falsy `status`. Any other
    non-200 status trips the assertion.
    """
    async with response:
        status = response.status
        if status == 404:
            raise TestsuiteTaskNotFound(f'Testsuite task {name!r} not found')
        if status == 409:
            raise TestsuiteTaskConflict(f'Testsuite task {name!r} conflict')
        assert status == 200
        payload = await response.json()
        # A missing `status` key is treated as success.
        succeeded = payload.get('status', True)
        if not succeeded:
            raise TestsuiteTaskFailed(name, payload['reason'])
        return payload