userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Python module that provides clients for functional tests with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9# pylint: disable=too-many-lines
10
11import contextlib
12import copy
13import dataclasses
14import json
15import logging
16import typing
17import warnings
18
19import aiohttp
20
21from testsuite import annotations
22from testsuite import utils
23from testsuite.daemons import service_client
24from testsuite.utils import approx
25from testsuite.utils import http
26
27import pytest_userver.metrics as metric_module # pylint: disable=import-error
28from pytest_userver.plugins import caches
29
30# @cond
31logger = logging.getLogger(__name__)
32# @endcond
33
34_UNKNOWN_STATE = '__UNKNOWN__'
35
36CACHE_INVALIDATION_MESSAGE = (
37 'Direct cache invalidation is deprecated.\n'
38 '\n'
39 ' - Use client.update_server_state() to synchronize service state\n'
40 ' - Explicitly pass cache names to invalidate, e.g.: '
41 'invalidate_caches(cache_names=[...]).'
42)
43
44
45class BaseError(Exception):
46 """Base class for exceptions of this module."""
47
48
50 pass
51
52
54 pass
55
56
66 pass
67
68
70 pass
71
72
74 def __init__(self):
75 self.suspended_tasks: typing.Set[str] = set()
76 self.tasks_to_suspend: typing.Set[str] = set()
77
78
80 def __init__(self, name, reason):
81 self.name = name
82 self.reason = reason
83 super().__init__(f'Testsuite task {name!r} failed: {reason}')
84
85
86@dataclasses.dataclass(frozen=True)
88 testsuite_action_path: typing.Optional[str] = None
89 server_monitor_path: typing.Optional[str] = None
90
91
92Metric = metric_module.Metric
93
94
96 """
97 Base asyncio userver client that implements HTTP requests to service.
98
99 Compatible with werkzeug interface.
100
101 @ingroup userver_testsuite
102 """
103
104 def __init__(self, client):
105 self._client = client
106
107 async def post(
108 self,
109 path: str,
110 # pylint: disable=redefined-outer-name
111 json: annotations.JsonAnyOptional = None,
112 data: typing.Any = None,
113 params: typing.Optional[typing.Dict[str, str]] = None,
114 bearer: typing.Optional[str] = None,
115 x_real_ip: typing.Optional[str] = None,
116 headers: typing.Optional[typing.Dict[str, str]] = None,
117 **kwargs,
118 ) -> http.ClientResponse:
119 """
120 Make a HTTP POST request
121 """
122 response = await self._client.post(
123 path,
124 json=json,
125 data=data,
126 params=params,
127 headers=headers,
128 bearer=bearer,
129 x_real_ip=x_real_ip,
130 **kwargs,
131 )
132 return await self._wrap_client_response(response)
133
134 async def put(
135 self,
136 path,
137 # pylint: disable=redefined-outer-name
138 json: annotations.JsonAnyOptional = None,
139 data: typing.Any = None,
140 params: typing.Optional[typing.Dict[str, str]] = None,
141 bearer: typing.Optional[str] = None,
142 x_real_ip: typing.Optional[str] = None,
143 headers: typing.Optional[typing.Dict[str, str]] = None,
144 **kwargs,
145 ) -> http.ClientResponse:
146 """
147 Make a HTTP PUT request
148 """
149 response = await self._client.put(
150 path,
151 json=json,
152 data=data,
153 params=params,
154 headers=headers,
155 bearer=bearer,
156 x_real_ip=x_real_ip,
157 **kwargs,
158 )
159 return await self._wrap_client_response(response)
160
161 async def patch(
162 self,
163 path,
164 # pylint: disable=redefined-outer-name
165 json: annotations.JsonAnyOptional = None,
166 data: typing.Any = None,
167 params: typing.Optional[typing.Dict[str, str]] = None,
168 bearer: typing.Optional[str] = None,
169 x_real_ip: typing.Optional[str] = None,
170 headers: typing.Optional[typing.Dict[str, str]] = None,
171 **kwargs,
172 ) -> http.ClientResponse:
173 """
174 Make a HTTP PATCH request
175 """
176 response = await self._client.patch(
177 path,
178 json=json,
179 data=data,
180 params=params,
181 headers=headers,
182 bearer=bearer,
183 x_real_ip=x_real_ip,
184 **kwargs,
185 )
186 return await self._wrap_client_response(response)
187
188 async def get(
189 self,
190 path: str,
191 headers: typing.Optional[typing.Dict[str, str]] = None,
192 bearer: typing.Optional[str] = None,
193 x_real_ip: typing.Optional[str] = None,
194 **kwargs,
195 ) -> http.ClientResponse:
196 """
197 Make a HTTP GET request
198 """
199 response = await self._client.get(
200 path,
201 headers=headers,
202 bearer=bearer,
203 x_real_ip=x_real_ip,
204 **kwargs,
205 )
206 return await self._wrap_client_response(response)
207
208 async def delete(
209 self,
210 path: str,
211 headers: typing.Optional[typing.Dict[str, str]] = None,
212 bearer: typing.Optional[str] = None,
213 x_real_ip: typing.Optional[str] = None,
214 **kwargs,
215 ) -> http.ClientResponse:
216 """
217 Make a HTTP DELETE request
218 """
219 response = await self._client.delete(
220 path,
221 headers=headers,
222 bearer=bearer,
223 x_real_ip=x_real_ip,
224 **kwargs,
225 )
226 return await self._wrap_client_response(response)
227
228 async def options(
229 self,
230 path: str,
231 headers: typing.Optional[typing.Dict[str, str]] = None,
232 bearer: typing.Optional[str] = None,
233 x_real_ip: typing.Optional[str] = None,
234 **kwargs,
235 ) -> http.ClientResponse:
236 """
237 Make a HTTP OPTIONS request
238 """
239 response = await self._client.options(
240 path,
241 headers=headers,
242 bearer=bearer,
243 x_real_ip=x_real_ip,
244 **kwargs,
245 )
246 return await self._wrap_client_response(response)
247
248 async def request(
249 self,
250 http_method: str,
251 path: str,
252 **kwargs,
253 ) -> http.ClientResponse:
254 """
255 Make a HTTP request with the specified method
256 """
257 response = await self._client.request(http_method, path, **kwargs)
258 return await self._wrap_client_response(response)
259
260 @property
262 """
263 @deprecated Use pytest_userver.client.Client directly instead.
264 """
265 return self._client
266
267 def _wrap_client_response(
268 self,
269 response: aiohttp.ClientResponse,
270 ) -> typing.Awaitable[http.ClientResponse]:
271 return http.wrap_client_response(response)
272
273
274# @cond
275
276
277def _wrap_client_error(func):
278 async def _wrapper(*args, **kwargs):
279 try:
280 return await func(*args, **kwargs)
281 except aiohttp.client_exceptions.ClientResponseError as exc:
282 raise http.HttpResponseError(
283 url=exc.request_info.url,
284 status=exc.status,
285 )
286
287 return _wrapper
288
289
290class AiohttpClientMonitor(service_client.AiohttpClient):
291 _config: TestsuiteClientConfig
292
293 def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
294 super().__init__(base_url, **kwargs)
295 self._config = config
296
297 async def get_metrics(self, prefix=None):
298 if not self._config.server_monitor_path:
299 raise ConfigurationError(
300 'handler-server-monitor component is not configured',
301 )
302 params = {'format': 'internal'}
303 if prefix is not None:
304 params['prefix'] = prefix
305 response = await self.get(
306 self._config.server_monitor_path,
307 params=params,
308 )
309 async with response:
310 response.raise_for_status()
311 return await response.json(content_type=None)
312
313 async def get_metric(self, metric_name):
314 metrics = await self.get_metrics(metric_name)
315 assert metric_name in metrics, (
316 f'No metric with name {metric_name!r}. Use "single_metric" function instead of "get_metric"'
317 )
318 return metrics[metric_name]
319
320 async def metrics_raw(
321 self,
322 output_format,
323 *,
324 path: str = None,
325 prefix: str = None,
326 labels: typing.Optional[typing.Dict[str, str]] = None,
327 ) -> str:
328 if not self._config.server_monitor_path:
329 raise ConfigurationError(
330 'handler-server-monitor component is not configured',
331 )
332
333 params = {'format': output_format}
334 if prefix:
335 params['prefix'] = prefix
336
337 if path:
338 params['path'] = path
339
340 if labels:
341 params['labels'] = json.dumps(labels)
342
343 response = await self.get(
344 self._config.server_monitor_path,
345 params=params,
346 )
347 async with response:
348 response.raise_for_status()
349 return await response.text()
350
351 async def metrics(
352 self,
353 *,
354 path: str = None,
355 prefix: str = None,
356 labels: typing.Optional[typing.Dict[str, str]] = None,
357 ) -> metric_module.MetricsSnapshot:
358 response = await self.metrics_raw(
359 output_format='json',
360 path=path,
361 prefix=prefix,
362 labels=labels,
363 )
364 return metric_module.MetricsSnapshot.from_json(str(response))
365
366 async def single_metric_optional(
367 self,
368 path: str,
369 *,
370 labels: typing.Optional[typing.Dict[str, str]] = None,
371 ) -> typing.Optional[Metric]:
372 response = await self.metrics(path=path, labels=labels)
373 metrics_list = response.get(path, [])
374
375 assert len(metrics_list) <= 1, (f'More than one metric found for path {path} and labels {labels}: {response}',)
376
377 if not metrics_list:
378 return None
379
380 return next(iter(metrics_list))
381
382 async def single_metric(
383 self,
384 path: str,
385 *,
386 labels: typing.Optional[typing.Dict[str, str]] = None,
387 ) -> Metric:
388 value = await self.single_metric_optional(path, labels=labels)
389 assert value is not None, (f'No metric was found for path {path} and labels {labels}',)
390 return value
391
392
393# @endcond
394
395
397 """
398 Asyncio userver client for monitor listeners, typically retrieved from
399 plugins.service_client.monitor_client fixture.
400
401 Compatible with werkzeug interface.
402
403 @ingroup userver_testsuite
404 """
405
407 self,
408 *,
409 path: typing.Optional[str] = None,
410 prefix: typing.Optional[str] = None,
411 labels: typing.Optional[typing.Dict[str, str]] = None,
412 diff_gauge: bool = False,
413 ) -> 'MetricsDiffer':
414 """
415 Creates a `MetricsDiffer` that fetches metrics using this client.
416 It's recommended to use this method over `metrics` to make sure
417 the tests don't affect each other.
418
419 With `diff_gauge` off, only RATE metrics are differentiated.
420 With `diff_gauge` on, GAUGE metrics are differentiated as well,
421 which may lead to nonsensical results for those.
422
423 @param path Optional full metric path
424 @param prefix Optional prefix on which the metric paths should start
425 @param labels Optional dictionary of labels that must be in the metric
426 @param diff_gauge Whether to differentiate GAUGE metrics
427
428 @code
429 async with monitor_client.metrics_diff(prefix='foo') as differ:
430 # Do something that makes the service update its metrics
431 assert differ.value_at('path-suffix', {'label'}) == 42
432 @endcode
433 """
434 return MetricsDiffer(
435 _client=self,
436 _path=path,
437 _prefix=prefix,
438 _labels=labels,
439 _diff_gauge=diff_gauge,
440 )
441
442 @_wrap_client_error
443 async def metrics(
444 self,
445 *,
446 path: typing.Optional[str] = None,
447 prefix: typing.Optional[str] = None,
448 labels: typing.Optional[typing.Dict[str, str]] = None,
449 ) -> metric_module.MetricsSnapshot:
450 """
451 Returns a dict of metric names to Metric.
452
453 @param path Optional full metric path
454 @param prefix Optional prefix on which the metric paths should start
455 @param labels Optional dictionary of labels that must be in the metric
456 """
457 return await self._client.metrics(
458 path=path,
459 prefix=prefix,
460 labels=labels,
461 )
462
463 @_wrap_client_error
465 self,
466 path: str,
467 *,
468 labels: typing.Optional[typing.Dict[str, str]] = None,
469 ) -> typing.Optional[Metric]:
470 """
471 Either return a Metric or None if there's no such metric.
472
473 @param path Full metric path
474 @param labels Optional dictionary of labels that must be in the metric
475
476 @throws AssertionError if more than one metric returned
477 """
478 return await self._client.single_metric_optional(path, labels=labels)
479
480 @_wrap_client_error
481 async def single_metric(
482 self,
483 path: str,
484 *,
485 labels: typing.Optional[typing.Dict[str, str]] = None,
486 ) -> typing.Optional[Metric]:
487 """
488 Returns the Metric.
489
490 @param path Full metric path
491 @param labels Optional dictionary of labels that must be in the metric
492
493 @throws AssertionError if more than one metric or no metric found
494 """
495 return await self._client.single_metric(path, labels=labels)
496
497 @_wrap_client_error
498 async def metrics_raw(
499 self,
500 output_format: str,
501 *,
502 path: typing.Optional[str] = None,
503 prefix: typing.Optional[str] = None,
504 labels: typing.Optional[typing.Dict[str, str]] = None,
505 ) -> typing.Dict[str, Metric]:
506 """
507 Low level function that returns metrics in a specific format.
508 Use `metrics` and `single_metric` instead if possible.
509
510 @param output_format Metric output format. See
511 server::handlers::ServerMonitor for a list of supported formats.
512 @param path Optional full metric path
513 @param prefix Optional prefix on which the metric paths should start
514 @param labels Optional dictionary of labels that must be in the metric
515 """
516 return await self._client.metrics_raw(
517 output_format=output_format,
518 path=path,
519 prefix=prefix,
520 labels=labels,
521 )
522
523 @_wrap_client_error
524 async def get_metrics(self, prefix=None):
525 """
526 @deprecated Use metrics() or single_metric() instead
527 """
528 return await self._client.get_metrics(prefix=prefix)
529
530 @_wrap_client_error
531 async def get_metric(self, metric_name):
532 """
533 @deprecated Use metrics() or single_metric() instead
534 """
535 return await self._client.get_metric(metric_name)
536
537 @_wrap_client_error
538 async def fired_alerts(self):
539 response = await self._client.get('/service/fired-alerts')
540 assert response.status == 200
541 return (await response.json())['alerts']
542
543
545 """
546 A helper class for computing metric differences.
547
548 @see ClientMonitor.metrics_diff
549 @ingroup userver_testsuite
550 """
551
552 # @cond
553 def __init__(
554 self,
555 _client: ClientMonitor,
556 _path: typing.Optional[str],
557 _prefix: typing.Optional[str],
558 _labels: typing.Optional[typing.Dict[str, str]],
559 _diff_gauge: bool,
560 ):
561 self._client = _client
562 self._path = _path
563 self._prefix = _prefix
564 self._labels = _labels
565 self._diff_gauge = _diff_gauge
566 self._baseline: typing.Optional[metric_module.MetricsSnapshot] = None
567 self._current: typing.Optional[metric_module.MetricsSnapshot] = None
568 self._diff: typing.Optional[metric_module.MetricsSnapshot] = None
569
570 # @endcond
571
572 @property
573 def baseline(self) -> metric_module.MetricsSnapshot:
574 assert self._baseline is not None
575 return self._baseline
576
577 @baseline.setter
578 def baseline(self, value: metric_module.MetricsSnapshot) -> None:
579 self._baseline = value
580 if self._current is not None:
581 self._diff = _subtract_metrics_snapshots(
582 self._current,
583 self._baseline,
584 self._diff_gauge,
585 )
586
587 @property
588 def current(self) -> metric_module.MetricsSnapshot:
589 assert self._current is not None, 'Set self.current first'
590 return self._current
591
592 @current.setter
593 def current(self, value: metric_module.MetricsSnapshot) -> None:
594 self._current = value
595 assert self._baseline is not None, 'Set self.baseline first'
596 self._diff = _subtract_metrics_snapshots(
597 self._current,
598 self._baseline,
599 self._diff_gauge,
600 )
601
602 @property
603 def diff(self) -> metric_module.MetricsSnapshot:
604 assert self._diff is not None, 'Set self.current first'
605 return self._diff
606
608 self,
609 subpath: typing.Optional[str] = None,
610 add_labels: typing.Optional[typing.Dict] = None,
611 *,
612 default: typing.Optional[float] = None,
613 ) -> metric_module.MetricValue:
614 """
615 Returns a single metric value at the specified path, prepending
616 the path provided at construction. If a dict of labels is provided,
617 does en exact match of labels, prepending the labels provided
618 at construction.
619
620 @param subpath Suffix of the metric path; the path provided
621 at construction is prepended
622 @param add_labels Labels that the metric must have in addition
623 to the labels provided at construction
624 @param default An optional default value in case the metric is missing
625 @throws AssertionError if not one metric by path
626 """
627 base_path = self._path or self._prefix
628 if base_path and subpath:
629 path = f'{base_path}.{subpath}'
630 else:
631 assert base_path or subpath, 'No path provided'
632 path = base_path or subpath or ''
633 labels: typing.Optional[dict] = None
634 if self._labels is not None or add_labels is not None:
635 labels = {**(self._labels or {}), **(add_labels or {})}
636 return self.diff.value_at(path, labels, default=default)
637
638 async def fetch(self) -> metric_module.MetricsSnapshot:
639 """
640 Fetches metric values from the service.
641 """
642 return await self._client.metrics(
643 path=self._path,
644 prefix=self._prefix,
645 labels=self._labels,
646 )
647
648 async def __aenter__(self) -> 'MetricsDiffer':
649 self._baseline = await self.fetch()
650 self._current = None
651 return self
652
653 async def __aexit__(self, exc_type, exc, exc_tb) -> None:
654 self.currentcurrentcurrent = await self.fetch()
655
656
657# @cond
658
659
660def _subtract_metrics_snapshots(
661 current: metric_module.MetricsSnapshot,
662 initial: metric_module.MetricsSnapshot,
663 diff_gauge: bool,
664) -> metric_module.MetricsSnapshot:
665 return metric_module.MetricsSnapshot({
666 path: {_subtract_metrics(path, current_metric, initial, diff_gauge) for current_metric in current_group}
667 for path, current_group in current.items()
668 })
669
670
671def _subtract_metrics(
672 path: str,
673 current_metric: metric_module.Metric,
674 initial: metric_module.MetricsSnapshot,
675 diff_gauge: bool,
676) -> metric_module.Metric:
677 initial_group = initial.get(path, None)
678 if initial_group is None:
679 return current_metric
680 initial_metric = next(
681 (x for x in initial_group if x.labels == current_metric.labels),
682 None,
683 )
684 if initial_metric is None:
685 return current_metric
686
687 return metric_module.Metric(
688 labels=current_metric.labels,
689 value=_subtract_metric_values(
690 current=current_metric,
691 initial=initial_metric,
692 diff_gauge=diff_gauge,
693 ),
694 _type=current_metric.type(),
695 )
696
697
698def _subtract_metric_values(
699 current: metric_module.Metric,
700 initial: metric_module.Metric,
701 diff_gauge: bool,
702) -> metric_module.MetricValue:
703 assert current.type() is not metric_module.MetricType.UNSPECIFIED
704 assert initial.type() is not metric_module.MetricType.UNSPECIFIED
705 assert current.type() == initial.type()
706
707 if isinstance(current.value, metric_module.Histogram):
708 assert isinstance(initial.value, metric_module.Histogram)
709 return _subtract_metric_values_hist(current=current, initial=initial)
710 else:
711 assert not isinstance(initial.value, metric_module.Histogram)
712 return _subtract_metric_values_num(
713 current=current,
714 initial=initial,
715 diff_gauge=diff_gauge,
716 )
717
718
719def _subtract_metric_values_num(
720 current: metric_module.Metric,
721 initial: metric_module.Metric,
722 diff_gauge: bool,
723) -> float:
724 current_value = typing.cast(float, current.value)
725 initial_value = typing.cast(float, initial.value)
726 should_diff = (
727 current.type() is metric_module.MetricType.RATE or initial.type() is metric_module.MetricType.RATE or diff_gauge
728 )
729 return current_value - initial_value if should_diff else current_value
730
731
732def _subtract_metric_values_hist(
733 current: metric_module.Metric,
734 initial: metric_module.Metric,
735) -> metric_module.Histogram:
736 current_value = typing.cast(metric_module.Histogram, current.value)
737 initial_value = typing.cast(metric_module.Histogram, initial.value)
738 assert current_value.bounds == initial_value.bounds
739 return metric_module.Histogram(
740 bounds=current_value.bounds,
741 buckets=[t[0] - t[1] for t in zip(current_value.buckets, initial_value.buckets)],
742 inf=current_value.inf - initial_value.inf,
743 )
744
745
746class AiohttpClient(service_client.AiohttpClient):
747 PeriodicTaskFailed = PeriodicTaskFailed
748 TestsuiteActionFailed = TestsuiteActionFailed
749 TestsuiteTaskNotFound = TestsuiteTaskNotFound
750 TestsuiteTaskConflict = TestsuiteTaskConflict
751 TestsuiteTaskFailed = TestsuiteTaskFailed
752
753 def __init__(
754 self,
755 base_url: str,
756 *,
757 config: TestsuiteClientConfig,
758 mocked_time,
759 log_capture_fixture,
760 testpoint,
761 testpoint_control,
762 cache_invalidation_state,
763 span_id_header=None,
764 api_coverage_report=None,
765 periodic_tasks_state: typing.Optional[PeriodicTasksState] = None,
766 allow_all_caches_invalidation: bool = True,
767 cache_control: typing.Optional[caches.CacheControl] = None,
768 asyncexc_check=None,
769 **kwargs,
770 ):
771 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
772 self._config = config
773 self._periodic_tasks = periodic_tasks_state
774 self._testpoint = testpoint
775 self._log_capture_fixture = log_capture_fixture
776 self._state_manager = _StateManager(
777 mocked_time=mocked_time,
778 testpoint=self._testpoint,
779 testpoint_control=testpoint_control,
780 invalidation_state=cache_invalidation_state,
781 cache_control=cache_control,
782 )
783 self._api_coverage_report = api_coverage_report
784 self._allow_all_caches_invalidation = allow_all_caches_invalidation
785 self._asyncexc_check = asyncexc_check
786
787 async def run_periodic_task(self, name):
788 response = await self._testsuite_action('run_periodic_task', name=name)
789 if not response['status']:
790 raise self.PeriodicTaskFailed(f'Periodic task {name} failed')
791
792 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
793 if not self._periodic_tasks:
794 raise ConfigurationError('No periodic_tasks_state given')
795 self._periodic_tasks.tasks_to_suspend.update(names)
796 await self._suspend_periodic_tasks()
797
798 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
799 if not self._periodic_tasks:
800 raise ConfigurationError('No periodic_tasks_state given')
801 self._periodic_tasks.tasks_to_suspend.difference_update(names)
802 await self._suspend_periodic_tasks()
803
804 async def resume_all_periodic_tasks(self) -> None:
805 if not self._periodic_tasks:
806 raise ConfigurationError('No periodic_tasks_state given')
807 self._periodic_tasks.tasks_to_suspend.clear()
808 await self._suspend_periodic_tasks()
809
810 async def write_cache_dumps(
811 self,
812 names: typing.List[str],
813 *,
814 testsuite_skip_prepare=False,
815 ) -> None:
816 await self._testsuite_action(
817 'write_cache_dumps',
818 names=names,
819 testsuite_skip_prepare=testsuite_skip_prepare,
820 )
821
822 async def read_cache_dumps(
823 self,
824 names: typing.List[str],
825 *,
826 testsuite_skip_prepare=False,
827 ) -> None:
828 await self._testsuite_action(
829 'read_cache_dumps',
830 names=names,
831 testsuite_skip_prepare=testsuite_skip_prepare,
832 )
833
834 async def run_distlock_task(self, name: str) -> None:
835 await self.run_task(f'distlock/{name}')
836
837 async def reset_metrics(self) -> None:
838 await self._testsuite_action('reset_metrics')
839
840 async def metrics_portability(
841 self,
842 *,
843 prefix: typing.Optional[str] = None,
844 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
845 return await self._testsuite_action(
846 'metrics_portability',
847 prefix=prefix,
848 )
849
850 async def list_tasks(self) -> typing.List[str]:
851 response = await self._do_testsuite_action('tasks_list')
852 async with response:
853 response.raise_for_status()
854 body = await response.json(content_type=None)
855 return body['tasks']
856
857 async def run_task(self, name: str) -> None:
858 response = await self._do_testsuite_action(
859 'task_run',
860 json={'name': name},
861 )
862 await _task_check_response(name, response)
863
864 @contextlib.asynccontextmanager
865 async def spawn_task(self, name: str):
866 task_id = await self._task_spawn(name)
867 try:
868 yield
869 finally:
870 await self._task_stop_spawned(task_id)
871
872 async def _task_spawn(self, name: str) -> str:
873 response = await self._do_testsuite_action(
874 'task_spawn',
875 json={'name': name},
876 )
877 data = await _task_check_response(name, response)
878 return data['task_id']
879
880 async def _task_stop_spawned(self, task_id: str) -> None:
881 response = await self._do_testsuite_action(
882 'task_stop',
883 json={'task_id': task_id},
884 )
885 await _task_check_response(task_id, response)
886
887 async def http_allowed_urls_extra(
888 self,
889 http_allowed_urls_extra: typing.List[str],
890 ) -> None:
891 await self._do_testsuite_action(
892 'http_allowed_urls_extra',
893 json={'allowed_urls_extra': http_allowed_urls_extra},
894 testsuite_skip_prepare=True,
895 )
896
897 @contextlib.asynccontextmanager
898 async def capture_logs(
899 self,
900 *,
901 log_level: str = 'DEBUG',
902 testsuite_skip_prepare: bool = False,
903 ):
904 async with self._log_capture_fixture.start_capture(
905 log_level=log_level,
906 ) as capture:
907 logger.debug('Starting logcapture')
908 await self._testsuite_action(
909 'log_capture',
910 log_level=log_level,
911 socket_logging_duplication=True,
912 testsuite_skip_prepare=testsuite_skip_prepare,
913 )
914
915 try:
916 await self._log_capture_fixture.wait_for_client()
917 yield capture
918 finally:
919 logger.debug('Finishing logcapture')
920 await self._testsuite_action(
921 'log_capture',
922 log_level=self._log_capture_fixture.default_log_level,
923 socket_logging_duplication=False,
924 testsuite_skip_prepare=testsuite_skip_prepare,
925 )
926
927 async def log_flush(self, logger_name: typing.Optional[str] = None):
928 await self._testsuite_action(
929 'log_flush',
930 logger_name=logger_name,
931 testsuite_skip_prepare=True,
932 )
933
934 async def invalidate_caches(
935 self,
936 *,
937 clean_update: bool = True,
938 cache_names: typing.Optional[typing.List[str]] = None,
939 testsuite_skip_prepare: bool = False,
940 ) -> None:
941 if cache_names is None and clean_update:
942 if self._allow_all_caches_invalidation:
943 warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)
944 else:
945 __tracebackhide__ = True
946 raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
947
948 if testsuite_skip_prepare:
949 await self._tests_control({
950 'invalidate_caches': {
951 'update_type': ('full' if clean_update else 'incremental'),
952 **({'names': cache_names} if cache_names else {}),
953 },
954 })
955 else:
956 await self.tests_control(
957 invalidate_caches=True,
958 clean_update=clean_update,
959 cache_names=cache_names,
960 )
961
962 async def tests_control(
963 self,
964 *,
965 invalidate_caches: bool = True,
966 clean_update: bool = True,
967 cache_names: typing.Optional[typing.List[str]] = None,
968 http_allowed_urls_extra=None,
969 ) -> typing.Dict[str, typing.Any]:
970 body: typing.Dict[str, typing.Any] = self._state_manager.get_pending_update()
971
972 if 'invalidate_caches' in body and invalidate_caches:
973 if not clean_update or cache_names:
974 logger.warning(
975 'Manual cache invalidation leads to indirect initial full cache invalidation',
976 )
977 await self._prepare()
978 body = {}
979
980 if invalidate_caches:
981 body['invalidate_caches'] = {
982 'update_type': ('full' if clean_update else 'incremental'),
983 }
984 if cache_names:
985 body['invalidate_caches']['names'] = cache_names
986
987 if http_allowed_urls_extra is not None:
988 await self.http_allowed_urls_extra(http_allowed_urls_extra)
989
990 return await self._tests_control(body)
991
992 async def update_server_state(self) -> None:
993 await self._prepare()
994
995 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
996 if not self._testpoint:
997 return
998 if no_auto_cache_cleanup:
999 await self._tests_control({
1000 'testpoints': sorted(self._testpoint.keys()),
1001 })
1002 else:
1003 await self.update_server_state()
1004
1005 async def get_dynamic_config_defaults(
1006 self,
1007 ) -> typing.Dict[str, typing.Any]:
1008 return await self._testsuite_action(
1009 'get_dynamic_config_defaults',
1010 testsuite_skip_prepare=True,
1011 )
1012
1013 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
1014 with self._state_manager.updating_state(body):
1015 async with await self._do_testsuite_action(
1016 'control',
1017 json=body,
1018 testsuite_skip_prepare=True,
1019 ) as response:
1020 if response.status == 404:
1021 raise ConfigurationError(
1022 'It seems that testsuite support is not enabled for your service',
1023 )
1024 response.raise_for_status()
1025 return await response.json(content_type=None)
1026
1027 async def _suspend_periodic_tasks(self):
1028 if self._periodic_tasks.tasks_to_suspend != self._periodic_tasks.suspended_tasks:
1029 await self._testsuite_action(
1030 'suspend_periodic_tasks',
1031 names=sorted(self._periodic_tasks.tasks_to_suspend),
1032 )
1033 self._periodic_tasks.suspended_tasks = set(
1034 self._periodic_tasks.tasks_to_suspend,
1035 )
1036
1037 def _do_testsuite_action(self, action, **kwargs):
1038 if not self._config.testsuite_action_path:
1039 raise ConfigurationError(
1040 'tests-control component is not properly configured',
1041 )
1042 path = self._config.testsuite_action_path.format(action=action)
1043 return self.post(path, **kwargs)
1044
1045 async def _testsuite_action(
1046 self,
1047 action,
1048 *,
1049 testsuite_skip_prepare=False,
1050 **kwargs,
1051 ):
1052 async with await self._do_testsuite_action(
1053 action,
1054 json=kwargs,
1055 testsuite_skip_prepare=testsuite_skip_prepare,
1056 ) as response:
1057 if response.status == 500:
1058 raise TestsuiteActionFailed
1059 response.raise_for_status()
1060 return await response.json(content_type=None)
1061
1062 async def _prepare(self) -> None:
1063 with self._state_manager.cache_control_update() as pending_update:
1064 if pending_update:
1065 await self._tests_control(pending_update)
1066
1067 async def _request( # pylint: disable=arguments-differ
1068 self,
1069 http_method: str,
1070 path: str,
1071 headers: typing.Optional[typing.Dict[str, str]] = None,
1072 bearer: typing.Optional[str] = None,
1073 x_real_ip: typing.Optional[str] = None,
1074 *,
1075 testsuite_skip_prepare: bool = False,
1076 **kwargs,
1077 ) -> aiohttp.ClientResponse:
1078 if self._asyncexc_check:
1079 # Check for pending background exceptions before call.
1080 self._asyncexc_check()
1081
1082 if not testsuite_skip_prepare:
1083 await self._prepare()
1084
1085 response = await super()._request(
1086 http_method,
1087 path,
1088 headers,
1089 bearer,
1090 x_real_ip,
1091 **kwargs,
1092 )
1093 if self._api_coverage_report:
1094 self._api_coverage_report.update_usage_stat(
1095 path,
1096 http_method,
1097 response.status,
1098 response.content_type,
1099 )
1100
1101 if self._asyncexc_check:
1102 # Check for pending background exceptions after call.
1103 self._asyncexc_check()
1104
1105 return response
1106
1107
1108# @endcond
1109
1110
1112 """
1113 Asyncio userver client, typically retrieved from
1114 @ref service_client "plugins.service_client.service_client"
1115 fixture.
1116
1117 Compatible with werkzeug interface.
1118
1119 @ingroup userver_testsuite
1120 """
1121
1122 PeriodicTaskFailed = PeriodicTaskFailed
1123 TestsuiteActionFailed = TestsuiteActionFailed
1124 TestsuiteTaskNotFound = TestsuiteTaskNotFound
1125 TestsuiteTaskConflict = TestsuiteTaskConflict
1126 TestsuiteTaskFailed = TestsuiteTaskFailed
1127
1128 def _wrap_client_response(
1129 self,
1130 response: aiohttp.ClientResponse,
1131 ) -> typing.Awaitable[http.ClientResponse]:
1132 return http.wrap_client_response(
1133 response,
1134 json_loads=approx.json_loads,
1135 )
1136
1137 @_wrap_client_error
1138 async def run_periodic_task(self, name):
1139 await self._client.run_periodic_task(name)
1140
1141 @_wrap_client_error
1142 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
1143 await self._client.suspend_periodic_tasks(names)
1144
1145 @_wrap_client_error
1146 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
1147 await self._client.resume_periodic_tasks(names)
1148
1149 @_wrap_client_error
1150 async def resume_all_periodic_tasks(self) -> None:
1151 await self._client.resume_all_periodic_tasks()
1152
1153 @_wrap_client_error
1154 async def write_cache_dumps(
1155 self,
1156 names: typing.List[str],
1157 *,
1158 testsuite_skip_prepare=False,
1159 ) -> None:
1160 await self._client.write_cache_dumps(
1161 names=names,
1162 testsuite_skip_prepare=testsuite_skip_prepare,
1163 )
1164
1165 @_wrap_client_error
1166 async def read_cache_dumps(
1167 self,
1168 names: typing.List[str],
1169 *,
1170 testsuite_skip_prepare=False,
1171 ) -> None:
1172 await self._client.read_cache_dumps(
1173 names=names,
1174 testsuite_skip_prepare=testsuite_skip_prepare,
1175 )
1176
1177 async def run_task(self, name: str) -> None:
1178 await self._client.run_task(name)
1179
1180 async def run_distlock_task(self, name: str) -> None:
1181 await self._client.run_distlock_task(name)
1182
1183 async def reset_metrics(self) -> None:
1184 """
1185 Calls `ResetMetric(metric);` for each metric that has such C++ function
1186 """
1187 await self._client.reset_metrics()
1188
1190 self,
1191 *,
1192 prefix: typing.Optional[str] = None,
1193 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
1194 """
1195 Reports metrics related issues that could be encountered on
1196 different monitoring systems.
1197
1198 @sa @ref utils::statistics::GetPortabilityInfo
1199 """
1200 return await self._client.metrics_portability(prefix=prefix)
1201
1202 def list_tasks(self) -> typing.List[str]:
1203 return self._client.list_tasks()
1204
1205 def spawn_task(self, name: str):
1206 return self._client.spawn_task(name)
1207
1209 self,
1210 *,
1211 log_level: str = 'DEBUG',
1212 testsuite_skip_prepare: bool = False,
1213 ):
1214 """
1215 Captures logs from the service.
1216
1217 @param log_level Do not capture logs below this level.
1218
1219 @see @ref testsuite_logs_capture
1220 """
1221 return self._client.capture_logs(
1222 log_level=log_level,
1223 testsuite_skip_prepare=testsuite_skip_prepare,
1224 )
1225
1226 def log_flush(self, logger_name: typing.Optional[str] = None):
1227 """
1228 Flush service logs.
1229 """
1230 return self._client.log_flush(logger_name=logger_name)
1231
1232 @_wrap_client_error
1234 self,
1235 *,
1236 clean_update: bool = True,
1237 cache_names: typing.Optional[typing.List[str]] = None,
1238 testsuite_skip_prepare: bool = False,
1239 ) -> None:
1240 """
1241 Send request to service to update caches.
1242
1243 @param clean_update if False, service will do a faster incremental
1244 update of caches whenever possible.
1245 @param cache_names which caches specifically should be updated;
1246 update all if None.
1247 @param testsuite_skip_prepare if False, service will automatically do
1248 update_server_state().
1249 """
1250 __tracebackhide__ = True
1251 await self._client.invalidate_caches(
1252 clean_update=clean_update,
1253 cache_names=cache_names,
1254 testsuite_skip_prepare=testsuite_skip_prepare,
1255 )
1256
1257 @_wrap_client_error
1258 async def tests_control(
1259 self,
1260 *args,
1261 **kwargs,
1262 ) -> typing.Dict[str, typing.Any]:
1263 return await self._client.tests_control(*args, **kwargs)
1264
1265 @_wrap_client_error
1266 async def update_server_state(self) -> None:
1267 """
1268 Update service-side state through http call to 'tests/control':
1269 - clear dirty (from other tests) caches
1270 - set service-side mocked time,
1271 - resume / suspend periodic tasks
1272 - enable testpoints
1273 If service is up-to-date, does nothing.
1274 """
1275 await self._client.update_server_state()
1276
1277 @_wrap_client_error
1278 async def enable_testpoints(self, *args, **kwargs) -> None:
1279 """
1280 Send list of handled testpoint pats to service. For these paths service
1281 will no more skip http calls from TESTPOINT(...) macro.
1282
1283 @param no_auto_cache_cleanup prevent automatic cache cleanup.
1284 When calling service client first time in scope of current test, client
1285 makes additional http call to `tests/control` to update caches, to get
1286 rid of data from previous test.
1287 """
1288 await self._client.enable_testpoints(*args, **kwargs)
1289
1290 @_wrap_client_error
1291 async def get_dynamic_config_defaults(
1292 self,
1293 ) -> typing.Dict[str, typing.Any]:
1294 return await self._client.get_dynamic_config_defaults()
1295
1296
1297@dataclasses.dataclass
1299 """Reflects the (supposed) current service state."""
1300
1301 invalidation_state: caches.InvalidationState
1302 now: typing.Optional[str] = _UNKNOWN_STATE
1303 testpoints: typing.FrozenSet[str] = frozenset([_UNKNOWN_STATE])
1304
1305
1307 """
1308 Used for computing the requests that we need to automatically align
1309 the service state with the test fixtures state.
1310 """
1311
1312 def __init__(
1313 self,
1314 *,
1315 mocked_time,
1316 testpoint,
1317 testpoint_control,
1318 invalidation_state: caches.InvalidationState,
1319 cache_control: typing.Optional[caches.CacheControl],
1320 ):
1321 self._state = _State(
1322 invalidation_state=copy.deepcopy(invalidation_state),
1323 )
1324 self._mocked_time = mocked_time
1325 self._testpoint = testpoint
1326 self._testpoint_control = testpoint_control
1327 self._invalidation_state = invalidation_state
1328 self._cache_control = cache_control
1329
1330 @contextlib.contextmanager
1331 def updating_state(self, body: typing.Dict[str, typing.Any]):
1332 """
1333 Whenever `tests_control` handler is invoked
1334 (by the client itself during `prepare` or manually by the user),
1335 we need to synchronize `_state` with the (supposed) service state.
1336 The state update is decoded from the request body.
1337 """
1338 saved_state = copy.deepcopy(self._state)
1339 try:
1340 self._update_state(body)
1341 self._apply_new_state()
1342 yield
1343 except Exception: # noqa
1344 self._state = saved_state
1345 self._apply_new_state()
1346 raise
1347
1348 def get_pending_update(self) -> typing.Dict[str, typing.Any]:
1349 """
1350 Compose the body of the `tests_control` request required to completely
1351 synchronize the service state with the state of test fixtures.
1352 """
1353 body: typing.Dict[str, typing.Any] = {}
1354
1355 if self._invalidation_state.has_caches_to_update:
1356 body['invalidate_caches'] = {'update_type': 'full'}
1357 if not self._invalidation_state.should_update_all_caches:
1358 body['invalidate_caches']['names'] = list(
1359 self._invalidation_state.caches_to_update,
1360 )
1361
1362 desired_testpoints = self._testpoint.keys()
1363 if self._state.testpoints != frozenset(desired_testpoints):
1364 body['testpoints'] = sorted(desired_testpoints)
1365
1366 desired_now = self._get_desired_now()
1367 if self._state.now != desired_now:
1368 body['mock_now'] = desired_now
1369
1370 return body
1371
1372 @contextlib.contextmanager
1373 def cache_control_update(self) -> typing.ContextManager[typing.Dict]:
1374 pending_update = self.get_pending_update()
1375 invalidate_caches = pending_update.get('invalidate_caches')
1376 if not invalidate_caches or not self._cache_control:
1377 yield pending_update
1378 else:
1379 cache_names = invalidate_caches.get('names')
1380 staged, actions = self._cache_control.query_caches(cache_names)
1381 self._apply_cache_control_actions(invalidate_caches, actions)
1382 yield pending_update
1383 self._cache_control.commit_staged(staged)
1384
1385 @staticmethod
1386 def _apply_cache_control_actions(
1387 invalidate_caches: typing.Dict,
1388 actions: typing.List[typing.Tuple[str, caches.CacheControlAction]],
1389 ) -> None:
1390 cache_names = invalidate_caches.get('names')
1391 exclude_names = invalidate_caches.setdefault('exclude_names', [])
1392 force_incremental_names = invalidate_caches.setdefault(
1393 'force_incremental_names',
1394 [],
1395 )
1396 for cache_name, action in actions:
1397 if action == caches.CacheControlAction.INCREMENTAL:
1398 force_incremental_names.append(cache_name)
1399 elif action == caches.CacheControlAction.EXCLUDE:
1400 if cache_names is not None:
1401 cache_names.remove(cache_name)
1402 else:
1403 exclude_names.append(cache_name)
1404
1405 def _update_state(self, body: typing.Dict[str, typing.Any]) -> None:
1406 body_invalidate_caches = body.get('invalidate_caches', {})
1407 update_type = body_invalidate_caches.get('update_type', 'full')
1408 body_cache_names = body_invalidate_caches.get('names', None)
1409 # An incremental update is considered insufficient to bring a cache
1410 # to a known state.
1411 if body_invalidate_caches and update_type == 'full':
1412 if body_cache_names is None:
1413 self._state.invalidation_state.on_all_caches_updated()
1414 else:
1415 self._state.invalidation_state.on_caches_updated(
1416 body_cache_names,
1417 )
1418
1419 if 'mock_now' in body:
1420 self._state.now = body['mock_now']
1421
1422 testpoints: typing.Optional[typing.List[str]] = body.get(
1423 'testpoints',
1424 None,
1425 )
1426 if testpoints is not None:
1427 self._state.testpoints = frozenset(testpoints)
1428
1430 """Apply new state to related components."""
1431 self._testpoint_control.enabled_testpoints = self._state.testpoints
1432 self._invalidation_state.assign_copy(self._state.invalidation_state)
1433
1434 def _get_desired_now(self) -> typing.Optional[str]:
1435 if self._mocked_time.is_enabled:
1436 return utils.timestring(self._mocked_time.now())
1437 return None
1438
1439
1440async def _task_check_response(name: str, response) -> dict:
1441 async with response:
1442 if response.status == 404:
1443 raise TestsuiteTaskNotFound(f'Testsuite task {name!r} not found')
1444 if response.status == 409:
1445 raise TestsuiteTaskConflict(f'Testsuite task {name!r} conflict')
1446 assert response.status == 200
1447 data = await response.json()
1448 if not data.get('status', True):
1449 raise TestsuiteTaskFailed(name, data['reason'])
1450 return data