userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Python module that provides clients for functional tests with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9# pylint: disable=too-many-lines
10
11import contextlib
12import copy
13import dataclasses
14import json
15import logging
16import typing
17import warnings
18
19import aiohttp
20
21from testsuite import annotations
22from testsuite import utils
23from testsuite.daemons import service_client
24from testsuite.utils import approx
25from testsuite.utils import http
26
27import pytest_userver.metrics as metric_module # pylint: disable=import-error
28from pytest_userver.plugins import caches
29
# @cond
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# @endcond

# NOTE(review): not referenced in the visible part of this file; presumably a
# sentinel used by the state manager defined further down -- confirm.
_UNKNOWN_STATE = '__UNKNOWN__'

# Message used by AiohttpClient.invalidate_caches() both as the text of the
# DeprecationWarning and as the text of the RuntimeError raised when
# invalidating all caches at once is not allowed.
CACHE_INVALIDATION_MESSAGE = (
    'Direct cache invalidation is deprecated.\n'
    '\n'
    ' - Use client.update_server_state() to synchronize service state\n'
    ' - Explicitly pass cache names to invalidate, e.g.: '
    'invalidate_caches(cache_names=[...]).'
)
43
44
class BaseError(Exception):
    """Base class for all exceptions defined in this module."""
47
48
50 pass
51
52
54 pass
55
56
66 pass
67
68
70 pass
71
72
def __init__(self):
    """
    Create an empty state: no task names are recorded as suspended and
    none are requested to be suspended.
    """
    # Names requested to be suspended.
    self.tasks_to_suspend: typing.Set[str] = set()
    # Names recorded as already suspended.
    self.suspended_tasks: typing.Set[str] = set()
77
78
def __init__(self, name, reason):
    """
    Store the failed task name and the failure reason and build the
    exception message from them.

    @param name Name of the testsuite task
    @param reason Failure reason
    """
    self.name, self.reason = name, reason
    message = f'Testsuite task {name!r} failed: {reason}'
    super().__init__(message)
84
85
86@dataclasses.dataclass(frozen=True)
88 testsuite_action_path: typing.Optional[str] = None
89 server_monitor_path: typing.Optional[str] = None
90
91
92Metric = metric_module.Metric
93
94
96 """
97 Base asyncio userver client that implements HTTP requests to service.
98
99 Compatible with werkzeug interface.
100
101 @ingroup userver_testsuite
102 """
103
def __init__(self, client):
    """
    @param client Underlying client object that performs the requests
    """
    self._client = client
106
async def post(
    self,
    path: str,
    # pylint: disable=redefined-outer-name
    json: annotations.JsonAnyOptional = None,
    data: typing.Any = None,
    params: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP POST request.

    All arguments are forwarded to the underlying client's `post`; the
    raw response is converted with `_wrap_client_response`.
    """
    raw_response = await self._client.post(
        path,
        json=json,
        data=data,
        params=params,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
133
async def put(
    self,
    path: str,
    # pylint: disable=redefined-outer-name
    json: annotations.JsonAnyOptional = None,
    data: typing.Any = None,
    params: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Make a HTTP PUT request

    All arguments are forwarded to the underlying client's `put`; the
    raw response is converted with `_wrap_client_response`.
    """
    response = await self._client.put(
        path,
        json=json,
        data=data,
        params=params,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    return await self._wrap_client_response(response)
160
async def patch(
    self,
    path: str,
    # pylint: disable=redefined-outer-name
    json: annotations.JsonAnyOptional = None,
    data: typing.Any = None,
    params: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Make a HTTP PATCH request

    All arguments are forwarded to the underlying client's `patch`; the
    raw response is converted with `_wrap_client_response`.
    """
    response = await self._client.patch(
        path,
        json=json,
        data=data,
        params=params,
        headers=headers,
        bearer=bearer,
        x_real_ip=x_real_ip,
        **kwargs,
    )
    return await self._wrap_client_response(response)
187
async def get(
    self,
    path: str,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP GET request.

    Arguments are forwarded to the underlying client's `get`; the raw
    response is converted with `_wrap_client_response`.
    """
    common = {'headers': headers, 'bearer': bearer, 'x_real_ip': x_real_ip}
    raw_response = await self._client.get(path, **common, **kwargs)
    return await self._wrap_client_response(raw_response)
203
async def delete(
    self,
    path: str,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP DELETE request.

    Arguments are forwarded to the underlying client's `delete`; the raw
    response is converted with `_wrap_client_response`.
    """
    common = {'headers': headers, 'bearer': bearer, 'x_real_ip': x_real_ip}
    raw_response = await self._client.delete(path, **common, **kwargs)
    return await self._wrap_client_response(raw_response)
219
async def options(
    self,
    path: str,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP OPTIONS request.

    Arguments are forwarded to the underlying client's `options`; the raw
    response is converted with `_wrap_client_response`.
    """
    common = {'headers': headers, 'bearer': bearer, 'x_real_ip': x_real_ip}
    raw_response = await self._client.options(path, **common, **kwargs)
    return await self._wrap_client_response(raw_response)
235
async def request(
    self, http_method: str, path: str, **kwargs,
) -> http.ClientResponse:
    """
    Send an HTTP request using an arbitrary method.

    @param http_method HTTP method name passed to the underlying client
    @param path Request path
    """
    raw_response = await self._client.request(http_method, path, **kwargs)
    wrapped = self._wrap_client_response(raw_response)
    return await wrapped
244
def _wrap_client_response(
    self, response: aiohttp.ClientResponse,
) -> typing.Awaitable[http.ClientResponse]:
    """
    Convert a raw aiohttp response via `http.wrap_client_response`.

    Returns an awaitable; callers in this class await the result.
    """
    wrapped = http.wrap_client_response(response)
    return wrapped
249
250
251# @cond
252
253
def _wrap_client_error(func):
    """
    Decorator for coroutine functions that re-raises
    `aiohttp.client_exceptions.ClientResponseError` as
    `http.HttpResponseError` carrying the request URL and the status.

    The wrapper keeps the name and docstring of the wrapped function, and
    the original aiohttp error is attached as the explicit cause.
    """
    # Local import keeps this block self-contained.
    import functools  # pylint: disable=import-outside-toplevel

    @functools.wraps(func)
    async def _wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except aiohttp.client_exceptions.ClientResponseError as exc:
            raise http.HttpResponseError(
                url=exc.request_info.url, status=exc.status,
            ) from exc

    return _wrapper
264
265
266class AiohttpClientMonitor(service_client.AiohttpClient):
267 _config: TestsuiteClientConfig
268
def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
    """
    @param base_url Passed to the base client constructor
    @param config Client config; its `server_monitor_path` is read by the
    metrics methods
    @param kwargs Extra arguments passed to the base client constructor
    """
    super().__init__(base_url, **kwargs)
    self._config = config
272
async def get_metrics(self, prefix=None):
    """
    Request metrics in the 'internal' format from the monitor handler and
    return the decoded JSON body.

    @param prefix Optional value for the 'prefix' query parameter
    @throws ConfigurationError if the monitor path is not configured
    """
    if not self._config.server_monitor_path:
        raise ConfigurationError(
            'handler-server-monitor component is not configured',
        )
    query = {'format': 'internal'}
    if prefix is not None:
        query.update(prefix=prefix)
    reply = await self.get(self._config.server_monitor_path, params=query)
    async with reply:
        reply.raise_for_status()
        return await reply.json(content_type=None)
287
async def get_metric(self, metric_name):
    """
    Fetch metrics using `metric_name` as the prefix and return the entry
    stored under exactly that name.

    @throws AssertionError if there is no entry with that name
    """
    found = await self.get_metrics(metric_name)
    hint = (
        f'No metric with name {metric_name!r}. '
        f'Use "single_metric" function instead of "get_metric"'
    )
    assert metric_name in found, hint
    return found[metric_name]
295
async def metrics_raw(
    self,
    output_format,
    *,
    path: str = None,
    prefix: str = None,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> str:
    """
    Request metrics from the monitor handler in the given format and
    return the response body as text.

    @param output_format Value of the 'format' query parameter
    @param path Optional value of the 'path' query parameter
    @param prefix Optional value of the 'prefix' query parameter
    @param labels Optional labels, sent JSON-encoded in 'labels'
    @throws ConfigurationError if the monitor path is not configured
    """
    if not self._config.server_monitor_path:
        raise ConfigurationError(
            'handler-server-monitor component is not configured',
        )

    query = {'format': output_format}
    # Falsy values (None, empty string, empty dict) are not sent at all.
    for key, value in (('prefix', prefix), ('path', path)):
        if value:
            query[key] = value
    if labels:
        query['labels'] = json.dumps(labels)

    reply = await self.get(self._config.server_monitor_path, params=query)
    async with reply:
        reply.raise_for_status()
        return await reply.text()
325
async def metrics(
    self,
    *,
    path: str = None,
    prefix: str = None,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> metric_module.MetricsSnapshot:
    """
    Fetch metrics in the 'json' format and parse them into a
    MetricsSnapshot.

    @param path Optional full metric path
    @param prefix Optional metric path prefix
    @param labels Optional labels filter
    """
    raw_text = await self.metrics_raw(
        output_format='json', path=path, prefix=prefix, labels=labels,
    )
    snapshot = metric_module.MetricsSnapshot.from_json(str(raw_text))
    return snapshot
337
async def single_metric_optional(
    self,
    path: str,
    *,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> typing.Optional[Metric]:
    """
    Return the only metric found at `path` with `labels`, or None if
    there is no such metric.

    @param path Full metric path
    @param labels Optional labels filter
    @throws AssertionError if more than one metric is found
    """
    response = await self.metrics(path=path, labels=labels)
    metrics_list = response.get(path, [])

    # The message is a plain string; a trailing comma here would turn it
    # into a 1-tuple and garble the assertion output.
    assert len(metrics_list) <= 1, (
        f'More than one metric found for path {path} and labels {labels}: '
        f'{response}'
    )

    if not metrics_list:
        return None

    return next(iter(metrics_list))
356
async def single_metric(
    self,
    path: str,
    *,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> Metric:
    """
    Return the only metric found at `path` with `labels`.

    @param path Full metric path
    @param labels Optional labels filter
    @throws AssertionError if no metric or more than one metric is found
    """
    value = await self.single_metric_optional(path, labels=labels)
    # The message is a plain string; a trailing comma here would turn it
    # into a 1-tuple and garble the assertion output.
    assert value is not None, (
        f'No metric was found for path {path} and labels {labels}'
    )
    return value
368
369
370# @endcond
371
372
374 """
375 Asyncio userver client for monitor listeners, typically retrieved from
376 plugins.service_client.monitor_client fixture.
377
378 Compatible with werkzeug interface.
379
380 @ingroup userver_testsuite
381 """
382
384 self,
385 *,
386 path: typing.Optional[str] = None,
387 prefix: typing.Optional[str] = None,
388 labels: typing.Optional[typing.Dict[str, str]] = None,
389 diff_gauge: bool = False,
390 ) -> 'MetricsDiffer':
391 """
392 Creates a `MetricsDiffer` that fetches metrics using this client.
393 It's recommended to use this method over `metrics` to make sure
394 the tests don't affect each other.
395
396 With `diff_gauge` off, only RATE metrics are differentiated.
397 With `diff_gauge` on, GAUGE metrics are differentiated as well,
398 which may lead to nonsensical results for those.
399
400 @param path Optional full metric path
401 @param prefix Optional prefix on which the metric paths should start
402 @param labels Optional dictionary of labels that must be in the metric
403 @param diff_gauge Whether to differentiate GAUGE metrics
404
405 @code
406 async with monitor_client.metrics_diff(prefix='foo') as differ:
407 # Do something that makes the service update its metrics
408 assert differ.value_at('path-suffix', {'label'}) == 42
409 @endcode
410 """
411 return MetricsDiffer(
412 _client=self,
413 _path=path,
414 _prefix=prefix,
415 _labels=labels,
416 _diff_gauge=diff_gauge,
417 )
418
@_wrap_client_error
async def metrics(
    self,
    *,
    path: typing.Optional[str] = None,
    prefix: typing.Optional[str] = None,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> metric_module.MetricsSnapshot:
    """
    Fetch a snapshot of metrics from the underlying client.

    @param path Optional full metric path
    @param prefix Optional prefix on which the metric paths should start
    @param labels Optional dictionary of labels that must be in the metric
    """
    snapshot = await self._client.metrics(
        path=path, prefix=prefix, labels=labels,
    )
    return snapshot
437
438 @_wrap_client_error
440 self,
441 path: str,
442 *,
443 labels: typing.Optional[typing.Dict[str, str]] = None,
444 ) -> typing.Optional[Metric]:
445 """
446 Either return a Metric or None if there's no such metric.
447
448 @param path Full metric path
449 @param labels Optional dictionary of labels that must be in the metric
450
451 @throws AssertionError if more than one metric returned
452 """
453 return await self._client.single_metric_optional(path, labels=labels)
454
@_wrap_client_error
async def single_metric(
    self,
    path: str,
    *,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> Metric:
    """
    Returns the Metric.

    @param path Full metric path
    @param labels Optional dictionary of labels that must be in the metric

    @throws AssertionError if more than one metric or no metric found
    """
    # A missing metric raises, so the result is never None.
    return await self._client.single_metric(path, labels=labels)
471
@_wrap_client_error
async def metrics_raw(
    self,
    output_format: str,
    *,
    path: typing.Optional[str] = None,
    prefix: typing.Optional[str] = None,
    labels: typing.Optional[typing.Dict[str, str]] = None,
) -> str:
    """
    Low level function that returns metrics in a specific format.
    Use `metrics` and `single_metric` instead if possible.

    The result is the response body text exactly as produced by the
    underlying client's `metrics_raw`.

    @param output_format Metric output format. See
    server::handlers::ServerMonitor for a list of supported formats.
    @param path Optional full metric path
    @param prefix Optional prefix on which the metric paths should start
    @param labels Optional dictionary of labels that must be in the metric
    """
    return await self._client.metrics_raw(
        output_format=output_format,
        path=path,
        prefix=prefix,
        labels=labels,
    )
497
@_wrap_client_error
async def get_metrics(self, prefix=None):
    """
    Delegates to the underlying client's get_metrics().

    @deprecated Use metrics() or single_metric() instead
    """
    legacy_metrics = await self._client.get_metrics(prefix=prefix)
    return legacy_metrics
504
@_wrap_client_error
async def get_metric(self, metric_name):
    """
    Delegates to the underlying client's get_metric().

    @deprecated Use metrics() or single_metric() instead
    """
    legacy_metric = await self._client.get_metric(metric_name)
    return legacy_metric
511
@_wrap_client_error
async def fired_alerts(self):
    """
    Requests '/service/fired-alerts' and returns the 'alerts' field of
    the JSON response.

    @throws AssertionError if the response status is not 200
    """
    response = await self._client.get('/service/fired-alerts')
    # Release the response even when the status check fails, like the
    # other monitor requests in this file do.
    async with response:
        assert response.status == 200
        return (await response.json())['alerts']
517
518
520 """
521 A helper class for computing metric differences.
522
523 @see ClientMonitor.metrics_diff
524 @ingroup userver_testsuite
525 """
526
527 # @cond
def __init__(
    self,
    _client: ClientMonitor,
    _path: typing.Optional[str],
    _prefix: typing.Optional[str],
    _labels: typing.Optional[typing.Dict[str, str]],
    _diff_gauge: bool,
):
    # Snapshots are unset until assigned via the properties or the
    # async context manager protocol.
    self._baseline: typing.Optional[metric_module.MetricsSnapshot] = None
    self._current: typing.Optional[metric_module.MetricsSnapshot] = None
    self._diff: typing.Optional[metric_module.MetricsSnapshot] = None
    # Filters forwarded to the client on every fetch.
    self._client = _client
    self._path = _path
    self._prefix = _prefix
    self._labels = _labels
    self._diff_gauge = _diff_gauge
544
545 # @endcond
546
@property
def baseline(self) -> metric_module.MetricsSnapshot:
    """
    The snapshot that `current` is compared against.

    @throws AssertionError if the baseline has not been set
    """
    assert self._baseline is not None
    return self._baseline

@baseline.setter
def baseline(self, value: metric_module.MetricsSnapshot) -> None:
    self._baseline = value
    if self._current is None:
        # Nothing to compare with yet.
        return
    self._diff = _subtract_metrics_snapshots(
        self._current, self._baseline, self._diff_gauge,
    )
559
@property
def current(self) -> metric_module.MetricsSnapshot:
    """
    The latest snapshot; assigning it recomputes `diff`.

    @throws AssertionError if the current snapshot has not been set
    """
    assert self._current is not None, 'Set self.current first'
    return self._current

@current.setter
def current(self, value: metric_module.MetricsSnapshot) -> None:
    self._current = value
    assert self._baseline is not None, 'Set self.baseline first'
    new_diff = _subtract_metrics_snapshots(
        self._current, self._baseline, self._diff_gauge,
    )
    self._diff = new_diff
572
@property
def diff(self) -> metric_module.MetricsSnapshot:
    """
    The difference computed from `current` and `baseline`.

    @throws AssertionError if no difference has been computed yet
    """
    computed = self._diff
    assert computed is not None, 'Set self.current first'
    return computed
577
579 self,
580 subpath: typing.Optional[str] = None,
581 add_labels: typing.Optional[typing.Dict] = None,
582 *,
583 default: typing.Optional[float] = None,
584 ) -> metric_module.MetricValue:
585 """
586 Returns a single metric value at the specified path, prepending
587 the path provided at construction. If a dict of labels is provided,
588 does en exact match of labels, prepending the labels provided
589 at construction.
590
591 @param subpath Suffix of the metric path; the path provided
592 at construction is prepended
593 @param add_labels Labels that the metric must have in addition
594 to the labels provided at construction
595 @param default An optional default value in case the metric is missing
596 @throws AssertionError if not one metric by path
597 """
598 base_path = self._path or self._prefix
599 if base_path and subpath:
600 path = f'{base_path}.{subpath}'
601 else:
602 assert base_path or subpath, 'No path provided'
603 path = base_path or subpath or ''
604 labels: typing.Optional[dict] = None
605 if self._labels is not None or add_labels is not None:
606 labels = {**(self._labels or {}), **(add_labels or {})}
607 return self.diff.value_at(path, labels, default=default)
608
async def fetch(self) -> metric_module.MetricsSnapshot:
    """
    Request a metrics snapshot from the client using the path, prefix
    and labels given at construction.
    """
    filters = {
        'path': self._path,
        'prefix': self._prefix,
        'labels': self._labels,
    }
    return await self._client.metrics(**filters)
616
async def __aenter__(self) -> 'MetricsDiffer':
    """
    Fetch the baseline snapshot, drop any previous current snapshot and
    return this object.
    """
    fetched = await self.fetch()
    self._baseline = fetched
    self._current = None
    return self
621
async def __aexit__(self, exc_type, exc, exc_tb) -> None:
    """
    Fetch a fresh snapshot and assign it to `current`.

    Assignment goes through the `current` property setter, which is what
    computes `diff` against the baseline.
    """
    self.current = await self.fetch()
624
625
626# @cond
627
628
def _subtract_metrics_snapshots(
    current: metric_module.MetricsSnapshot,
    initial: metric_module.MetricsSnapshot,
    diff_gauge: bool,
) -> metric_module.MetricsSnapshot:
    """
    Build a snapshot with the same paths as `current`, where every metric
    is replaced by the result of `_subtract_metrics` against `initial`.
    """
    subtracted = {}
    for path, current_group in current.items():
        subtracted[path] = {
            _subtract_metrics(path, current_metric, initial, diff_gauge)
            for current_metric in current_group
        }
    return metric_module.MetricsSnapshot(subtracted)
641
642
def _subtract_metrics(
    path: str,
    current_metric: metric_module.Metric,
    initial: metric_module.MetricsSnapshot,
    diff_gauge: bool,
) -> metric_module.Metric:
    """
    Subtract from `current_metric` the metric of `initial` that has the
    same path and equal labels.

    If `initial` has no such metric, `current_metric` is returned as is.
    """
    initial_group = initial.get(path, None)
    if initial_group is None:
        return current_metric

    # Take the first initial metric with the same labels, if any.
    for candidate in initial_group:
        if candidate.labels == current_metric.labels:
            initial_metric = candidate
            break
    else:
        return current_metric

    return metric_module.Metric(
        labels=current_metric.labels,
        value=_subtract_metric_values(
            current=current_metric,
            initial=initial_metric,
            diff_gauge=diff_gauge,
        ),
        _type=current_metric.type(),
    )
667
668
def _subtract_metric_values(
    current: metric_module.Metric,
    initial: metric_module.Metric,
    diff_gauge: bool,
) -> metric_module.MetricValue:
    """
    Dispatch the subtraction to the histogram or the numeric helper,
    depending on the kind of the current value.

    Both metrics must have the same, specified type, and both values
    must be of the same kind (checked with assertions).
    """
    assert current.type() is not metric_module.MetricType.UNSPECIFIED
    assert initial.type() is not metric_module.MetricType.UNSPECIFIED
    assert current.type() == initial.type()

    initial_is_histogram = isinstance(initial.value, metric_module.Histogram)
    if isinstance(current.value, metric_module.Histogram):
        assert initial_is_histogram
        return _subtract_metric_values_hist(current=current, initial=initial)

    assert not initial_is_histogram
    return _subtract_metric_values_num(
        current=current, initial=initial, diff_gauge=diff_gauge,
    )
686
687
def _subtract_metric_values_num(
    current: metric_module.Metric,
    initial: metric_module.Metric,
    diff_gauge: bool,
) -> float:
    """
    Return `current - initial` when either metric is of RATE type or
    `diff_gauge` is set; otherwise return the current value unchanged.
    """
    current_value = typing.cast(float, current.value)
    initial_value = typing.cast(float, initial.value)
    rate = metric_module.MetricType.RATE
    if current.type() is rate or initial.type() is rate or diff_gauge:
        return current_value - initial_value
    return current_value
701
702
def _subtract_metric_values_hist(
    current: metric_module.Metric, initial: metric_module.Metric,
) -> metric_module.Histogram:
    """
    Subtract two histograms bucket by bucket, including the `inf` bucket.

    Both histograms must have equal bounds (checked with an assertion).
    """
    current_value = typing.cast(metric_module.Histogram, current.value)
    initial_value = typing.cast(metric_module.Histogram, initial.value)
    assert current_value.bounds == initial_value.bounds
    bucket_pairs = zip(current_value.buckets, initial_value.buckets)
    return metric_module.Histogram(
        bounds=current_value.bounds,
        buckets=[now - before for now, before in bucket_pairs],
        inf=current_value.inf - initial_value.inf,
    )
717
718
719class AiohttpClient(service_client.AiohttpClient):
720 PeriodicTaskFailed = PeriodicTaskFailed
721 TestsuiteActionFailed = TestsuiteActionFailed
722 TestsuiteTaskNotFound = TestsuiteTaskNotFound
723 TestsuiteTaskConflict = TestsuiteTaskConflict
724 TestsuiteTaskFailed = TestsuiteTaskFailed
725
def __init__(
    self,
    base_url: str,
    *,
    config: TestsuiteClientConfig,
    mocked_time,
    log_capture_fixture,
    testpoint,
    testpoint_control,
    cache_invalidation_state,
    span_id_header=None,
    api_coverage_report=None,
    periodic_tasks_state: typing.Optional[PeriodicTasksState] = None,
    allow_all_caches_invalidation: bool = True,
    cache_control: typing.Optional[caches.CacheControl] = None,
    asyncexc_check=None,
    **kwargs,
):
    """
    Store the collaborators of the client and create the state manager.

    `base_url`, `span_id_header` and `kwargs` go to the base client
    constructor; the remaining arguments are kept on the instance or
    passed to `_StateManager`.
    """
    super().__init__(base_url, span_id_header=span_id_header, **kwargs)

    # Plain configuration and collaborators.
    self._config = config
    self._periodic_tasks = periodic_tasks_state
    self._testpoint = testpoint
    self._log_capture_fixture = log_capture_fixture
    self._api_coverage_report = api_coverage_report
    self._allow_all_caches_invalidation = allow_all_caches_invalidation
    self._asyncexc_check = asyncexc_check

    # Provides the pending state updates used by the control methods.
    self._state_manager = _StateManager(
        mocked_time=mocked_time,
        testpoint=testpoint,
        testpoint_control=testpoint_control,
        invalidation_state=cache_invalidation_state,
        cache_control=cache_control,
    )
759
async def run_periodic_task(self, name):
    """
    Run the 'run_periodic_task' testsuite action for `name`.

    @throws PeriodicTaskFailed if the response 'status' is falsy
    """
    result = await self._testsuite_action('run_periodic_task', name=name)
    if result['status']:
        return
    raise self.PeriodicTaskFailed(f'Periodic task {name} failed')
764
async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
    """
    Add `names` to the set of tasks to suspend and synchronize it.

    @throws ConfigurationError if no periodic tasks state was given
    """
    state = self._periodic_tasks
    if not state:
        raise ConfigurationError('No periodic_tasks_state given')
    state.tasks_to_suspend.update(names)
    await self._suspend_periodic_tasks()
770
async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
    """
    Remove `names` from the set of tasks to suspend and synchronize it.

    @throws ConfigurationError if no periodic tasks state was given
    """
    state = self._periodic_tasks
    if not state:
        raise ConfigurationError('No periodic_tasks_state given')
    state.tasks_to_suspend.difference_update(names)
    await self._suspend_periodic_tasks()
776
async def resume_all_periodic_tasks(self) -> None:
    """
    Clear the set of tasks to suspend and synchronize it.

    @throws ConfigurationError if no periodic tasks state was given
    """
    state = self._periodic_tasks
    if not state:
        raise ConfigurationError('No periodic_tasks_state given')
    state.tasks_to_suspend.clear()
    await self._suspend_periodic_tasks()
782
async def write_cache_dumps(
    self, names: typing.List[str], *, testsuite_skip_prepare=False,
) -> None:
    """
    Run the 'write_cache_dumps' testsuite action for `names`.

    @param testsuite_skip_prepare Forwarded to `_testsuite_action`
    """
    action_args = {
        'names': names,
        'testsuite_skip_prepare': testsuite_skip_prepare,
    }
    await self._testsuite_action('write_cache_dumps', **action_args)
791
async def read_cache_dumps(
    self, names: typing.List[str], *, testsuite_skip_prepare=False,
) -> None:
    """
    Run the 'read_cache_dumps' testsuite action for `names`.

    @param testsuite_skip_prepare Forwarded to `_testsuite_action`
    """
    action_args = {
        'names': names,
        'testsuite_skip_prepare': testsuite_skip_prepare,
    }
    await self._testsuite_action('read_cache_dumps', **action_args)
800
async def run_distlock_task(self, name: str) -> None:
    """
    Run the testsuite task named 'distlock/<name>'.
    """
    task_name = f'distlock/{name}'
    await self.run_task(task_name)
803
async def reset_metrics(self) -> None:
    """
    Run the 'reset_metrics' testsuite action; the response is discarded.
    """
    _ = await self._testsuite_action('reset_metrics')
806
async def metrics_portability(
    self, *, prefix: typing.Optional[str] = None,
) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
    """
    Run the 'metrics_portability' testsuite action and return its
    decoded response.

    @param prefix Optional prefix sent with the action
    """
    report = await self._testsuite_action(
        'metrics_portability', prefix=prefix,
    )
    return report
813
async def list_tasks(self) -> typing.List[str]:
    """
    Post the 'tasks_list' testsuite action and return the 'tasks' field
    of the decoded JSON response.
    """
    reply = await self._do_testsuite_action('tasks_list')
    async with reply:
        reply.raise_for_status()
        payload = await reply.json(content_type=None)
        return payload['tasks']
820
async def run_task(self, name: str) -> None:
    """
    Post the 'task_run' testsuite action for `name` and validate the
    response with `_task_check_response`.
    """
    request_body = {'name': name}
    response = await self._do_testsuite_action('task_run', json=request_body)
    await _task_check_response(name, response)
826
@contextlib.asynccontextmanager
async def spawn_task(self, name: str):
    """
    Async context manager: spawns the task `name` on enter and stops the
    spawned task on exit, also when the body raises.
    """
    spawned_id = await self._task_spawn(name)
    try:
        yield
    finally:
        await self._task_stop_spawned(spawned_id)
834
async def _task_spawn(self, name: str) -> str:
    """
    Post the 'task_spawn' testsuite action for `name` and return the
    'task_id' field of the checked response.
    """
    request_body = {'name': name}
    response = await self._do_testsuite_action('task_spawn', json=request_body)
    checked = await _task_check_response(name, response)
    return checked['task_id']
841
async def _task_stop_spawned(self, task_id: str) -> None:
    """
    Post the 'task_stop' testsuite action for `task_id` and validate the
    response with `_task_check_response`.
    """
    request_body = {'task_id': task_id}
    response = await self._do_testsuite_action('task_stop', json=request_body)
    await _task_check_response(task_id, response)
847
async def http_allowed_urls_extra(
    self, http_allowed_urls_extra: typing.List[str],
) -> None:
    """
    Post the 'http_allowed_urls_extra' testsuite action with the given
    list of URLs, skipping the state preparation step.

    The response is neither inspected nor returned.
    """
    request_body = {'allowed_urls_extra': http_allowed_urls_extra}
    await self._do_testsuite_action(
        'http_allowed_urls_extra',
        json=request_body,
        testsuite_skip_prepare=True,
    )
856
@contextlib.asynccontextmanager
async def capture_logs(
    self, *, log_level: str = 'DEBUG', testsuite_skip_prepare: bool = False,
):
    """
    Async context manager that enables log capture in the service for the
    duration of the block and yields the capture object produced by the
    log capture fixture.

    @param log_level Level passed to the fixture and to the 'log_capture'
    testsuite action
    @param testsuite_skip_prepare Forwarded to `_testsuite_action`
    """
    async with self._log_capture_fixture.start_capture(
        log_level=log_level,
    ) as capture:
        logger.debug('Starting logcapture')
        # Ask the service to duplicate its logs to the capture socket.
        await self._testsuite_action(
            'log_capture',
            log_level=log_level,
            socket_logging_duplication=True,
            testsuite_skip_prepare=testsuite_skip_prepare,
        )

        try:
            await self._log_capture_fixture.wait_for_client()
            yield capture
        finally:
            logger.debug('Finishing logcapture')
            # Always switch duplication off and restore the fixture's
            # default log level, even if the body raised.
            await self._testsuite_action(
                'log_capture',
                log_level=self._log_capture_fixture.default_log_level,
                socket_logging_duplication=False,
                testsuite_skip_prepare=testsuite_skip_prepare,
            )
883
async def log_flush(self, logger_name: typing.Optional[str] = None):
    """
    Run the 'log_flush' testsuite action, skipping the state preparation
    step.

    @param logger_name Optional logger name sent with the action
    """
    action_args = {'logger_name': logger_name, 'testsuite_skip_prepare': True}
    await self._testsuite_action('log_flush', **action_args)
888
async def invalidate_caches(
    self,
    *,
    clean_update: bool = True,
    cache_names: typing.Optional[typing.List[str]] = None,
    testsuite_skip_prepare: bool = False,
) -> None:
    """
    Request cache invalidation in the service.

    A full update without explicit cache names is deprecated: it emits a
    DeprecationWarning, or raises RuntimeError when invalidating all
    caches is not allowed for this client.

    @param clean_update Request a 'full' update if true, 'incremental'
    otherwise
    @param cache_names Optional list of cache names to invalidate
    @param testsuite_skip_prepare Send the request directly, without the
    pending state update done by tests_control()
    """
    if cache_names is None and clean_update:
        if not self._allow_all_caches_invalidation:
            __tracebackhide__ = True
            raise RuntimeError(CACHE_INVALIDATION_MESSAGE)
        warnings.warn(CACHE_INVALIDATION_MESSAGE, DeprecationWarning)

    if not testsuite_skip_prepare:
        await self.tests_control(
            invalidate_caches=True,
            clean_update=clean_update,
            cache_names=cache_names,
        )
        return

    invalidation = {'update_type': 'full' if clean_update else 'incremental'}
    if cache_names:
        invalidation['names'] = cache_names
    await self._tests_control({'invalidate_caches': invalidation})
916
917 async def tests_control(
918 self,
919 *,
920 invalidate_caches: bool = True,
921 clean_update: bool = True,
922 cache_names: typing.Optional[typing.List[str]] = None,
923 http_allowed_urls_extra=None,
924 ) -> typing.Dict[str, typing.Any]:
925 body: typing.Dict[str, typing.Any] = (
926 self._state_manager.get_pending_update()
927 )
928
929 if 'invalidate_caches' in body and invalidate_caches:
930 if not clean_update or cache_names:
931 logger.warning(
932 'Manual cache invalidation leads to indirect initial '
933 'full cache invalidation',
934 )
935 await self._prepare()
936 body = {}
937
938 if invalidate_caches:
939 body['invalidate_caches'] = {
940 'update_type': ('full' if clean_update else 'incremental'),
941 }
942 if cache_names:
943 body['invalidate_caches']['names'] = cache_names
944
945 if http_allowed_urls_extra is not None:
946 await self.http_allowed_urls_extra(http_allowed_urls_extra)
947
948 return await self._tests_control(body)
949
async def update_server_state(self) -> None:
    """
    Send the pending state update, if any, to the service.
    """
    pending_sync = self._prepare()
    await pending_sync
952
async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
    """
    Tell the service which testpoints are registered.

    Does nothing when no testpoints are registered.

    @param no_auto_cache_cleanup If true, send only the sorted testpoint
    names; otherwise perform the regular update_server_state()
    """
    if not self._testpoint:
        return
    if not no_auto_cache_cleanup:
        await self.update_server_state()
        return
    testpoint_names = sorted(self._testpoint.keys())
    await self._tests_control({'testpoints': testpoint_names})
962
async def get_dynamic_config_defaults(
    self,
) -> typing.Dict[str, typing.Any]:
    """
    Run the 'get_dynamic_config_defaults' testsuite action, skipping the
    state preparation step, and return its decoded response.
    """
    defaults = await self._testsuite_action(
        'get_dynamic_config_defaults', testsuite_skip_prepare=True,
    )
    return defaults
969
async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
    """
    Post `body` to the 'control' testsuite action inside the state
    manager's `updating_state(body)` context and return the decoded
    JSON response.

    @throws ConfigurationError if the service answers 404
    """
    with self._state_manager.updating_state(body):
        pending_response = await self._do_testsuite_action(
            'control', json=body, testsuite_skip_prepare=True,
        )
        async with pending_response as response:
            testsuite_missing = response.status == 404
            if testsuite_missing:
                raise ConfigurationError(
                    'It seems that testsuite support is not enabled '
                    'for your service',
                )
            response.raise_for_status()
            return await response.json(content_type=None)
982
async def _suspend_periodic_tasks(self):
    """
    Send the set of tasks to suspend to the service if it differs from
    the set recorded as suspended, then record a copy of it.
    """
    state = self._periodic_tasks
    if state.tasks_to_suspend == state.suspended_tasks:
        return
    await self._testsuite_action(
        'suspend_periodic_tasks',
        names=sorted(state.tasks_to_suspend),
    )
    # Store a copy so later in-place changes of tasks_to_suspend are seen
    # as a difference.
    state.suspended_tasks = set(state.tasks_to_suspend)
995
def _do_testsuite_action(self, action, **kwargs):
    """
    Start a POST request to the testsuite action handler.

    The path is the configured `testsuite_action_path` template formatted
    with `action`. Returns whatever `self.post` returns, without awaiting
    it.

    @throws ConfigurationError if the action path is not configured
    """
    path_template = self._config.testsuite_action_path
    if not path_template:
        raise ConfigurationError(
            'tests-control component is not properly configured',
        )
    return self.post(path_template.format(action=action), **kwargs)
1003
async def _testsuite_action(
    self, action, *, testsuite_skip_prepare=False, **kwargs,
):
    """
    Post a testsuite action with `kwargs` as the JSON body and return the
    decoded JSON response.

    @throws TestsuiteActionFailed if the service answers 500
    """
    pending_response = await self._do_testsuite_action(
        action, json=kwargs, testsuite_skip_prepare=testsuite_skip_prepare,
    )
    async with pending_response as response:
        action_failed = response.status == 500
        if action_failed:
            raise TestsuiteActionFailed
        response.raise_for_status()
        return await response.json(content_type=None)
1014
async def _prepare(self) -> None:
    """
    Send the pending update provided by the state manager's
    `cache_control_update()` context, if it is non-empty.
    """
    with self._state_manager.cache_control_update() as pending_update:
        if not pending_update:
            return
        await self._tests_control(pending_update)
1019
async def _request(  # pylint: disable=arguments-differ
    self,
    http_method: str,
    path: str,
    headers: typing.Optional[typing.Dict[str, str]] = None,
    bearer: typing.Optional[str] = None,
    x_real_ip: typing.Optional[str] = None,
    *,
    testsuite_skip_prepare: bool = False,
    **kwargs,
) -> aiohttp.ClientResponse:
    """
    Perform a request through the base client, wrapped with testsuite
    bookkeeping.

    Unless `testsuite_skip_prepare` is set, the pending state update is
    sent first. The optional async-exception check runs before and after
    the request, and the optional API coverage report is updated with
    the response status and content type.
    """
    if self._asyncexc_check:
        # Check for pending background exceptions before call.
        self._asyncexc_check()

    if not testsuite_skip_prepare:
        await self._prepare()

    response = await super()._request(
        http_method, path, headers, bearer, x_real_ip, **kwargs,
    )

    coverage = self._api_coverage_report
    if coverage:
        coverage.update_usage_stat(
            path, http_method, response.status, response.content_type,
        )

    if self._asyncexc_check:
        # Check for pending background exceptions after call.
        self._asyncexc_check()

    return response
1051
1052
1053# @endcond
1054
1055
1057 """
1058 Asyncio userver client, typically retrieved from
1059 @ref service_client "plugins.service_client.service_client"
1060 fixture.
1061
1062 Compatible with werkzeug interface.
1063
1064 @ingroup userver_testsuite
1065 """
1066
1067 PeriodicTaskFailed = PeriodicTaskFailed
1068 TestsuiteActionFailed = TestsuiteActionFailed
1069 TestsuiteTaskNotFound = TestsuiteTaskNotFound
1070 TestsuiteTaskConflict = TestsuiteTaskConflict
1071 TestsuiteTaskFailed = TestsuiteTaskFailed
1072
def _wrap_client_response(
    self, response: aiohttp.ClientResponse,
) -> typing.Awaitable[http.ClientResponse]:
    """
    Convert a raw aiohttp response via `http.wrap_client_response`,
    decoding JSON with `approx.json_loads`.
    """
    wrapped = http.wrap_client_response(
        response, json_loads=approx.json_loads,
    )
    return wrapped
1079
@_wrap_client_error
async def run_periodic_task(self, name):
    """
    Delegates to the underlying client's run_periodic_task().
    """
    backend = self._client
    await backend.run_periodic_task(name)
1083
@_wrap_client_error
async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
    """
    Delegates to the underlying client's suspend_periodic_tasks().
    """
    backend = self._client
    await backend.suspend_periodic_tasks(names)
1087
@_wrap_client_error
async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
    """
    Delegates to the underlying client's resume_periodic_tasks().
    """
    backend = self._client
    await backend.resume_periodic_tasks(names)
1091
@_wrap_client_error
async def resume_all_periodic_tasks(self) -> None:
    """
    Delegate to the underlying client's `resume_all_periodic_tasks`.
    """
    inner_client = self._client
    await inner_client.resume_all_periodic_tasks()
1095
@_wrap_client_error
async def write_cache_dumps(
    self,
    names: typing.List[str],
    *,
    testsuite_skip_prepare: bool = False,
) -> None:
    """
    Delegate to the underlying client's `write_cache_dumps`.

    @param names Cache names, forwarded as the `names` keyword argument.
    @param testsuite_skip_prepare Forwarded unchanged to the underlying
    client.
    """
    await self._client.write_cache_dumps(
        names=names, testsuite_skip_prepare=testsuite_skip_prepare,
    )
1103
@_wrap_client_error
async def read_cache_dumps(
    self,
    names: typing.List[str],
    *,
    testsuite_skip_prepare: bool = False,
) -> None:
    """
    Delegate to the underlying client's `read_cache_dumps`.

    @param names Cache names, forwarded as the `names` keyword argument.
    @param testsuite_skip_prepare Forwarded unchanged to the underlying
    client.
    """
    await self._client.read_cache_dumps(
        names=names, testsuite_skip_prepare=testsuite_skip_prepare,
    )
1111
async def run_task(self, name: str) -> None:
    """Forward `name` to the underlying client's `run_task`."""
    inner_client = self._client
    await inner_client.run_task(name)
1114
async def run_distlock_task(self, name: str) -> None:
    """Forward `name` to the underlying client's `run_distlock_task`."""
    inner_client = self._client
    await inner_client.run_distlock_task(name)
1117
async def reset_metrics(self) -> None:
    """
    Calls `ResetMetric(metric);` for each metric that has such C++ function.

    The request itself is issued by the underlying client's
    `reset_metrics`.
    """
    inner_client = self._client
    await inner_client.reset_metrics()
1123
async def metrics_portability(
    self, *, prefix: typing.Optional[str] = None,
) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
    """
    Reports metrics related issues that could be encountered on
    different monitoring systems.

    @param prefix Forwarded unchanged to the underlying client's
    `metrics_portability`.

    @sa @ref utils::statistics::GetPortabilityInfo
    """
    return await self._client.metrics_portability(prefix=prefix)
1134
def list_tasks(self) -> typing.List[str]:
    """Return whatever the underlying client's `list_tasks` returns."""
    inner_client = self._client
    return inner_client.list_tasks()
1137
def spawn_task(self, name: str):
    """
    Forward `name` to the underlying client's `spawn_task` and return
    its result unchanged.
    """
    inner_client = self._client
    return inner_client.spawn_task(name)
1140
def capture_logs(
    self, *, log_level: str = 'DEBUG', testsuite_skip_prepare: bool = False,
):
    """
    Captures logs from the service.

    @param log_level Do not capture logs below this level.
    @param testsuite_skip_prepare Forwarded unchanged to the underlying
    client's `capture_logs`.

    @see @ref testsuite_logs_capture
    """
    return self._client.capture_logs(
        log_level=log_level, testsuite_skip_prepare=testsuite_skip_prepare,
    )
1154
def log_flush(self, logger_name: typing.Optional[str] = None):
    """
    Flush service logs.

    @param logger_name Forwarded unchanged to the underlying client's
    `log_flush`.
    """
    inner_client = self._client
    return inner_client.log_flush(logger_name=logger_name)
1160
@_wrap_client_error
async def invalidate_caches(
    self,
    *,
    clean_update: bool = True,
    cache_names: typing.Optional[typing.List[str]] = None,
    testsuite_skip_prepare: bool = False,
) -> None:
    """
    Send request to service to update caches.

    @param clean_update if False, service will do a faster incremental
    update of caches whenever possible.
    @param cache_names which caches specifically should be updated;
    update all if None.
    @param testsuite_skip_prepare if False, service will automatically do
    update_server_state().
    """
    # pytest convention: hide this frame from failure tracebacks.
    __tracebackhide__ = True
    await self._client.invalidate_caches(
        clean_update=clean_update,
        cache_names=cache_names,
        testsuite_skip_prepare=testsuite_skip_prepare,
    )
1185
@_wrap_client_error
async def tests_control(
    self, *args, **kwargs,
) -> typing.Dict[str, typing.Any]:
    """
    Forward all arguments to the underlying client's `tests_control`
    and return its awaited result.
    """
    reply = await self._client.tests_control(*args, **kwargs)
    return reply
1191
@_wrap_client_error
async def update_server_state(self) -> None:
    """
    Update service-side state through an HTTP call to 'tests/control':
    - clear dirty (from other tests) caches
    - set service-side mocked time
    - resume / suspend periodic tasks
    - enable testpoints

    If service is up-to-date, does nothing.
    """
    inner_client = self._client
    await inner_client.update_server_state()
1203
@_wrap_client_error
async def enable_testpoints(self, *args, **kwargs) -> None:
    """
    Send the list of handled testpoint paths to the service. For these
    paths the service will no longer skip HTTP calls from the
    TESTPOINT(...) macro.

    All arguments are forwarded as is to the underlying client.

    @param no_auto_cache_cleanup prevent automatic cache cleanup.
    When calling service client first time in scope of current test, client
    makes additional http call to `tests/control` to update caches, to get
    rid of data from previous test.
    """
    inner_client = self._client
    await inner_client.enable_testpoints(*args, **kwargs)
1216
@_wrap_client_error
async def get_dynamic_config_defaults(
    self,
) -> typing.Dict[str, typing.Any]:
    """
    Return the awaited result of the underlying client's
    `get_dynamic_config_defaults`.
    """
    defaults = await self._client.get_dynamic_config_defaults()
    return defaults
1222
1223
@dataclasses.dataclass
class _State:
    """Reflects the (supposed) current service state."""

    # Snapshot of the cache invalidation state.
    invalidation_state: caches.InvalidationState
    # Last 'mock_now' value taken from a request body; starts as the
    # _UNKNOWN_STATE sentinel.
    now: typing.Optional[str] = _UNKNOWN_STATE
    # Last 'testpoints' list taken from a request body; starts as a set
    # holding only the _UNKNOWN_STATE sentinel.
    testpoints: typing.FrozenSet[str] = frozenset([_UNKNOWN_STATE])
1231
1232
1234 """
1235 Used for computing the requests that we need to automatically align
1236 the service state with the test fixtures state.
1237 """
1238
def __init__(
    self,
    *,
    mocked_time,
    testpoint,
    testpoint_control,
    invalidation_state: caches.InvalidationState,
    cache_control: typing.Optional[caches.CacheControl],
):
    """
    Store the collaborating objects and start tracking the service state.

    The tracked `_State` holds a deep copy of `invalidation_state`, while
    the original object is also kept as `_invalidation_state`.
    """
    self._mocked_time = mocked_time
    self._testpoint = testpoint
    self._testpoint_control = testpoint_control
    self._cache_control = cache_control
    self._invalidation_state = invalidation_state

    initial_snapshot = copy.deepcopy(invalidation_state)
    self._state = _State(invalidation_state=initial_snapshot)
1256
@contextlib.contextmanager
def updating_state(self, body: typing.Dict[str, typing.Any]):
    """
    Whenever `tests_control` handler is invoked
    (by the client itself during `prepare` or manually by the user),
    we need to synchronize `_state` with the (supposed) service state.
    The state update is decoded from the request body.

    If decoding, applying, or the `with` body raises an `Exception`,
    the previous state is restored and re-applied, then the exception
    propagates.
    """
    # Deep copy taken up front so a rollback is unaffected by the update.
    rollback_state = copy.deepcopy(self._state)
    try:
        self._update_state(body)
        self._apply_new_state()
        yield
    except Exception:  # noqa
        self._state = rollback_state
        self._apply_new_state()
        raise
1274
def get_pending_update(self) -> typing.Dict[str, typing.Any]:
    """
    Compose the body of the `tests_control` request required to completely
    synchronize the service state with the state of test fixtures.

    Only the parts that differ from the tracked state are included, so
    the result is empty when nothing is pending.
    """
    update: typing.Dict[str, typing.Any] = {}

    invalidation = self._invalidation_state
    if invalidation.has_caches_to_update:
        cache_request: typing.Dict[str, typing.Any] = {
            'update_type': 'full',
        }
        # Omitting 'names' requests an update of all caches.
        if not invalidation.should_update_all_caches:
            cache_request['names'] = list(invalidation.caches_to_update)
        update['invalidate_caches'] = cache_request

    wanted_testpoints = self._testpoint.keys()
    if self._state.testpoints != frozenset(wanted_testpoints):
        update['testpoints'] = sorted(wanted_testpoints)

    wanted_now = self._get_desired_now()
    if self._state.now != wanted_now:
        update['mock_now'] = wanted_now

    return update
1298
@contextlib.contextmanager
def cache_control_update(
    self,
) -> typing.Iterator[typing.Dict[str, typing.Any]]:
    """
    Context manager yielding the pending `tests_control` request body,
    adjusted by cache control when it applies.

    If the pending update has no `invalidate_caches` entry, or no cache
    control object is configured, the body from get_pending_update() is
    yielded as is. Otherwise cache control is queried for the caches being
    invalidated, the returned actions are applied to the
    `invalidate_caches` entry in place, and the staged result is committed
    after the `with` body finishes without raising.

    The annotation describes the undecorated generator function;
    `contextlib.contextmanager` turns it into a context manager factory.
    """
    pending_update = self.get_pending_update()
    invalidate_caches = pending_update.get('invalidate_caches')
    if not invalidate_caches or not self._cache_control:
        yield pending_update
        return

    cache_names = invalidate_caches.get('names')
    staged, actions = self._cache_control.query_caches(cache_names)
    self._apply_cache_control_actions(invalidate_caches, actions)
    yield pending_update
    # Not reached if the `with` body raised: the staged changes are then
    # left uncommitted.
    self._cache_control.commit_staged(staged)
1311
@staticmethod
def _apply_cache_control_actions(
    invalidate_caches: typing.Dict,
    actions: typing.List[typing.Tuple[str, caches.CacheControlAction]],
) -> None:
    """
    Rewrite the `invalidate_caches` request entry in place according to
    the per-cache `actions`.

    INCREMENTAL caches are appended to 'force_incremental_names'.
    EXCLUDE caches are removed from 'names' when that list is present,
    otherwise appended to 'exclude_names'. Other actions are ignored.
    Both auxiliary lists are created if missing.
    """
    requested = invalidate_caches.get('names')
    excluded = invalidate_caches.setdefault('exclude_names', [])
    incremental = invalidate_caches.setdefault(
        'force_incremental_names', [],
    )
    for name, action in actions:
        if action == caches.CacheControlAction.INCREMENTAL:
            incremental.append(name)
            continue
        if action != caches.CacheControlAction.EXCLUDE:
            continue
        if requested is None:
            excluded.append(name)
        else:
            requested.remove(name)
1330
def _update_state(self, body: typing.Dict[str, typing.Any]) -> None:
    """
    Fold a `tests_control` request body into the tracked `_state`.

    Handles the 'invalidate_caches', 'mock_now' and 'testpoints' keys;
    keys absent from `body` leave the matching part of the state as is.
    """
    invalidation = body.get('invalidate_caches', {})
    is_full_update = invalidation.get('update_type', 'full') == 'full'
    updated_names = invalidation.get('names', None)
    # An incremental update is considered insufficient to bring a cache
    # to a known state.
    if invalidation and is_full_update:
        tracker = self._state.invalidation_state
        if updated_names is not None:
            tracker.on_caches_updated(updated_names)
        else:
            tracker.on_all_caches_updated()

    if 'mock_now' in body:
        self._state.now = body['mock_now']

    requested_testpoints: typing.Optional[typing.List[str]] = body.get(
        'testpoints', None,
    )
    if requested_testpoints is not None:
        self._state.testpoints = frozenset(requested_testpoints)
1353
def _apply_new_state(self) -> None:
    """Apply new state to related components."""
    # Publish the tracked testpoints to the testpoint control object.
    self._testpoint_control.enabled_testpoints = self._state.testpoints
    # Copy the tracked invalidation snapshot into the invalidation state
    # object passed to the constructor.
    self._invalidation_state.assign_copy(self._state.invalidation_state)
1358
def _get_desired_now(self) -> typing.Optional[str]:
    """
    Return the mocked time formatted with `utils.timestring`, or None
    when time mocking is not enabled.
    """
    clock = self._mocked_time
    if not clock.is_enabled:
        return None
    return utils.timestring(clock.now())
1363
1364
async def _task_check_response(name: str, response) -> dict:
    """
    Validate the service response for testsuite task `name` and return
    its decoded JSON body.

    The response is used as an async context manager for the duration of
    the check.

    @param name Task name; used only in error messages.
    @param response Response object exposing `status` and an awaitable
    `json()`.
    @returns Decoded JSON body of an HTTP 200 response.
    @throws TestsuiteTaskNotFound on HTTP 404.
    @throws TestsuiteTaskConflict on HTTP 409.
    @throws TestsuiteTaskFailed if the body carries a falsy `status` field.
    @throws AssertionError on any other non-200 status.
    """
    async with response:
        status = response.status
        if status == 404:
            raise TestsuiteTaskNotFound(f'Testsuite task {name!r} not found')
        if status == 409:
            raise TestsuiteTaskConflict(f'Testsuite task {name!r} conflict')
        # Report the offending status instead of a bare AssertionError.
        assert status == 200, (
            f'Testsuite task {name!r}: unexpected HTTP status {status}'
        )
        data = await response.json()
        if not data.get('status', True):
            # A missing 'reason' must not hide the failure behind KeyError.
            raise TestsuiteTaskFailed(name, data.get('reason'))
        return data