userver: /home/antonyzhilin/arcadia/taxi/uservices/userver/testsuite/pytest_plugins/pytest_userver/client.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
client.py
1"""
2Python module that provides clients for functional tests with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9# pylint: disable=too-many-lines
10
11import contextlib
12import copy
13import dataclasses
14import json
15import logging
16import typing
17from typing import Optional
18from typing import Union
19import warnings
20
21import aiohttp
22
23from testsuite import logcapture
24from testsuite import utils
25from testsuite.daemons import service_client
26from testsuite.utils import approx
27from testsuite.utils import http
28
29import pytest_userver.metrics as metric_module # pylint: disable=import-error
30from pytest_userver.plugins import caches
31
32# @cond
33logger = logging.getLogger(__name__)
34# @endcond
35
36JsonAny = Union[int, float, str, list, dict]
37JsonAnyOptional = Optional[JsonAny]
38
39_UNKNOWN_STATE = '__UNKNOWN__'
40
41CACHE_INVALIDATION_MESSAGE = (
42 'Direct cache invalidation is deprecated.\n'
43 '\n'
44 ' - Use client.update_server_state() to synchronize service state\n'
45 ' - Explicitly pass cache names to invalidate, e.g.: '
46 'invalidate_caches(cache_names=[...]).'
47)
48
49
50class BaseError(Exception):
51 """Base class for exceptions of this module."""
52
53
55 pass
56
57
59 pass
60
61
71 pass
72
73
75 pass
76
77
79 def __init__(self):
80 self.suspended_tasks: typing.Set[str] = set()
81 self.tasks_to_suspend: typing.Set[str] = set()
82
83
85 def __init__(self, name, reason):
86 self.name = name
87 self.reason = reason
88 super().__init__(f'Testsuite task {name!r} failed: {reason}')
89
90
91@dataclasses.dataclass(frozen=True)
93 testsuite_action_path: typing.Optional[str] = None
94 server_monitor_path: typing.Optional[str] = None
95
96
97Metric = metric_module.Metric
98
99
101 """
102 Base asyncio userver client that implements HTTP requests to service.
103
104 Compatible with werkzeug interface.
105
106 @ingroup userver_testsuite
107 """
108
109 def __init__(self, client):
110 self._client = client
111
112 async def post(
113 self,
114 path: str,
115 # pylint: disable=redefined-outer-name
116 json: JsonAnyOptional = None,
117 data: typing.Any = None,
118 params: typing.Optional[typing.Dict[str, str]] = None,
119 bearer: typing.Optional[str] = None,
120 x_real_ip: typing.Optional[str] = None,
121 headers: typing.Optional[typing.Dict[str, str]] = None,
122 **kwargs,
123 ) -> http.ClientResponse:
124 """
125 Make a HTTP POST request
126 """
127 response = await self._client.post(
128 path,
129 json=json,
130 data=data,
131 params=params,
132 headers=headers,
133 bearer=bearer,
134 x_real_ip=x_real_ip,
135 **kwargs,
136 )
137 return await self._wrap_client_response(response)
138
139 async def put(
140 self,
141 path,
142 # pylint: disable=redefined-outer-name
143 json: JsonAnyOptional = None,
144 data: typing.Any = None,
145 params: typing.Optional[typing.Dict[str, str]] = None,
146 bearer: typing.Optional[str] = None,
147 x_real_ip: typing.Optional[str] = None,
148 headers: typing.Optional[typing.Dict[str, str]] = None,
149 **kwargs,
150 ) -> http.ClientResponse:
151 """
152 Make a HTTP PUT request
153 """
154 response = await self._client.put(
155 path,
156 json=json,
157 data=data,
158 params=params,
159 headers=headers,
160 bearer=bearer,
161 x_real_ip=x_real_ip,
162 **kwargs,
163 )
164 return await self._wrap_client_response(response)
165
166 async def patch(
167 self,
168 path,
169 # pylint: disable=redefined-outer-name
170 json: JsonAnyOptional = None,
171 data: typing.Any = None,
172 params: typing.Optional[typing.Dict[str, str]] = None,
173 bearer: typing.Optional[str] = None,
174 x_real_ip: typing.Optional[str] = None,
175 headers: typing.Optional[typing.Dict[str, str]] = None,
176 **kwargs,
177 ) -> http.ClientResponse:
178 """
179 Make a HTTP PATCH request
180 """
181 response = await self._client.patch(
182 path,
183 json=json,
184 data=data,
185 params=params,
186 headers=headers,
187 bearer=bearer,
188 x_real_ip=x_real_ip,
189 **kwargs,
190 )
191 return await self._wrap_client_response(response)
192
193 async def get(
194 self,
195 path: str,
196 headers: typing.Optional[typing.Dict[str, str]] = None,
197 bearer: typing.Optional[str] = None,
198 x_real_ip: typing.Optional[str] = None,
199 **kwargs,
200 ) -> http.ClientResponse:
201 """
202 Make a HTTP GET request
203 """
204 response = await self._client.get(
205 path,
206 headers=headers,
207 bearer=bearer,
208 x_real_ip=x_real_ip,
209 **kwargs,
210 )
211 return await self._wrap_client_response(response)
212
213 async def delete(
214 self,
215 path: str,
216 headers: typing.Optional[typing.Dict[str, str]] = None,
217 bearer: typing.Optional[str] = None,
218 x_real_ip: typing.Optional[str] = None,
219 **kwargs,
220 ) -> http.ClientResponse:
221 """
222 Make a HTTP DELETE request
223 """
224 response = await self._client.delete(
225 path,
226 headers=headers,
227 bearer=bearer,
228 x_real_ip=x_real_ip,
229 **kwargs,
230 )
231 return await self._wrap_client_response(response)
232
233 async def options(
234 self,
235 path: str,
236 headers: typing.Optional[typing.Dict[str, str]] = None,
237 bearer: typing.Optional[str] = None,
238 x_real_ip: typing.Optional[str] = None,
239 **kwargs,
240 ) -> http.ClientResponse:
241 """
242 Make a HTTP OPTIONS request
243 """
244 response = await self._client.options(
245 path,
246 headers=headers,
247 bearer=bearer,
248 x_real_ip=x_real_ip,
249 **kwargs,
250 )
251 return await self._wrap_client_response(response)
252
253 async def request(
254 self,
255 http_method: str,
256 path: str,
257 **kwargs,
258 ) -> http.ClientResponse:
259 """
260 Make a HTTP request with the specified method
261 """
262 response = await self._client.request(http_method, path, **kwargs)
263 return await self._wrap_client_response(response)
264
265 @property
267 """
268 @deprecated Use pytest_userver.client.Client directly instead.
269 """
270 return self._client
271
272 def _wrap_client_response(
273 self,
274 response: aiohttp.ClientResponse,
275 ) -> typing.Awaitable[http.ClientResponse]:
276 return http.wrap_client_response(response)
277
278
279# @cond
280
281
282def _wrap_client_error(func):
283 async def _wrapper(*args, **kwargs):
284 try:
285 return await func(*args, **kwargs)
286 except aiohttp.client_exceptions.ClientResponseError as exc:
287 raise http.HttpResponseError(
288 url=exc.request_info.url,
289 status=exc.status,
290 )
291
292 return _wrapper
293
294
295class AiohttpClientMonitor(service_client.AiohttpClient):
296 _config: TestsuiteClientConfig
297
298 def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
299 super().__init__(base_url, **kwargs)
300 self._config = config
301
302 async def get_metrics(self, prefix=None):
303 if not self._config.server_monitor_path:
304 raise ConfigurationError(
305 'handler-server-monitor component is not configured',
306 )
307 params = {'format': 'internal'}
308 if prefix is not None:
309 params['prefix'] = prefix
310 response = await self.get(
311 self._config.server_monitor_path,
312 params=params,
313 )
314 async with response:
315 response.raise_for_status()
316 return await response.json(content_type=None)
317
318 async def get_metric(self, metric_name):
319 metrics = await self.get_metrics(metric_name)
320 assert metric_name in metrics, (
321 f'No metric with name {metric_name!r}. Use "single_metric" function instead of "get_metric"'
322 )
323 return metrics[metric_name]
324
325 async def metrics_raw(
326 self,
327 output_format,
328 *,
329 path: str = None,
330 prefix: str = None,
331 labels: typing.Optional[typing.Dict[str, str]] = None,
332 ) -> str:
333 if not self._config.server_monitor_path:
334 raise ConfigurationError(
335 'handler-server-monitor component is not configured',
336 )
337
338 params = {'format': output_format}
339 if prefix:
340 params['prefix'] = prefix
341
342 if path:
343 params['path'] = path
344
345 if labels:
346 params['labels'] = json.dumps(labels)
347
348 response = await self.get(
349 self._config.server_monitor_path,
350 params=params,
351 )
352 async with response:
353 response.raise_for_status()
354 return await response.text()
355
356 async def metrics(
357 self,
358 *,
359 path: str = None,
360 prefix: str = None,
361 labels: typing.Optional[typing.Dict[str, str]] = None,
362 ) -> metric_module.MetricsSnapshot:
363 response = await self.metrics_raw(
364 output_format='json',
365 path=path,
366 prefix=prefix,
367 labels=labels,
368 )
369 return metric_module.MetricsSnapshot.from_json(str(response))
370
371 async def single_metric_optional(
372 self,
373 path: str,
374 *,
375 labels: typing.Optional[typing.Dict[str, str]] = None,
376 ) -> typing.Optional[Metric]:
377 response = await self.metrics(path=path, labels=labels)
378 metrics_list = response.get(path, [])
379
380 assert len(metrics_list) <= 1, (f'More than one metric found for path {path} and labels {labels}: {response}',)
381
382 if not metrics_list:
383 return None
384
385 return next(iter(metrics_list))
386
387 async def single_metric(
388 self,
389 path: str,
390 *,
391 labels: typing.Optional[typing.Dict[str, str]] = None,
392 ) -> Metric:
393 value = await self.single_metric_optional(path, labels=labels)
394 assert value is not None, (f'No metric was found for path {path} and labels {labels}',)
395 return value
396
397
398# @endcond
399
400
402 """
403 Asyncio userver client for monitor listeners, typically retrieved from
404 plugins.service_client.monitor_client fixture.
405
406 Compatible with werkzeug interface.
407
408 @ingroup userver_testsuite
409 """
410
412 self,
413 *,
414 path: typing.Optional[str] = None,
415 prefix: typing.Optional[str] = None,
416 labels: typing.Optional[typing.Dict[str, str]] = None,
417 diff_gauge: bool = False,
418 ) -> 'MetricsDiffer':
419 """
420 Creates a `MetricsDiffer` that fetches metrics using this client.
421 It's recommended to use this method over `metrics` to make sure
422 the tests don't affect each other.
423
424 With `diff_gauge` off, only RATE metrics are differentiated.
425 With `diff_gauge` on, GAUGE metrics are differentiated as well,
426 which may lead to nonsensical results for those.
427
428 @param path Optional full metric path
429 @param prefix Optional prefix on which the metric paths should start
430 @param labels Optional dictionary of labels that must be in the metric
431 @param diff_gauge Whether to differentiate GAUGE metrics
432
433 @code
434 async with monitor_client.metrics_diff(prefix='foo') as differ:
435 # Do something that makes the service update its metrics
436 assert differ.value_at('path-suffix', {'label'}) == 42
437 @endcode
438 """
439 return MetricsDiffer(
440 _client=self,
441 _path=path,
442 _prefix=prefix,
443 _labels=labels,
444 _diff_gauge=diff_gauge,
445 )
446
447 @_wrap_client_error
448 async def metrics(
449 self,
450 *,
451 path: typing.Optional[str] = None,
452 prefix: typing.Optional[str] = None,
453 labels: typing.Optional[typing.Dict[str, str]] = None,
454 ) -> metric_module.MetricsSnapshot:
455 """
456 Returns a dict of metric names to Metric.
457
458 @param path Optional full metric path
459 @param prefix Optional prefix on which the metric paths should start
460 @param labels Optional dictionary of labels that must be in the metric
461 """
462 return await self._client.metrics(
463 path=path,
464 prefix=prefix,
465 labels=labels,
466 )
467
468 @_wrap_client_error
470 self,
471 path: str,
472 *,
473 labels: typing.Optional[typing.Dict[str, str]] = None,
474 ) -> typing.Optional[Metric]:
475 """
476 Either return a Metric or None if there's no such metric.
477
478 @param path Full metric path
479 @param labels Optional dictionary of labels that must be in the metric
480
481 @throws AssertionError if more than one metric returned
482 """
483 return await self._client.single_metric_optional(path, labels=labels)
484
485 @_wrap_client_error
486 async def single_metric(
487 self,
488 path: str,
489 *,
490 labels: typing.Optional[typing.Dict[str, str]] = None,
491 ) -> typing.Optional[Metric]:
492 """
493 Returns the Metric.
494
495 @param path Full metric path
496 @param labels Optional dictionary of labels that must be in the metric
497
498 @throws AssertionError if more than one metric or no metric found
499 """
500 return await self._client.single_metric(path, labels=labels)
501
502 @_wrap_client_error
503 async def metrics_raw(
504 self,
505 output_format: str,
506 *,
507 path: typing.Optional[str] = None,
508 prefix: typing.Optional[str] = None,
509 labels: typing.Optional[typing.Dict[str, str]] = None,
510 ) -> typing.Dict[str, Metric]:
511 """
512 Low level function that returns metrics in a specific format.
513 Use `metrics` and `single_metric` instead if possible.
514
515 @param output_format Metric output format. See
516 server::handlers::ServerMonitor for a list of supported formats.
517 @param path Optional full metric path
518 @param prefix Optional prefix on which the metric paths should start
519 @param labels Optional dictionary of labels that must be in the metric
520 """
521 return await self._client.metrics_raw(
522 output_format=output_format,
523 path=path,
524 prefix=prefix,
525 labels=labels,
526 )
527
528 @_wrap_client_error
529 async def get_metrics(self, prefix=None):
530 """
531 @deprecated Use metrics() or single_metric() instead
532 """
533 return await self._client.get_metrics(prefix=prefix)
534
535 @_wrap_client_error
536 async def get_metric(self, metric_name):
537 """
538 @deprecated Use metrics() or single_metric() instead
539 """
540 return await self._client.get_metric(metric_name)
541
542 @_wrap_client_error
543 async def fired_alerts(self):
544 response = await self._client.get('/service/fired-alerts')
545 assert response.status == 200
546 return (await response.json())['alerts']
547
548
550 """
551 A helper class for computing metric differences.
552
553 @see ClientMonitor.metrics_diff
554 @ingroup userver_testsuite
555 """
556
557 # @cond
558 def __init__(
559 self,
560 _client: ClientMonitor,
561 _path: typing.Optional[str],
562 _prefix: typing.Optional[str],
563 _labels: typing.Optional[typing.Dict[str, str]],
564 _diff_gauge: bool,
565 ):
566 self._client = _client
567 self._path = _path
568 self._prefix = _prefix
569 self._labels = _labels
570 self._diff_gauge = _diff_gauge
571 self._baseline: typing.Optional[metric_module.MetricsSnapshot] = None
572 self._current: typing.Optional[metric_module.MetricsSnapshot] = None
573 self._diff: typing.Optional[metric_module.MetricsSnapshot] = None
574
575 # @endcond
576
577 @property
578 def baseline(self) -> metric_module.MetricsSnapshot:
579 assert self._baseline is not None
580 return self._baseline
581
582 @baseline.setter
583 def baseline(self, value: metric_module.MetricsSnapshot) -> None:
584 self._baseline = value
585 if self._current is not None:
586 self._diff = _subtract_metrics_snapshots(
587 self._current,
588 self._baseline,
589 self._diff_gauge,
590 )
591
592 @property
593 def current(self) -> metric_module.MetricsSnapshot:
594 assert self._current is not None, 'Set self.current first'
595 return self._current
596
597 @current.setter
598 def current(self, value: metric_module.MetricsSnapshot) -> None:
599 self._current = value
600 assert self._baseline is not None, 'Set self.baseline first'
601 self._diff = _subtract_metrics_snapshots(
602 self._current,
603 self._baseline,
604 self._diff_gauge,
605 )
606
607 @property
608 def diff(self) -> metric_module.MetricsSnapshot:
609 assert self._diff is not None, 'Set self.current first'
610 return self._diff
611
613 self,
614 subpath: typing.Optional[str] = None,
615 add_labels: typing.Optional[typing.Dict] = None,
616 *,
617 default: typing.Optional[float] = None,
618 ) -> metric_module.MetricValue:
619 """
620 Returns a single metric value at the specified path, prepending
621 the path provided at construction. If a dict of labels is provided,
622 does en exact match of labels, prepending the labels provided
623 at construction.
624
625 @param subpath Suffix of the metric path; the path provided
626 at construction is prepended
627 @param add_labels Labels that the metric must have in addition
628 to the labels provided at construction
629 @param default An optional default value in case the metric is missing
630 @throws AssertionError if not one metric by path
631 """
632 base_path = self._path or self._prefix
633 if base_path and subpath:
634 path = f'{base_path}.{subpath}'
635 else:
636 assert base_path or subpath, 'No path provided'
637 path = base_path or subpath or ''
638 labels: typing.Optional[dict] = None
639 if self._labels is not None or add_labels is not None:
640 labels = {**(self._labels or {}), **(add_labels or {})}
641 return self.diff.value_at(path, labels, default=default)
642
643 async def fetch(self) -> metric_module.MetricsSnapshot:
644 """
645 Fetches metric values from the service.
646 """
647 return await self._client.metrics(
648 path=self._path,
649 prefix=self._prefix,
650 labels=self._labels,
651 )
652
653 async def __aenter__(self) -> 'MetricsDiffer':
654 self._baseline = await self.fetch()
655 self._current = None
656 return self
657
658 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
659 self.current = await self.fetch()
660
661
662# @cond
663
664
665def _subtract_metrics_snapshots(
666 current: metric_module.MetricsSnapshot,
667 initial: metric_module.MetricsSnapshot,
668 diff_gauge: bool,
669) -> metric_module.MetricsSnapshot:
670 return metric_module.MetricsSnapshot({
671 path: {_subtract_metrics(path, current_metric, initial, diff_gauge) for current_metric in current_group}
672 for path, current_group in current.items()
673 })
674
675
676def _subtract_metrics(
677 path: str,
678 current_metric: metric_module.Metric,
679 initial: metric_module.MetricsSnapshot,
680 diff_gauge: bool,
681) -> metric_module.Metric:
682 initial_group = initial.get(path, None)
683 if initial_group is None:
684 return current_metric
685 initial_metric = next(
686 (x for x in initial_group if x.labels == current_metric.labels),
687 None,
688 )
689 if initial_metric is None:
690 return current_metric
691
692 return metric_module.Metric(
693 labels=current_metric.labels,
694 value=_subtract_metric_values(
695 current=current_metric,
696 initial=initial_metric,
697 diff_gauge=diff_gauge,
698 ),
699 _type=current_metric.type(),
700 )
701
702
703def _subtract_metric_values(
704 current: metric_module.Metric,
705 initial: metric_module.Metric,
706 diff_gauge: bool,
707) -> metric_module.MetricValue:
708 assert current.type() is not metric_module.MetricType.UNSPECIFIED
709 assert initial.type() is not metric_module.MetricType.UNSPECIFIED
710 assert current.type() == initial.type()
711
712 if isinstance(current.value, metric_module.Histogram):
713 assert isinstance(initial.value, metric_module.Histogram)
714 return _subtract_metric_values_hist(current=current, initial=initial)
715 else:
716 assert not isinstance(initial.value, metric_module.Histogram)
717 return _subtract_metric_values_num(
718 current=current,
719 initial=initial,
720 diff_gauge=diff_gauge,
721 )
722
723
724def _subtract_metric_values_num(
725 current: metric_module.Metric,
726 initial: metric_module.Metric,
727 diff_gauge: bool,
728) -> float:
729 current_value = typing.cast(float, current.value)
730 initial_value = typing.cast(float, initial.value)
731 should_diff = (
732 current.type() is metric_module.MetricType.RATE or initial.type() is metric_module.MetricType.RATE or diff_gauge
733 )
734 return current_value - initial_value if should_diff else current_value
735
736
737def _subtract_metric_values_hist(
738 current: metric_module.Metric,
739 initial: metric_module.Metric,
740) -> metric_module.Histogram:
741 current_value = typing.cast(metric_module.Histogram, current.value)
742 initial_value = typing.cast(metric_module.Histogram, initial.value)
743 assert current_value.bounds == initial_value.bounds
744 return metric_module.Histogram(
745 bounds=current_value.bounds,
746 buckets=[t[0] - t[1] for t in zip(current_value.buckets, initial_value.buckets)],
747 inf=current_value.inf - initial_value.inf,
748 )
749
750
751class AiohttpClient(service_client.AiohttpClient):
752 PeriodicTaskFailed = PeriodicTaskFailed
753 TestsuiteActionFailed = TestsuiteActionFailed
754 TestsuiteTaskNotFound = TestsuiteTaskNotFound
755 TestsuiteTaskConflict = TestsuiteTaskConflict
756 TestsuiteTaskFailed = TestsuiteTaskFailed
757
758 def __init__(
759 self,
760 base_url: str,
761 *,
762 config: TestsuiteClientConfig,
763 mocked_time,
764 log_capture_fixture: logcapture.CaptureServer,
765 testpoint,
766 testpoint_control,
767 cache_invalidation_state,
768 span_id_header=None,
769 api_coverage_report=None,
770 periodic_tasks_state: typing.Optional[PeriodicTasksState] = None,
771 allow_all_caches_invalidation: bool = True,
772 cache_control: typing.Optional[caches.CacheControl] = None,
773 asyncexc_check=None,
774 **kwargs,
775 ):
776 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
777 self._config = config
778 self._periodic_tasks = periodic_tasks_state
779 self._testpoint = testpoint
780 self._log_capture_fixture = log_capture_fixture
781 self._state_manager = _StateManager(
782 mocked_time=mocked_time,
783 testpoint=self._testpoint,
784 testpoint_control=testpoint_control,
785 invalidation_state=cache_invalidation_state,
786 cache_control=cache_control,
787 )
788 self._api_coverage_report = api_coverage_report
789 self._allow_all_caches_invalidation = allow_all_caches_invalidation
790 self._asyncexc_check = asyncexc_check
791
792 async def run_periodic_task(self, name):
793 response = await self._testsuite_action('run_periodic_task', name=name)
794 if not response['status']:
795 raise self.PeriodicTaskFailed(f'Periodic task {name} failed')
796
797 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
798 if not self._periodic_tasks:
799 raise ConfigurationError('No periodic_tasks_state given')
800 self._periodic_tasks.tasks_to_suspend.update(names)
801 await self._suspend_periodic_tasks()
802
803 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
804 if not self._periodic_tasks:
805 raise ConfigurationError('No periodic_tasks_state given')
806 self._periodic_tasks.tasks_to_suspend.difference_update(names)
807 await self._suspend_periodic_tasks()
808
809 async def resume_all_periodic_tasks(self) -> None:
810 if not self._periodic_tasks:
811 raise ConfigurationError('No periodic_tasks_state given')
812 self._periodic_tasks.tasks_to_suspend.clear()
813 await self._suspend_periodic_tasks()
814
815 async def write_cache_dumps(
816 self,
817 names: typing.List[str],
818 *,
819 testsuite_skip_prepare=False,
820 ) -> None:
821 await self._testsuite_action(
822 'write_cache_dumps',
823 names=names,
824 testsuite_skip_prepare=testsuite_skip_prepare,
825 )
826
827 async def read_cache_dumps(
828 self,
829 names: typing.List[str],
830 *,
831 testsuite_skip_prepare=False,
832 ) -> None:
833 await self._testsuite_action(
834 'read_cache_dumps',
835 names=names,
836 testsuite_skip_prepare=testsuite_skip_prepare,
837 )
838
839 async def run_distlock_task(self, name: str) -> None:
840 await self.run_task(f'distlock/{name}')
841
842 async def reset_metrics(self) -> None:
843 await self._testsuite_action('reset_metrics')
844
845 async def metrics_portability(
846 self,
847 *,
848 prefix: typing.Optional[str] = None,
849 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
850 return await self._testsuite_action(
851 'metrics_portability',
852 prefix=prefix,
853 )
854
855 async def list_tasks(self) -> typing.List[str]:
856 response = await self._do_testsuite_action('tasks_list')
857 async with response:
858 response.raise_for_status()
859 body = await response.json(content_type=None)
860 return body['tasks']
861
862 async def run_task(self, name: str) -> None:
863 response = await self._do_testsuite_action(
864 'task_run',
865 json={'name': name},
866 )
867 await _task_check_response(name, response)
868
869 @contextlib.asynccontextmanager
870 async def spawn_task(self, name: str):
871 task_id = await self._task_spawn(name)
872 try:
873 yield
874 finally:
875 await self._task_stop_spawned(task_id)
876
877 async def _task_spawn(self, name: str) -> str:
878 response = await self._do_testsuite_action(
879 'task_spawn',
880 json={'name': name},
881 )
882 data = await _task_check_response(name, response)
883 return data['task_id']
884
885 async def _task_stop_spawned(self, task_id: str) -> None:
886 response = await self._do_testsuite_action(
887 'task_stop',
888 json={'task_id': task_id},
889 )
890 await _task_check_response(task_id, response)
891
892 async def http_allowed_urls_extra(
893 self,
894 http_allowed_urls_extra: typing.List[str],
895 ) -> None:
896 await self._do_testsuite_action(
897 'http_allowed_urls_extra',
898 json={'allowed_urls_extra': http_allowed_urls_extra},
899 testsuite_skip_prepare=True,
900 )
901
902 @contextlib.asynccontextmanager
903 async def capture_logs(
904 self,
905 *,
906 log_level: str = 'DEBUG',
907 testsuite_skip_prepare: bool = False,
908 ):
909 async with self._log_capture_fixture.capture(
910 log_level=logcapture.LogLevel.from_string(log_level),
911 ) as capture:
912 logger.debug('Starting logcapture')
913 await self._testsuite_action(
914 'log_capture',
915 log_level=log_level,
916 socket_logging_duplication=True,
917 testsuite_skip_prepare=testsuite_skip_prepare,
918 )
919
920 try:
921 await self._log_capture_fixture.wait_for_client()
922 yield capture
923 finally:
924 await self._testsuite_action(
925 'log_capture',
926 log_level=self._log_capture_fixture.default_log_level.name,
927 socket_logging_duplication=False,
928 testsuite_skip_prepare=testsuite_skip_prepare,
929 )
930
931 async def log_flush(self, logger_name: typing.Optional[str] = None):
932 await self._testsuite_action(
933 'log_flush',
934 logger_name=logger_name,
935 testsuite_skip_prepare=True,
936 )
937
938 async def invalidate_caches(
939 self,
940 *,
941 clean_update: bool = True,
942 cache_names: typing.Optional[typing.List[str]] = None,
943 testsuite_skip_prepare: bool = False,
944 ) -> None:
945 if cache_names is None and clean_update:
946 if self._allow_all_caches_invalidation:
947 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
948 else:
949 __tracebackhide__ = True
950 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
951
952 if testsuite_skip_prepare:
953 await self._tests_control({
954 'invalidate_caches': {
955 'update_type': ('full' if clean_update else 'incremental'),
956 **({'names': cache_names} if cache_names else {}),
957 },
958 })
959 else:
960 await self.tests_control(
961 invalidate_caches=True,
962 clean_update=clean_update,
963 cache_names=cache_names,
964 )
965
966 async def tests_control(
967 self,
968 *,
969 invalidate_caches: bool = True,
970 clean_update: bool = True,
971 cache_names: typing.Optional[typing.List[str]] = None,
972 http_allowed_urls_extra: typing.Optional[typing.List[str]] = None,
973 ) -> typing.Dict[str, typing.Any]:
974 body: typing.Dict[str, typing.Any] = self._state_manager.get_pending_update()
975
976 if 'invalidate_caches' in body and invalidate_caches:
977 if not clean_update or cache_names:
978 logger.warning(
979 'Manual cache invalidation leads to indirect initial full cache invalidation',
980 )
981 await self._prepare()
982 body = {}
983
984 if invalidate_caches:
985 body['invalidate_caches'] = {
986 'update_type': ('full' if clean_update else 'incremental'),
987 }
988 if cache_names:
989 body['invalidate_caches']['names'] = cache_names
990
991 if http_allowed_urls_extra is not None:
992 await self.http_allowed_urls_extra(http_allowed_urls_extra)
993
994 return await self._tests_control(body)
995
996 async def update_server_state(self) -> None:
997 await self._prepare()
998
999 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
1000 if not self._testpoint:
1001 return
1002 if no_auto_cache_cleanup:
1003 await self._tests_control({
1004 'testpoints': sorted(self._testpoint.keys()),
1005 })
1006 else:
1007 await self.update_server_state()
1008
1009 async def get_dynamic_config_defaults(
1010 self,
1011 ) -> typing.Dict[str, typing.Any]:
1012 return await self._testsuite_action(
1013 'get_dynamic_config_defaults',
1014 testsuite_skip_prepare=True,
1015 )
1016
1017 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
1018 with self._state_manager.updating_state(body):
1019 async with await self._do_testsuite_action(
1020 'control',
1021 json=body,
1022 testsuite_skip_prepare=True,
1023 ) as response:
1024 if response.status == 404:
1025 raise ConfigurationError(
1026 'It seems that testsuite support is not enabled for your service',
1027 )
1028 response.raise_for_status()
1029 return await response.json(content_type=None)
1030
1031 async def _suspend_periodic_tasks(self):
1032 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1033 await self._testsuite_action(
1034 'suspend_periodic_tasks',
1035 names=sorted(self._periodic_tasks.tasks_to_suspend),
1036 )
1037 self._periodic_tasks.suspended_tasks = set(
1038 self._periodic_tasks.tasks_to_suspend,
1039 )
1040
1041 def _do_testsuite_action(self, action, **kwargs):
1042 if not self._config.testsuite_action_path:
1043 raise ConfigurationError(
1044 'tests-control component is not properly configured',
1045 )
1046 path = self._config.testsuite_action_path.format(action=action)
1047 return self.post(path, **kwargs)
1048
1049 async def _testsuite_action(
1050 self,
1051 action,
1052 *,
1053 testsuite_skip_prepare=False,
1054 **kwargs,
1055 ):
1056 async with await self._do_testsuite_action(
1057 action,
1058 json=kwargs,
1059 testsuite_skip_prepare=testsuite_skip_prepare,
1060 ) as response:
1061 if response.status == 500:
1062 raise TestsuiteActionFailed
1063 response.raise_for_status()
1064 return await response.json(content_type=None)
1065
1066 async def _prepare(self) -> None:
1067 with self._state_manager.cache_control_update() as pending_update:
1068 if pending_update:
1069 await self._tests_control(pending_update)
1070
1071 async def _request( # pylint: disable=arguments-differ
1072 self,
1073 http_method: str,
1074 path: str,
1075 headers: typing.Optional[typing.Dict[str, str]] = None,
1076 bearer: typing.Optional[str] = None,
1077 x_real_ip: typing.Optional[str] = None,
1078 *,
1079 testsuite_skip_prepare: bool = False,
1080 **kwargs,
1081 ) -> aiohttp.ClientResponse:
1082 if self._asyncexc_check:
1083 # Check for pending background exceptions before call.
1084 self._asyncexc_check()
1085
1086 if not testsuite_skip_prepare:
1087 await self._prepare()
1088
1089 response = await super()._request(
1090 http_method,
1091 path,
1092 headers,
1093 bearer,
1094 x_real_ip,
1095 **kwargs,
1096 )
1097 if self._api_coverage_report:
1098 self._api_coverage_report.update_usage_stat(
1099 path,
1100 http_method,
1101 response.status,
1102 response.content_type,
1103 )
1104
1105 if self._asyncexc_check:
1106 # Check for pending background exceptions after call.
1107 self._asyncexc_check()
1108
1109 return response
1110
1111
1112# @endcond
1113
1114
1116 """
1117 Asyncio userver client, typically retrieved from
1118 @ref service_client "plugins.service_client.service_client"
1119 fixture.
1120
1121 Compatible with werkzeug interface.
1122
1123 @ingroup userver_testsuite
1124 """
1125
1126 PeriodicTaskFailed = PeriodicTaskFailed
1127 TestsuiteActionFailed = TestsuiteActionFailed
1128 TestsuiteTaskNotFound = TestsuiteTaskNotFound
1129 TestsuiteTaskConflict = TestsuiteTaskConflict
1130 TestsuiteTaskFailed = TestsuiteTaskFailed
1131
1132 def _wrap_client_response(
1133 self,
1134 response: aiohttp.ClientResponse,
1135 ) -> typing.Awaitable[http.ClientResponse]:
1136 return http.wrap_client_response(
1137 response,
1138 json_loads=approx.json_loads,
1139 )
1140
1141 @_wrap_client_error
1142 async def run_periodic_task(self, name):
1143 await self._client.run_periodic_task(name)
1144
1145 @_wrap_client_error
1146 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
1147 await self._client.suspend_periodic_tasks(names)
1148
1149 @_wrap_client_error
1150 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
1151 await self._client.resume_periodic_tasks(names)
1152
1153 @_wrap_client_error
1154 async def resume_all_periodic_tasks(self) -> None:
1155 await self._client.resume_all_periodic_tasks()
1156
1157 @_wrap_client_error
1158 async def write_cache_dumps(
1159 self,
1160 names: typing.List[str],
1161 *,
1162 testsuite_skip_prepare=False,
1163 ) -> None:
1164 await self._client.write_cache_dumps(
1165 names=names,
1166 testsuite_skip_prepare=testsuite_skip_prepare,
1167 )
1168
1169 @_wrap_client_error
1170 async def read_cache_dumps(
1171 self,
1172 names: typing.List[str],
1173 *,
1174 testsuite_skip_prepare=False,
1175 ) -> None:
1176 await self._client.read_cache_dumps(
1177 names=names,
1178 testsuite_skip_prepare=testsuite_skip_prepare,
1179 )
1180
1181 async def run_task(self, name: str) -> None:
1182 await self._client.run_task(name)
1183
1184 async def run_distlock_task(self, name: str) -> None:
1185 await self._client.run_distlock_task(name)
1186
1187 async def reset_metrics(self) -> None:
1188 """
1189 Calls `ResetMetric(metric);` for each metric that has such C++ function
1190 """
1191 await self._client.reset_metrics()
1192
1194 self,
1195 *,
1196 prefix: typing.Optional[str] = None,
1197 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
1198 """
1199 Reports metrics related issues that could be encountered on
1200 different monitoring systems.
1201
1202 @sa @ref utils::statistics::GetPortabilityWarnings
1203 """
1204 return await self._client.metrics_portability(prefix=prefix)
1205
1206 def list_tasks(self) -> typing.List[str]:
1207 return self._client.list_tasks()
1208
1209 def spawn_task(self, name: str):
1210 return self._client.spawn_task(name)
1211
1213 self,
1214 *,
1215 log_level: str = 'DEBUG',
1216 testsuite_skip_prepare: bool = False,
1217 ):
1218 """
1219 Captures logs from the service.
1220
1221 @param log_level Do not capture logs below this level.
1222 @param testsuite_skip_prepare An advanced parameter to skip auto-`update_server_state`.
1223
1224 @see @ref testsuite_logs_capture
1225 """
1226 return self._client.capture_logs(
1227 log_level=log_level,
1228 testsuite_skip_prepare=testsuite_skip_prepare,
1229 )
1230
1231 def log_flush(self, logger_name: typing.Optional[str] = None):
1232 """
1233 Flush service logs.
1234 """
1235 return self._client.log_flush(logger_name=logger_name)
1236
1237 @_wrap_client_error
1239 self,
1240 *,
1241 clean_update: bool = True,
1242 cache_names: typing.Optional[typing.List[str]] = None,
1243 testsuite_skip_prepare: bool = False,
1244 ) -> None:
1245 """
1246 Send request to service to update caches.
1247
1248 @param clean_update if False, service will do a faster incremental
1249 update of caches whenever possible.
1250 @param cache_names which caches specifically should be updated;
1251 update all if None.
1252 @param testsuite_skip_prepare if False, service will automatically do
1253 update_server_state().
1254 """
1255 __tracebackhide__ = True
1256 await self._client.invalidate_caches(
1257 clean_update=clean_update,
1258 cache_names=cache_names,
1259 testsuite_skip_prepare=testsuite_skip_prepare,
1260 )
1261
1262 @_wrap_client_error
1263 async def tests_control(
1264 self,
1265 invalidate_caches: bool = True,
1266 clean_update: bool = True,
1267 cache_names: typing.Optional[typing.List[str]] = None,
1268 http_allowed_urls_extra: typing.Optional[typing.List[str]] = None,
1269 ) -> typing.Dict[str, typing.Any]:
1270 return await self._client.tests_control(
1271 invalidate_caches=invalidate_caches,
1272 clean_update=clean_update,
1273 cache_names=cache_names,
1274 http_allowed_urls_extra=http_allowed_urls_extra,
1275 )
1276
1277 @_wrap_client_error
1278 async def update_server_state(self) -> None:
1279 """
1280 Update service-side state through http call to 'tests/control':
1281 - clear dirty (from other tests) caches
1282 - set service-side mocked time,
1283 - resume / suspend periodic tasks
1284 - enable testpoints
1285 If service is up-to-date, does nothing.
1286 """
1287 await self._client.update_server_state()
1288
1289 @_wrap_client_error
1290 async def enable_testpoints(self, no_auto_cache_cleanup: bool = False) -> None:
1291 """
1292 Send list of handled testpoint pats to service. For these paths service
1293 will no more skip http calls from TESTPOINT(...) macro.
1294
1295 @param no_auto_cache_cleanup prevent automatic cache cleanup.
1296 When calling service client first time in scope of current test, client
1297 makes additional http call to `tests/control` to update caches, to get
1298 rid of data from previous test.
1299 """
1300 await self._client.enable_testpoints(no_auto_cache_cleanup=no_auto_cache_cleanup)
1301
1302 @_wrap_client_error
1303 async def get_dynamic_config_defaults(
1304 self,
1305 ) -> typing.Dict[str, typing.Any]:
1306 return await self._client.get_dynamic_config_defaults()
1307
1308
1309@dataclasses.dataclass
1311 """Reflects the (supposed) current service state."""
1312
1313 invalidation_state: caches.InvalidationState
1314 now: typing.Optional[str] = _UNKNOWN_STATE
1315 testpoints: typing.FrozenSet[str] = frozenset([_UNKNOWN_STATE])
1316
1317
1319 """
1320 Used for computing the requests that we need to automatically align
1321 the service state with the test fixtures state.
1322 """
1323
1324 def __init__(
1325 self,
1326 *,
1327 mocked_time,
1328 testpoint,
1329 testpoint_control,
1330 invalidation_state: caches.InvalidationState,
1331 cache_control: typing.Optional[caches.CacheControl],
1332 ):
1333 self._state = _State(
1334 invalidation_state=copy.deepcopy(invalidation_state),
1335 )
1336 self._mocked_time = mocked_time
1337 self._testpoint = testpoint
1338 self._testpoint_control = testpoint_control
1339 self._invalidation_state = invalidation_state
1340 self._cache_control = cache_control
1341
1342 @contextlib.contextmanager
1343 def updating_state(self, body: typing.Dict[str, typing.Any]):
1344 """
1345 Whenever `tests_control` handler is invoked
1346 (by the client itself during `prepare` or manually by the user),
1347 we need to synchronize `_state` with the (supposed) service state.
1348 The state update is decoded from the request body.
1349 """
1350 saved_state = copy.deepcopy(self._state)
1351 try:
1352 self._update_state(body)
1353 self._apply_new_state()
1354 yield
1355 except Exception: # noqa
1356 self._state = saved_state
1357 self._apply_new_state()
1358 raise
1359
1360 def get_pending_update(self) -> typing.Dict[str, typing.Any]:
1361 """
1362 Compose the body of the `tests_control` request required to completely
1363 synchronize the service state with the state of test fixtures.
1364 """
1365 body: typing.Dict[str, typing.Any] = {}
1366
1367 if self._invalidation_state.has_caches_to_update:
1368 body['invalidate_caches'] = {'update_type': 'full'}
1369 if not self._invalidation_state.should_update_all_caches:
1370 body['invalidate_caches']['names'] = list(
1371 self._invalidation_state.caches_to_update,
1372 )
1373
1374 desired_testpoints = self._testpoint.keys()
1375 if self._state.testpoints != frozenset(desired_testpoints):
1376 body['testpoints'] = sorted(desired_testpoints)
1377
1378 desired_now = self._get_desired_now()
1379 if self._state.now != desired_now:
1380 body['mock_now'] = desired_now
1381
1382 return body
1383
1384 @contextlib.contextmanager
1385 def cache_control_update(self) -> typing.ContextManager[typing.Dict]:
1386 pending_update = self.get_pending_update()
1387 invalidate_caches = pending_update.get('invalidate_caches')
1388 if not invalidate_caches or not self._cache_control:
1389 yield pending_update
1390 else:
1391 cache_names = invalidate_caches.get('names')
1392 staged, actions = self._cache_control.query_caches(cache_names)
1393 self._apply_cache_control_actions(invalidate_caches, actions)
1394 yield pending_update
1395 self._cache_control.commit_staged(staged)
1396
1397 @staticmethod
1398 def _apply_cache_control_actions(
1399 invalidate_caches: typing.Dict,
1400 actions: typing.List[typing.Tuple[str, caches.CacheControlAction]],
1401 ) -> None:
1402 cache_names = invalidate_caches.get('names')
1403 exclude_names = invalidate_caches.setdefault('exclude_names', [])
1404 force_incremental_names = invalidate_caches.setdefault(
1405 'force_incremental_names',
1406 [],
1407 )
1408 for cache_name, action in actions:
1409 if action == caches.CacheControlAction.INCREMENTAL:
1410 force_incremental_names.append(cache_name)
1411 elif action == caches.CacheControlAction.EXCLUDE:
1412 if cache_names is not None:
1413 cache_names.remove(cache_name)
1414 else:
1415 exclude_names.append(cache_name)
1416
1417 def _update_state(self, body: typing.Dict[str, typing.Any]) -> None:
1418 body_invalidate_caches = body.get('invalidate_caches', {})
1419 update_type = body_invalidate_caches.get('update_type', 'full')
1420 body_cache_names = body_invalidate_caches.get('names', None)
1421 # An incremental update is considered insufficient to bring a cache
1422 # to a known state.
1423 if body_invalidate_caches and update_type == 'full':
1424 if body_cache_names is None:
1425 self._state.invalidation_state.on_all_caches_updated()
1426 else:
1427 self._state.invalidation_state.on_caches_updated(
1428 body_cache_names,
1429 )
1430
1431 if 'mock_now' in body:
1432 self._state.now = body['mock_now']
1433
1434 testpoints: typing.Optional[typing.List[str]] = body.get(
1435 'testpoints',
1436 None,
1437 )
1438 if testpoints is not None:
1439 self._state.testpoints = frozenset(testpoints)
1440
1442 """Apply new state to related components."""
1443 self._testpoint_control.enabled_testpoints = self._state.testpoints
1444 self._invalidation_state.assign_copy(self._state.invalidation_state)
1445
1446 def _get_desired_now(self) -> typing.Optional[str]:
1447 if self._mocked_time.is_enabled:
1448 return utils.timestring(self._mocked_time.now())
1449 return None
1450
1451
1452async def _task_check_response(name: str, response) -> dict:
1453 async with response:
1454 if response.status == 404:
1455 raise TestsuiteTaskNotFound(f'Testsuite task {name!r} not found')
1456 if response.status == 409:
1457 raise TestsuiteTaskConflict(f'Testsuite task {name!r} conflict')
1458 assert response.status == 200
1459 data = await response.json()
1460 if not data.get('status', True):
1461 raise TestsuiteTaskFailed(name, data['reason'])
1462 return data