userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Python module that provides clients for functional tests with
3testsuite; see
4@ref scripts/docs/en/userver/functional_testing.md for an introduction.
5
6@ingroup userver_testsuite
7"""
8
9# pylint: disable=too-many-lines
10
11import contextlib
12import copy
13import dataclasses
14import json
15import logging
16import typing
17
18import aiohttp
19
20from testsuite import annotations
21from testsuite import utils
22from testsuite.daemons import service_client
23from testsuite.utils import approx
24from testsuite.utils import http
25
26import pytest_userver.metrics as metric_module # pylint: disable=import-error
27from pytest_userver.plugins import caches
28
29# @cond
30logger = logging.getLogger(__name__)
31# @endcond
32
33_UNKNOWN_STATE = '__UNKNOWN__'
34
35
36class BaseError(Exception):
37 """Base class for exceptions of this module."""
38
39
41 pass
42
43
45 pass
46
47
57 pass
58
59
61 pass
62
63
65 def __init__(self):
66 self.suspended_tasks: typing.Set[str] = set()
67 self.tasks_to_suspend: typing.Set[str] = set()
68
69
71 def __init__(self, name, reason):
72 self.name = name
73 self.reason = reason
74 super().__init__(f'Testsuite task {name!r} failed: {reason}')
75
76
77@dataclasses.dataclass(frozen=True)
79 testsuite_action_path: typing.Optional[str] = None
80 server_monitor_path: typing.Optional[str] = None
81
82
83Metric = metric_module.Metric
84
85
87 """
88 Base asyncio userver client that implements HTTP requests to service.
89
90 Compatible with werkzeug interface.
91
92 @ingroup userver_testsuite
93 """
94
95 def __init__(self, client):
96 self._client = client
97
98 async def post(
99 self,
100 path: str,
101 json: annotations.JsonAnyOptional = None,
102 data: typing.Any = None,
103 params: typing.Optional[typing.Dict[str, str]] = None,
104 bearer: typing.Optional[str] = None,
105 x_real_ip: typing.Optional[str] = None,
106 headers: typing.Optional[typing.Dict[str, str]] = None,
107 **kwargs,
108 ) -> http.ClientResponse:
109 """
110 Make a HTTP POST request
111 """
112 response = await self._client.post(
113 path,
114 json=json,
115 data=data,
116 params=params,
117 headers=headers,
118 bearer=bearer,
119 x_real_ip=x_real_ip,
120 **kwargs,
121 )
122 return await self._wrap_client_response(response)
123
124 async def put(
125 self,
126 path,
127 json: annotations.JsonAnyOptional = None,
128 data: typing.Any = None,
129 params: typing.Optional[typing.Dict[str, str]] = None,
130 bearer: typing.Optional[str] = None,
131 x_real_ip: typing.Optional[str] = None,
132 headers: typing.Optional[typing.Dict[str, str]] = None,
133 **kwargs,
134 ) -> http.ClientResponse:
135 """
136 Make a HTTP PUT request
137 """
138 response = await self._client.put(
139 path,
140 json=json,
141 data=data,
142 params=params,
143 headers=headers,
144 bearer=bearer,
145 x_real_ip=x_real_ip,
146 **kwargs,
147 )
148 return await self._wrap_client_response(response)
149
150 async def patch(
151 self,
152 path,
153 json: annotations.JsonAnyOptional = None,
154 data: typing.Any = None,
155 params: typing.Optional[typing.Dict[str, str]] = None,
156 bearer: typing.Optional[str] = None,
157 x_real_ip: typing.Optional[str] = None,
158 headers: typing.Optional[typing.Dict[str, str]] = None,
159 **kwargs,
160 ) -> http.ClientResponse:
161 """
162 Make a HTTP PATCH request
163 """
164 response = await self._client.patch(
165 path,
166 json=json,
167 data=data,
168 params=params,
169 headers=headers,
170 bearer=bearer,
171 x_real_ip=x_real_ip,
172 **kwargs,
173 )
174 return await self._wrap_client_response(response)
175
176 async def get(
177 self,
178 path: str,
179 headers: typing.Optional[typing.Dict[str, str]] = None,
180 bearer: typing.Optional[str] = None,
181 x_real_ip: typing.Optional[str] = None,
182 **kwargs,
183 ) -> http.ClientResponse:
184 """
185 Make a HTTP GET request
186 """
187 response = await self._client.get(
188 path,
189 headers=headers,
190 bearer=bearer,
191 x_real_ip=x_real_ip,
192 **kwargs,
193 )
194 return await self._wrap_client_response(response)
195
196 async def delete(
197 self,
198 path: str,
199 headers: typing.Optional[typing.Dict[str, str]] = None,
200 bearer: typing.Optional[str] = None,
201 x_real_ip: typing.Optional[str] = None,
202 **kwargs,
203 ) -> http.ClientResponse:
204 """
205 Make a HTTP DELETE request
206 """
207 response = await self._client.delete(
208 path,
209 headers=headers,
210 bearer=bearer,
211 x_real_ip=x_real_ip,
212 **kwargs,
213 )
214 return await self._wrap_client_response(response)
215
216 async def options(
217 self,
218 path: str,
219 headers: typing.Optional[typing.Dict[str, str]] = None,
220 bearer: typing.Optional[str] = None,
221 x_real_ip: typing.Optional[str] = None,
222 **kwargs,
223 ) -> http.ClientResponse:
224 """
225 Make a HTTP OPTIONS request
226 """
227 response = await self._client.options(
228 path,
229 headers=headers,
230 bearer=bearer,
231 x_real_ip=x_real_ip,
232 **kwargs,
233 )
234 return await self._wrap_client_response(response)
235
236 async def request(
237 self, http_method: str, path: str, **kwargs,
238 ) -> http.ClientResponse:
239 """
240 Make a HTTP request with the specified method
241 """
242 response = await self._client.request(http_method, path, **kwargs)
243 return await self._wrap_client_response(response)
244
245 def _wrap_client_response(
246 self, response: aiohttp.ClientResponse,
247 ) -> typing.Awaitable[http.ClientResponse]:
248 return http.wrap_client_response(response)
249
250
251# @cond
252
253
254def _wrap_client_error(func):
255 async def _wrapper(*args, **kwargs):
256 try:
257 return await func(*args, **kwargs)
258 except aiohttp.client_exceptions.ClientResponseError as exc:
259 raise http.HttpResponseError(
260 url=exc.request_info.url, status=exc.status,
261 )
262
263 return _wrapper
264
265
266class AiohttpClientMonitor(service_client.AiohttpClient):
267 _config: TestsuiteClientConfig
268
269 def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
270 super().__init__(base_url, **kwargs)
271 self._config = config
272
273 async def get_metrics(self, prefix=None):
274 if not self._config.server_monitor_path:
275 raise ConfigurationError(
276 'handler-server-monitor component is not configured',
277 )
278 if prefix is not None:
279 params = {'prefix': prefix}
280 else:
281 params = None
282 response = await self.get(
283 self._config.server_monitor_path, params=params,
284 )
285 async with response:
286 response.raise_for_status()
287 return await response.json(content_type=None)
288
289 async def get_metric(self, metric_name):
290 metrics = await self.get_metrics(metric_name)
291 assert metric_name in metrics, (
292 f'No metric with name {metric_name!r}. '
293 f'Use "single_metric" function instead of "get_metric"'
294 )
295 return metrics[metric_name]
296
297 async def metrics_raw(
298 self,
299 output_format,
300 *,
301 path: str = None,
302 prefix: str = None,
303 labels: typing.Optional[typing.Dict[str, str]] = None,
304 ) -> str:
305 if not self._config.server_monitor_path:
306 raise ConfigurationError(
307 'handler-server-monitor component is not configured',
308 )
309
310 params = {'format': output_format}
311 if prefix:
312 params['prefix'] = prefix
313
314 if path:
315 params['path'] = path
316
317 if labels:
318 params['labels'] = json.dumps(labels)
319
320 response = await self.get(
321 self._config.server_monitor_path, params=params,
322 )
323 async with response:
324 response.raise_for_status()
325 return await response.text()
326
327 async def metrics(
328 self,
329 *,
330 path: str = None,
331 prefix: str = None,
332 labels: typing.Optional[typing.Dict[str, str]] = None,
333 ) -> metric_module.MetricsSnapshot:
334 response = await self.metrics_raw(
335 output_format='json', path=path, prefix=prefix, labels=labels,
336 )
337 return metric_module.MetricsSnapshot.from_json(str(response))
338
339 async def single_metric_optional(
340 self,
341 path: str,
342 *,
343 labels: typing.Optional[typing.Dict[str, str]] = None,
344 ) -> typing.Optional[Metric]:
345 response = await self.metrics(path=path, labels=labels)
346 metrics_list = response.get(path, [])
347
348 assert len(metrics_list) <= 1, (
349 f'More than one metric found for path {path} and labels {labels}: '
350 f'{response}',
351 )
352
353 if not metrics_list:
354 return None
355
356 return next(iter(metrics_list))
357
358 async def single_metric(
359 self,
360 path: str,
361 *,
362 labels: typing.Optional[typing.Dict[str, str]] = None,
363 ) -> Metric:
364 value = await self.single_metric_optional(path, labels=labels)
365 assert value is not None, (
366 f'No metric was found for path {path} and labels {labels}',
367 )
368 return value
369
370
371# @endcond
372
373
375 """
376 Asyncio userver client for monitor listeners, typically retrieved from
377 plugins.service_client.monitor_client fixture.
378
379 Compatible with werkzeug interface.
380
381 @ingroup userver_testsuite
382 """
383
385 self,
386 *,
387 path: typing.Optional[str] = None,
388 prefix: typing.Optional[str] = None,
389 labels: typing.Optional[typing.Dict[str, str]] = None,
390 diff_gauge: bool = False,
391 ) -> 'MetricsDiffer':
392 """
393 Creates a `MetricsDiffer` that fetches metrics using this client.
394 It's recommended to use this method over `metrics` to make sure
395 the tests don't affect each other.
396
397 With `diff_gauge` off, only RATE metrics are differentiated.
398 With `diff_gauge` on, GAUGE metrics are differentiated as well,
399 which may lead to nonsensical results for those.
400
401 @param path Optional full metric path
402 @param prefix Optional prefix on which the metric paths should start
403 @param labels Optional dictionary of labels that must be in the metric
404 @param diff_gauge Whether to differentiate GAUGE metrics
405
406 @code
407 async with monitor_client.metrics_diff(prefix='foo') as differ:
408 # Do something that makes the service update its metrics
409 assert differ.value_at('path-suffix', {'label'}) == 42
410 @endcode
411 """
412 return MetricsDiffer(
413 _client=self,
414 _path=path,
415 _prefix=prefix,
416 _labels=labels,
417 _diff_gauge=diff_gauge,
418 )
419
420 @_wrap_client_error
421 async def metrics(
422 self,
423 *,
424 path: typing.Optional[str] = None,
425 prefix: typing.Optional[str] = None,
426 labels: typing.Optional[typing.Dict[str, str]] = None,
427 ) -> metric_module.MetricsSnapshot:
428 """
429 Returns a dict of metric names to Metric.
430
431 @param path Optional full metric path
432 @param prefix Optional prefix on which the metric paths should start
433 @param labels Optional dictionary of labels that must be in the metric
434 """
435 return await self._client.metrics(
436 path=path, prefix=prefix, labels=labels,
437 )
438
439 @_wrap_client_error
441 self,
442 path: str,
443 *,
444 labels: typing.Optional[typing.Dict[str, str]] = None,
445 ) -> typing.Optional[Metric]:
446 """
447 Either return a Metric or None if there's no such metric.
448
449 @param path Full metric path
450 @param labels Optional dictionary of labels that must be in the metric
451
452 @throws AssertionError if more than one metric returned
453 """
454 return await self._client.single_metric_optional(path, labels=labels)
455
456 @_wrap_client_error
457 async def single_metric(
458 self,
459 path: str,
460 *,
461 labels: typing.Optional[typing.Dict[str, str]] = None,
462 ) -> typing.Optional[Metric]:
463 """
464 Returns the Metric.
465
466 @param path Full metric path
467 @param labels Optional dictionary of labels that must be in the metric
468
469 @throws AssertionError if more than one metric or no metric found
470 """
471 return await self._client.single_metric(path, labels=labels)
472
473 @_wrap_client_error
474 async def metrics_raw(
475 self,
476 output_format: str,
477 *,
478 path: typing.Optional[str] = None,
479 prefix: typing.Optional[str] = None,
480 labels: typing.Optional[typing.Dict[str, str]] = None,
481 ) -> typing.Dict[str, Metric]:
482 """
483 Low level function that returns metrics in a specific format.
484 Use `metrics` and `single_metric` instead if possible.
485
486 @param output_format Metric output format. See
487 server::handlers::ServerMonitor for a list of supported formats.
488 @param path Optional full metric path
489 @param prefix Optional prefix on which the metric paths should start
490 @param labels Optional dictionary of labels that must be in the metric
491 """
492 return await self._client.metrics_raw(
493 output_format=output_format,
494 path=path,
495 prefix=prefix,
496 labels=labels,
497 )
498
499 @_wrap_client_error
500 async def get_metrics(self, prefix=None):
501 """
502 @deprecated Use metrics() or single_metric() instead
503 """
504 return await self._client.get_metrics(prefix=prefix)
505
506 @_wrap_client_error
507 async def get_metric(self, metric_name):
508 """
509 @deprecated Use metrics() or single_metric() instead
510 """
511 return await self._client.get_metric(metric_name)
512
513
515 """
516 A helper class for computing metric differences.
517
518 @see ClientMonitor.metrics_diff
519 @ingroup userver_testsuite
520 """
521
522 # @cond
523 def __init__(
524 self,
525 _client: ClientMonitor,
526 _path: typing.Optional[str],
527 _prefix: typing.Optional[str],
528 _labels: typing.Optional[typing.Dict[str, str]],
529 _diff_gauge: bool,
530 ):
531 self._client = _client
532 self._path = _path
533 self._prefix = _prefix
534 self._labels = _labels
535 self._diff_gauge = _diff_gauge
536 self._baseline: typing.Optional[metric_module.MetricsSnapshot] = None
537 self._current: typing.Optional[metric_module.MetricsSnapshot] = None
538 self._diff: typing.Optional[metric_module.MetricsSnapshot] = None
539
540 # @endcond
541
542 @property
543 def baseline(self) -> metric_module.MetricsSnapshot:
544 assert self._baseline is not None
545 return self._baseline
546
547 @baseline.setter
548 def baseline(self, value: metric_module.MetricsSnapshot) -> None:
549 self._baseline = value
550 if self._current is not None:
551 self._diff = _subtract_metrics_snapshots(
552 self._current, self._baseline, self._diff_gauge,
553 )
554
555 @property
556 def current(self) -> metric_module.MetricsSnapshot:
557 assert self._current is not None, 'Set self.current first'
558 return self._current
559
560 @current.setter
561 def current(self, value: metric_module.MetricsSnapshot) -> None:
562 self._current = value
563 assert self._baseline is not None, 'Set self.baseline first'
564 self._diff = _subtract_metrics_snapshots(
565 self._current, self._baseline, self._diff_gauge,
566 )
567
568 @property
569 def diff(self) -> metric_module.MetricsSnapshot:
570 assert self._diff is not None, 'Set self.current first'
571 return self._diff
572
574 self,
575 subpath: typing.Optional[str] = None,
576 add_labels: typing.Optional[typing.Dict] = None,
577 *,
578 default: typing.Optional[float] = None,
579 ) -> float:
580 """
581 Returns a single metric value at the specified path, prepending
582 the path provided at construction. If a dict of labels is provided,
583 does en exact match of labels, prepending the labels provided
584 at construction.
585
586 @param subpath Suffix of the metric path; the path provided
587 at construction is prepended
588 @param add_labels Labels that the metric must have in addition
589 to the labels provided at construction
590 @param default An optional default value in case the metric is missing
591 @throws AssertionError if not one metric by path
592 """
593 base_path = self._path or self._prefix
594 if base_path and subpath:
595 path = f'{base_path}.{subpath}'
596 else:
597 assert base_path or subpath, 'No path provided'
598 path = base_path or subpath or ''
599 labels: typing.Optional[dict] = None
600 if self._labels is not None or add_labels is not None:
601 labels = {**(self._labels or {}), **(add_labels or {})}
602 return self.diff.value_at(path, labels, default=default)
603
604 async def fetch(self) -> metric_module.MetricsSnapshot:
605 """
606 Fetches metric values from the service.
607 """
608 return await self._client.metrics(
609 path=self._path, prefix=self._prefix, labels=self._labels,
610 )
611
612 async def __aenter__(self) -> 'MetricsDiffer':
613 self._baseline = await self.fetch()
614 self._current = None
615 return self
616
617 async def __aexit__(self, exc_type, exc, tb) -> None:
618 self.currentcurrentcurrent = await self.fetch()
619
620
621# @cond
622
623
624def _subtract_metrics_snapshots(
625 current: metric_module.MetricsSnapshot,
626 initial: metric_module.MetricsSnapshot,
627 diff_gauge: bool,
628) -> metric_module.MetricsSnapshot:
629 return metric_module.MetricsSnapshot(
630 {
631 path: {
632 _subtract_metrics(path, current_metric, initial, diff_gauge)
633 for current_metric in current_group
634 }
635 for path, current_group in current.items()
636 },
637 )
638
639
640def _subtract_metrics(
641 path: str,
642 current_metric: metric_module.Metric,
643 initial: metric_module.MetricsSnapshot,
644 diff_gauge: bool,
645) -> metric_module.Metric:
646 assert diff_gauge, 'diff_gauge=False is unimplemented'
647
648 initial_group = initial.get(path, None)
649 if initial_group is None:
650 return current_metric
651 initial_metric = next(
652 (x for x in initial_group if x.labels == current_metric.labels), None,
653 )
654 if initial_metric is None:
655 return current_metric
656
657 return metric_module.Metric(
658 labels=current_metric.labels,
659 value=current_metric.value - initial_metric.value,
660 )
661
662
663class AiohttpClient(service_client.AiohttpClient):
664 PeriodicTaskFailed = PeriodicTaskFailed
665 TestsuiteActionFailed = TestsuiteActionFailed
666 TestsuiteTaskNotFound = TestsuiteTaskNotFound
667 TestsuiteTaskConflict = TestsuiteTaskConflict
668 TestsuiteTaskFailed = TestsuiteTaskFailed
669
670 def __init__(
671 self,
672 base_url: str,
673 *,
674 config: TestsuiteClientConfig,
675 mocked_time,
676 log_capture_fixture,
677 testpoint,
678 testpoint_control,
679 cache_invalidation_state,
680 span_id_header=None,
681 api_coverage_report=None,
682 periodic_tasks_state: typing.Optional[PeriodicTasksState] = None,
683 **kwargs,
684 ):
685 super().__init__(base_url, span_id_header=span_id_header, **kwargs)
686 self._config = config
687 self._periodic_tasks = periodic_tasks_state
688 self._testpoint = testpoint
689 self._log_capture_fixture = log_capture_fixture
690 self._state_manager = _StateManager(
691 mocked_time=mocked_time,
692 testpoint=self._testpoint,
693 testpoint_control=testpoint_control,
694 invalidation_state=cache_invalidation_state,
695 )
696 self._api_coverage_report = api_coverage_report
697
698 async def run_periodic_task(self, name):
699 response = await self._testsuite_action('run_periodic_task', name=name)
700 if not response['status']:
701 raise self.PeriodicTaskFailed(f'Periodic task {name} failed')
702
703 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
704 if not self._periodic_tasks:
705 raise ConfigurationError('No periodic_tasks_state given')
706 self._periodic_tasks.tasks_to_suspend.update(names)
707 await self._suspend_periodic_tasks()
708
709 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
710 if not self._periodic_tasks:
711 raise ConfigurationError('No periodic_tasks_state given')
712 self._periodic_tasks.tasks_to_suspend.difference_update(names)
713 await self._suspend_periodic_tasks()
714
715 async def resume_all_periodic_tasks(self) -> None:
716 if not self._periodic_tasks:
717 raise ConfigurationError('No periodic_tasks_state given')
718 self._periodic_tasks.tasks_to_suspend.clear()
719 await self._suspend_periodic_tasks()
720
721 async def write_cache_dumps(
722 self, names: typing.List[str], *, testsuite_skip_prepare=False,
723 ) -> None:
724 await self._testsuite_action(
725 'write_cache_dumps',
726 names=names,
727 testsuite_skip_prepare=testsuite_skip_prepare,
728 )
729
730 async def read_cache_dumps(
731 self, names: typing.List[str], *, testsuite_skip_prepare=False,
732 ) -> None:
733 await self._testsuite_action(
734 'read_cache_dumps',
735 names=names,
736 testsuite_skip_prepare=testsuite_skip_prepare,
737 )
738
739 async def run_distlock_task(self, name: str) -> None:
740 await self.run_task(f'distlock/{name}')
741
742 async def reset_metrics(self) -> None:
743 await self._testsuite_action('reset_metrics')
744
745 async def metrics_portability(
746 self, *, prefix: typing.Optional[str] = None,
747 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
748 return await self._testsuite_action(
749 'metrics_portability', prefix=prefix,
750 )
751
752 async def list_tasks(self) -> typing.List[str]:
753 response = await self._do_testsuite_action('tasks_list')
754 async with response:
755 response.raise_for_status()
756 body = await response.json(content_type=None)
757 return body['tasks']
758
759 async def run_task(self, name: str) -> None:
760 response = await self._do_testsuite_action(
761 'task_run', json={'name': name},
762 )
763 await _task_check_response(name, response)
764
765 @contextlib.asynccontextmanager
766 async def spawn_task(self, name: str):
767 task_id = await self._task_spawn(name)
768 try:
769 yield
770 finally:
771 await self._task_stop_spawned(task_id)
772
773 async def _task_spawn(self, name: str) -> str:
774 response = await self._do_testsuite_action(
775 'task_spawn', json={'name': name},
776 )
777 data = await _task_check_response(name, response)
778 return data['task_id']
779
780 async def _task_stop_spawned(self, task_id: str) -> None:
781 response = await self._do_testsuite_action(
782 'task_stop', json={'task_id': task_id},
783 )
784 await _task_check_response(task_id, response)
785
786 async def http_allowed_urls_extra(
787 self, http_allowed_urls_extra: typing.List[str],
788 ) -> None:
789 await self._do_testsuite_action(
790 'http_allowed_urls_extra',
791 json={'allowed_urls_extra': http_allowed_urls_extra},
792 testsuite_skip_prepare=True,
793 )
794
795 @contextlib.asynccontextmanager
796 async def capture_logs(self):
797 async with self._log_capture_fixture.start_capture() as capture:
798 await self._testsuite_action(
799 'log_capture', socket_logging_duplication=True,
800 )
801 try:
802 yield capture
803 finally:
804 await self._testsuite_action(
805 'log_capture', socket_logging_duplication=False,
806 )
807
808 async def invalidate_caches(
809 self,
810 *,
811 clean_update: bool = True,
812 cache_names: typing.Optional[typing.List[str]] = None,
813 ) -> None:
814 await self.tests_control(
815 invalidate_caches=True,
816 clean_update=clean_update,
817 cache_names=cache_names,
818 )
819
820 async def tests_control(
821 self,
822 *,
823 invalidate_caches: bool = True,
824 clean_update: bool = True,
825 cache_names: typing.Optional[typing.List[str]] = None,
826 http_allowed_urls_extra=None,
827 ) -> typing.Dict[str, typing.Any]:
828 body: typing.Dict[
829 str, typing.Any,
830 ] = self._state_manager.get_pending_update()
831
832 if 'invalidate_caches' in body and invalidate_caches:
833 if not clean_update or cache_names:
834 logger.warning(
835 'Manual cache invalidation leads to indirect initial '
836 'full cache invalidation',
837 )
838 await self._prepare()
839 body = {}
840
841 if invalidate_caches:
842 body['invalidate_caches'] = {
843 'update_type': ('full' if clean_update else 'incremental'),
844 }
845 if cache_names:
846 body['invalidate_caches']['names'] = cache_names
847
848 if http_allowed_urls_extra is not None:
849 await self.http_allowed_urls_extra(http_allowed_urls_extra)
850
851 return await self._tests_control(body)
852
853 async def update_server_state(self) -> None:
854 await self._prepare()
855
856 async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
857 if not self._testpoint:
858 return
859 if no_auto_cache_cleanup:
860 await self._tests_control(
861 {'testpoints': sorted(self._testpoint.keys())},
862 )
863 else:
864 await self.update_server_state()
865
866 async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
867 with self._state_manager.updating_state(body):
868 async with await self._do_testsuite_action(
869 'control', json=body, testsuite_skip_prepare=True,
870 ) as response:
871 if response.status == 404:
872 raise ConfigurationError(
873 'It seems that testsuite support is not enabled '
874 'for your service',
875 )
876 response.raise_for_status()
877 return await response.json(content_type=None)
878
879 async def _suspend_periodic_tasks(self):
880 if (
881 self._periodic_tasks.tasks_to_suspend
882 != self._periodic_tasks.suspended_tasks
883 ):
884 await self._testsuite_action(
885 'suspend_periodic_tasks',
886 names=sorted(self._periodic_tasks.tasks_to_suspend),
887 )
888 self._periodic_tasks.suspended_tasks = set(
889 self._periodic_tasks.tasks_to_suspend,
890 )
891
892 def _do_testsuite_action(self, action, **kwargs):
893 if not self._config.testsuite_action_path:
894 raise ConfigurationError(
895 'tests-control component is not properly configured',
896 )
897 path = self._config.testsuite_action_path.format(action=action)
898 return self.post(path, **kwargs)
899
900 async def _testsuite_action(
901 self, action, *, testsuite_skip_prepare=False, **kwargs,
902 ):
903 async with await self._do_testsuite_action(
904 action,
905 json=kwargs,
906 testsuite_skip_prepare=testsuite_skip_prepare,
907 ) as response:
908 if response.status == 500:
909 raise TestsuiteActionFailed
910 response.raise_for_status()
911 return await response.json(content_type=None)
912
913 async def _prepare(self) -> None:
914 pending_update = self._state_manager.get_pending_update()
915 if pending_update:
916 await self._tests_control(pending_update)
917
918 async def _request( # pylint: disable=arguments-differ
919 self,
920 http_method: str,
921 path: str,
922 headers: typing.Optional[typing.Dict[str, str]] = None,
923 bearer: typing.Optional[str] = None,
924 x_real_ip: typing.Optional[str] = None,
925 *,
926 testsuite_skip_prepare: bool = False,
927 **kwargs,
928 ) -> aiohttp.ClientResponse:
929 if not testsuite_skip_prepare:
930 await self._prepare()
931
932 response = await super()._request(
933 http_method, path, headers, bearer, x_real_ip, **kwargs,
934 )
935 if self._api_coverage_report:
936 self._api_coverage_report.update_usage_stat(
937 path, http_method, response.status, response.content_type,
938 )
939
940 return response
941
942
943# @endcond
944
945
947 """
948 Asyncio userver client, typically retrieved from
949 @ref service_client "plugins.service_client.service_client"
950 fixture.
951
952 Compatible with werkzeug interface.
953
954 @ingroup userver_testsuite
955 """
956
957 PeriodicTaskFailed = PeriodicTaskFailed
958 TestsuiteActionFailed = TestsuiteActionFailed
959 TestsuiteTaskNotFound = TestsuiteTaskNotFound
960 TestsuiteTaskConflict = TestsuiteTaskConflict
961 TestsuiteTaskFailed = TestsuiteTaskFailed
962
963 def _wrap_client_response(
964 self, response: aiohttp.ClientResponse,
965 ) -> typing.Awaitable[http.ClientResponse]:
966 return http.wrap_client_response(
967 response, json_loads=approx.json_loads,
968 )
969
970 @_wrap_client_error
971 async def run_periodic_task(self, name):
972 await self._client.run_periodic_task(name)
973
974 @_wrap_client_error
975 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
976 await self._client.suspend_periodic_tasks(names)
977
978 @_wrap_client_error
979 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
980 await self._client.resume_periodic_tasks(names)
981
982 @_wrap_client_error
983 async def resume_all_periodic_tasks(self) -> None:
984 await self._client.resume_all_periodic_tasks()
985
986 @_wrap_client_error
987 async def write_cache_dumps(
988 self, names: typing.List[str], *, testsuite_skip_prepare=False,
989 ) -> None:
990 await self._client.write_cache_dumps(
991 names=names, testsuite_skip_prepare=testsuite_skip_prepare,
992 )
993
994 @_wrap_client_error
995 async def read_cache_dumps(
996 self, names: typing.List[str], *, testsuite_skip_prepare=False,
997 ) -> None:
998 await self._client.read_cache_dumps(
999 names=names, testsuite_skip_prepare=testsuite_skip_prepare,
1000 )
1001
1002 async def run_task(self, name: str) -> None:
1003 await self._client.run_task(name)
1004
1005 async def run_distlock_task(self, name: str) -> None:
1006 await self._client.run_distlock_task(name)
1007
1008 async def reset_metrics(self) -> None:
1009 """
1010 Calls `ResetMetric(metric);` for each metric that has such C++ function
1011 """
1012 await self._client.reset_metrics()
1013
1015 self, *, prefix: typing.Optional[str] = None,
1016 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
1017 """
1018 Reports metrics related issues that could be encountered on
1019 different monitoring systems.
1020
1021 @sa @ref utils::statistics::GetPortabilityInfo
1022 """
1023 return await self._client.metrics_portability(prefix=prefix)
1024
1025 def list_tasks(self) -> typing.List[str]:
1026 return self._client.list_tasks()
1027
1028 def spawn_task(self, name: str):
1029 return self._client.spawn_task(name)
1030
1031 def capture_logs(self):
1032 return self._client.capture_logs()
1033
1034 @_wrap_client_error
1036 self,
1037 *,
1038 clean_update: bool = True,
1039 cache_names: typing.Optional[typing.List[str]] = None,
1040 ) -> None:
1041 """
1042 Send request to service to update caches.
1043
1044 @param clean_update if False, service will do a faster incremental
1045 update of caches whenever possible.
1046 @param cache_names which caches specifically should be updated;
1047 update all if None.
1048 """
1049 await self._client.invalidate_caches(
1050 clean_update=clean_update, cache_names=cache_names,
1051 )
1052
1053 @_wrap_client_error
1054 async def tests_control(
1055 self, *args, **kwargs,
1056 ) -> typing.Dict[str, typing.Any]:
1057 return await self._client.tests_control(*args, **kwargs)
1058
1059 @_wrap_client_error
1060 async def update_server_state(self) -> None:
1061 """
1062 Update service-side state through http call to 'tests/control':
1063 - clear dirty (from other tests) caches
1064 - set service-side mocked time,
1065 - resume / suspend periodic tasks
1066 - enable testpoints
1067 If service is up-to-date, does nothing.
1068 """
1069 await self._client.update_server_state()
1070
1071 @_wrap_client_error
1072 async def enable_testpoints(self, *args, **kwargs) -> None:
1073 """
1074 Send list of handled testpoint pats to service. For these paths service
1075 will no more skip http calls from TESTPOINT(...) macro.
1076
1077 @param no_auto_cache_cleanup prevent automatic cache cleanup.
1078 When calling service client first time in scope of current test, client
1079 makes additional http call to `tests/control` to update caches, to get
1080 rid of data from previous test.
1081 """
1082 await self._client.enable_testpoints(*args, **kwargs)
1083
1084
1085@dataclasses.dataclass
1087 """Reflects the (supposed) current service state."""
1088
1089 invalidation_state: caches.InvalidationState
1090 now: typing.Optional[str] = _UNKNOWN_STATE
1091 testpoints: typing.FrozenSet[str] = frozenset([_UNKNOWN_STATE])
1092
1093
1095 """
1096 Used for computing the requests that we need to automatically align
1097 the service state with the test fixtures state.
1098 """
1099
1100 def __init__(
1101 self,
1102 *,
1103 mocked_time,
1104 testpoint,
1105 testpoint_control,
1106 invalidation_state: caches.InvalidationState,
1107 ):
1108 self._state = _State(
1109 invalidation_state=copy.deepcopy(invalidation_state),
1110 )
1111 self._mocked_time = mocked_time
1112 self._testpoint = testpoint
1113 self._testpoint_control = testpoint_control
1114 self._invalidation_state = invalidation_state
1115
1116 @contextlib.contextmanager
1117 def updating_state(self, body: typing.Dict[str, typing.Any]):
1118 """
1119 Whenever `tests_control` handler is invoked
1120 (by the client itself during `prepare` or manually by the user),
1121 we need to synchronize `_state` with the (supposed) service state.
1122 The state update is decoded from the request body.
1123 """
1124 saved_state = copy.deepcopy(self._state)
1125 try:
1126 self._update_state(body)
1127 self._apply_new_state()
1128 yield
1129 except Exception: # noqa
1130 self._state = saved_state
1131 self._apply_new_state()
1132 raise
1133
1134 def get_pending_update(self) -> typing.Dict[str, typing.Any]:
1135 """
1136 Compose the body of the `tests_control` request required to completely
1137 synchronize the service state with the state of test fixtures.
1138 """
1139 body: typing.Dict[str, typing.Any] = {}
1140
1141 if self._invalidation_state.has_caches_to_update:
1142 body['invalidate_caches'] = {'update_type': 'full'}
1143 if not self._invalidation_state.should_update_all_caches:
1144 body['invalidate_caches']['names'] = list(
1145 self._invalidation_state.caches_to_update,
1146 )
1147
1148 desired_testpoints = self._testpoint.keys()
1149 if self._state.testpoints != frozenset(desired_testpoints):
1150 body['testpoints'] = sorted(desired_testpoints)
1151
1152 desired_now = self._get_desired_now()
1153 if self._state.now != desired_now:
1154 body['mock_now'] = desired_now
1155
1156 return body
1157
1158 def _update_state(self, body: typing.Dict[str, typing.Any]) -> None:
1159 body_invalidate_caches = body.get('invalidate_caches', {})
1160 update_type = body_invalidate_caches.get('update_type', 'full')
1161 body_cache_names = body_invalidate_caches.get('names', None)
1162 # An incremental update is considered insufficient to bring a cache
1163 # to a known state.
1164 if body_invalidate_caches and update_type == 'full':
1165 if body_cache_names is None:
1166 self._state.invalidation_state.on_all_caches_updated()
1167 else:
1168 self._state.invalidation_state.on_caches_updated(
1169 body_cache_names,
1170 )
1171
1172 if 'mock_now' in body:
1173 self._state.now = body['mock_now']
1174
1175 testpoints: typing.Optional[typing.List[str]] = body.get(
1176 'testpoints', None,
1177 )
1178 if testpoints is not None:
1179 self._state.testpoints = frozenset(testpoints)
1180
1182 """Apply new state to related components."""
1183 self._testpoint_control.enabled_testpoints = self._state.testpoints
1184 self._invalidation_state.assign_copy(self._state.invalidation_state)
1185
1186 def _get_desired_now(self) -> typing.Optional[str]:
1187 if self._mocked_time.is_enabled:
1188 return utils.timestring(self._mocked_time.now())
1189 return None
1190
1191
1192async def _task_check_response(name: str, response) -> dict:
1193 async with response:
1194 if response.status == 404:
1195 raise TestsuiteTaskNotFound(f'Testsuite task {name!r} not found')
1196 if response.status == 409:
1197 raise TestsuiteTaskConflict(f'Testsuite task {name!r} conflict')
1198 assert response.status == 200
1199 data = await response.json()
1200 if not data.get('status', True):
1201 raise TestsuiteTaskFailed(name, data['reason'])
1202 return data