userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/client.py Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
client.py
"""
Python module that provides clients for functional tests with
testsuite; see
@ref scripts/docs/en/userver/functional_testing.md for an introduction.

@ingroup userver_testsuite
"""
8
9# pylint: disable=too-many-lines
10
11import contextlib
12import copy
13import dataclasses
14import json
15import logging
16import typing
17
18import aiohttp
19
20from testsuite import annotations
21from testsuite import utils
22from testsuite.daemons import service_client
23from testsuite.utils import approx
24from testsuite.utils import http
25
26import pytest_userver.metrics as metric_module # pylint: disable=import-error
27from pytest_userver.plugins import caches
28
# @cond
# Logger of this module, named after the module itself.
logger = logging.getLogger(__name__)
# @endcond

# Placeholder meaning "not known yet": it is the default of `_State.now` and
# the only member of the default `_State.testpoints`.
_UNKNOWN_STATE = '__UNKNOWN__'
34
35
class BaseError(Exception):
    """Common ancestor of the exceptions declared in this module."""
38
39
41 pass
42
43
45 pass
46
47
57 pass
58
59
61 pass
62
63
65 def __init__(self):
66 self.suspended_tasks: typing.Set[str] = set()
67 self.tasks_to_suspend: typing.Set[str] = set()
68
69
71 def __init__(self, name, reason):
72 self.name = name
73 self.reason = reason
74 super().__init__(f'Testsuite task {name!r} failed: {reason}')
75
76
@dataclasses.dataclass(frozen=True)
class TestsuiteClientConfig:
    """
    Immutable pair of handler paths consumed by the clients of this module.
    """

    # Template of the testsuite action handler path; the client fills it in
    # with `.format(action=...)`. A falsy value means "not configured" and
    # makes the client raise ConfigurationError.
    testsuite_action_path: typing.Optional[str] = None
    # Path of the monitor handler the metrics are fetched from. A falsy
    # value means "not configured" and makes the client raise
    # ConfigurationError.
    server_monitor_path: typing.Optional[str] = None
81
82
# Short alias so annotations in this module can say `Metric`.
Metric = metric_module.Metric
84
85
87 """
88 Base asyncio userver client that implements HTTP requests to service.
89
90 Compatible with werkzeug interface.
91
92 @ingroup userver_testsuite
93 """
94
95 def __init__(self, client):
96 self._client = client
97
98 async def post(
99 self,
100 path: str,
101 json: annotations.JsonAnyOptional = None,
102 data: typing.Any = None,
103 params: typing.Optional[typing.Dict[str, str]] = None,
104 bearer: typing.Optional[str] = None,
105 x_real_ip: typing.Optional[str] = None,
106 headers: typing.Optional[typing.Dict[str, str]] = None,
107 **kwargs,
108 ) -> http.ClientResponse:
109 """
110 Make a HTTP POST request
111 """
112 response = await self._client.post(
113 path,
114 json=json,
115 data=data,
116 params=params,
117 headers=headers,
118 bearer=bearer,
119 x_real_ip=x_real_ip,
120 **kwargs,
121 )
122 return await self._wrap_client_response(response)
123
124 async def put(
125 self,
126 path,
127 json: annotations.JsonAnyOptional = None,
128 data: typing.Any = None,
129 params: typing.Optional[typing.Dict[str, str]] = None,
130 bearer: typing.Optional[str] = None,
131 x_real_ip: typing.Optional[str] = None,
132 headers: typing.Optional[typing.Dict[str, str]] = None,
133 **kwargs,
134 ) -> http.ClientResponse:
135 """
136 Make a HTTP PUT request
137 """
138 response = await self._client.put(
139 path,
140 json=json,
141 data=data,
142 params=params,
143 headers=headers,
144 bearer=bearer,
145 x_real_ip=x_real_ip,
146 **kwargs,
147 )
148 return await self._wrap_client_response(response)
149
150 async def patch(
151 self,
152 path,
153 json: annotations.JsonAnyOptional = None,
154 data: typing.Any = None,
155 params: typing.Optional[typing.Dict[str, str]] = None,
156 bearer: typing.Optional[str] = None,
157 x_real_ip: typing.Optional[str] = None,
158 headers: typing.Optional[typing.Dict[str, str]] = None,
159 **kwargs,
160 ) -> http.ClientResponse:
161 """
162 Make a HTTP PATCH request
163 """
164 response = await self._client.patch(
165 path,
166 json=json,
167 data=data,
168 params=params,
169 headers=headers,
170 bearer=bearer,
171 x_real_ip=x_real_ip,
172 **kwargs,
173 )
174 return await self._wrap_client_response(response)
175
176 async def get(
177 self,
178 path: str,
179 headers: typing.Optional[typing.Dict[str, str]] = None,
180 bearer: typing.Optional[str] = None,
181 x_real_ip: typing.Optional[str] = None,
182 **kwargs,
183 ) -> http.ClientResponse:
184 """
185 Make a HTTP GET request
186 """
187 response = await self._client.get(
188 path,
189 headers=headers,
190 bearer=bearer,
191 x_real_ip=x_real_ip,
192 **kwargs,
193 )
194 return await self._wrap_client_response(response)
195
196 async def delete(
197 self,
198 path: str,
199 headers: typing.Optional[typing.Dict[str, str]] = None,
200 bearer: typing.Optional[str] = None,
201 x_real_ip: typing.Optional[str] = None,
202 **kwargs,
203 ) -> http.ClientResponse:
204 """
205 Make a HTTP DELETE request
206 """
207 response = await self._client.delete(
208 path,
209 headers=headers,
210 bearer=bearer,
211 x_real_ip=x_real_ip,
212 **kwargs,
213 )
214 return await self._wrap_client_response(response)
215
216 async def options(
217 self,
218 path: str,
219 headers: typing.Optional[typing.Dict[str, str]] = None,
220 bearer: typing.Optional[str] = None,
221 x_real_ip: typing.Optional[str] = None,
222 **kwargs,
223 ) -> http.ClientResponse:
224 """
225 Make a HTTP OPTIONS request
226 """
227 response = await self._client.options(
228 path,
229 headers=headers,
230 bearer=bearer,
231 x_real_ip=x_real_ip,
232 **kwargs,
233 )
234 return await self._wrap_client_response(response)
235
236 async def request(
237 self, http_method: str, path: str, **kwargs,
238 ) -> http.ClientResponse:
239 """
240 Make a HTTP request with the specified method
241 """
242 response = await self._client.request(http_method, path, **kwargs)
243 return await self._wrap_client_response(response)
244
245 def _wrap_client_response(
246 self, response: aiohttp.ClientResponse,
247 ) -> typing.Awaitable[http.ClientResponse]:
248 return http.wrap_client_response(response)
249
250
251# @cond
252
253
def _wrap_client_error(func):
    """
    Decorator for coroutine functions that converts
    `aiohttp.client_exceptions.ClientResponseError` into
    `http.HttpResponseError` carrying the request URL and the status.

    The wrapper keeps the name and docstring of the wrapped function and
    chains the original aiohttp exception as the cause of the new one.

    @param func Coroutine function to wrap
    @returns Coroutine function with the same call signature
    """
    # Imported locally so the block does not depend on a file-level import.
    import functools

    @functools.wraps(func)
    async def _wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except aiohttp.client_exceptions.ClientResponseError as exc:
            raise http.HttpResponseError(
                url=exc.request_info.url, status=exc.status,
            ) from exc

    return _wrapper
264
265
class AiohttpClientMonitor(service_client.AiohttpClient):
    """
    aiohttp-based client that reads metrics from the handler whose path is
    given by `config.server_monitor_path`.
    """

    _config: TestsuiteClientConfig

    def __init__(self, base_url, *, config: TestsuiteClientConfig, **kwargs):
        """
        @param base_url Passed to the base client
        @param config Paths of the service handlers used by this client
        @param kwargs Passed to the base client
        """
        super().__init__(base_url, **kwargs)
        self._config = config

    async def get_metrics(self, prefix=None):
        """
        Fetch metrics as decoded JSON.

        @param prefix Optional value of the `prefix` query parameter
        @throws ConfigurationError if the monitor path is not configured
        """
        if not self._config.server_monitor_path:
            raise ConfigurationError(
                'handler-server-monitor component is not configured',
            )
        params = {'prefix': prefix} if prefix is not None else None
        response = await self.get(
            self._config.server_monitor_path, params=params,
        )
        async with response:
            response.raise_for_status()
            # content_type=None: decode regardless of the response
            # Content-Type header.
            return await response.json(content_type=None)

    async def get_metric(self, metric_name):
        """
        Fetch metrics with `metric_name` as the prefix and return the entry
        stored under exactly that name.

        @throws AssertionError if there is no such entry
        """
        metrics = await self.get_metrics(metric_name)
        assert metric_name in metrics, (
            f'No metric with name {metric_name!r}. '
            f'Use "single_metric" function instead of "get_metric"'
        )
        return metrics[metric_name]

    async def metrics_raw(
        self,
        output_format,
        *,
        path: typing.Optional[str] = None,
        prefix: typing.Optional[str] = None,
        labels: typing.Optional[typing.Dict[str, str]] = None,
    ) -> str:
        """
        Fetch metrics in the requested format and return the response text.

        @param output_format Value of the `format` query parameter
        @param path Optional value of the `path` query parameter
        @param prefix Optional value of the `prefix` query parameter
        @param labels Optional labels, sent JSON-encoded as `labels`
        @throws ConfigurationError if the monitor path is not configured
        """
        if not self._config.server_monitor_path:
            raise ConfigurationError(
                'handler-server-monitor component is not configured',
            )

        params = {'format': output_format}
        if prefix:
            params['prefix'] = prefix

        if path:
            params['path'] = path

        if labels:
            params['labels'] = json.dumps(labels)

        response = await self.get(
            self._config.server_monitor_path, params=params,
        )
        async with response:
            response.raise_for_status()
            return await response.text()

    async def metrics(
        self,
        *,
        path: typing.Optional[str] = None,
        prefix: typing.Optional[str] = None,
        labels: typing.Optional[typing.Dict[str, str]] = None,
    ) -> metric_module.MetricsSnapshot:
        """
        Fetch metrics in the 'json' format and parse them into a
        `MetricsSnapshot`.
        """
        response = await self.metrics_raw(
            output_format='json', path=path, prefix=prefix, labels=labels,
        )
        return metric_module.MetricsSnapshot.from_json(str(response))

    async def single_metric_optional(
        self,
        path: str,
        *,
        labels: typing.Optional[typing.Dict[str, str]] = None,
    ) -> typing.Optional[Metric]:
        """
        Return the only metric found at `path`, or None if there is none.

        @throws AssertionError if more than one metric is found
        """
        response = await self.metrics(path=path, labels=labels)
        metrics_list = response.get(path, [])

        # The message is a plain string: a parenthesised value with a
        # trailing comma would turn it into a one-element tuple.
        assert len(metrics_list) <= 1, (
            f'More than one metric found for path {path} and labels {labels}: '
            f'{response}'
        )

        if not metrics_list:
            return None

        return next(iter(metrics_list))

    async def single_metric(
        self,
        path: str,
        *,
        labels: typing.Optional[typing.Dict[str, str]] = None,
    ) -> Metric:
        """
        Return the only metric found at `path`.

        @throws AssertionError if no metric or more than one metric is found
        """
        value = await self.single_metric_optional(path, labels=labels)
        assert (
            value is not None
        ), f'No metric was found for path {path} and labels {labels}'
        return value
369
370
371# @endcond
372
373
375 """
376 Asyncio userver client for monitor listeners, typically retrieved from
377 plugins.service_client.monitor_client fixture.
378
379 Compatible with werkzeug interface.
380
381 @ingroup userver_testsuite
382 """
383
385 self,
386 *,
387 path: typing.Optional[str] = None,
388 prefix: typing.Optional[str] = None,
389 labels: typing.Optional[typing.Dict[str, str]] = None,
390 diff_gauge: bool = False,
391 ) -> 'MetricsDiffer':
392 """
393 Creates a `MetricsDiffer` that fetches metrics using this client.
394 It's recommended to use this method over `metrics` to make sure
395 the tests don't affect each other.
396
397 With `diff_gauge` off, only RATE metrics are differentiated.
398 With `diff_gauge` on, GAUGE metrics are differentiated as well,
399 which may lead to nonsensical results for those.
400
401 @param path Optional full metric path
402 @param prefix Optional prefix on which the metric paths should start
403 @param labels Optional dictionary of labels that must be in the metric
404 @param diff_gauge Whether to differentiate GAUGE metrics
405
406 @code
407 async with monitor_client.metrics_diff(prefix='foo') as differ:
408 # Do something that makes the service update its metrics
409 assert differ.value_at('path-suffix', {'label'}) == 42
410 @endcode
411 """
412 return MetricsDiffer(
413 _client=self,
414 _path=path,
415 _prefix=prefix,
416 _labels=labels,
417 _diff_gauge=diff_gauge,
418 )
419
420 @_wrap_client_error
421 async def metrics(
422 self,
423 *,
424 path: typing.Optional[str] = None,
425 prefix: typing.Optional[str] = None,
426 labels: typing.Optional[typing.Dict[str, str]] = None,
427 ) -> metric_module.MetricsSnapshot:
428 """
429 Returns a dict of metric names to Metric.
430
431 @param path Optional full metric path
432 @param prefix Optional prefix on which the metric paths should start
433 @param labels Optional dictionary of labels that must be in the metric
434 """
435 return await self._client.metrics(
436 path=path, prefix=prefix, labels=labels,
437 )
438
439 @_wrap_client_error
441 self,
442 path: str,
443 *,
444 labels: typing.Optional[typing.Dict[str, str]] = None,
445 ) -> typing.Optional[Metric]:
446 """
447 Either return a Metric or None if there's no such metric.
448
449 @param path Full metric path
450 @param labels Optional dictionary of labels that must be in the metric
451
452 @throws AssertionError if more than one metric returned
453 """
454 return await self._client.single_metric_optional(path, labels=labels)
455
456 @_wrap_client_error
457 async def single_metric(
458 self,
459 path: str,
460 *,
461 labels: typing.Optional[typing.Dict[str, str]] = None,
462 ) -> typing.Optional[Metric]:
463 """
464 Returns the Metric.
465
466 @param path Full metric path
467 @param labels Optional dictionary of labels that must be in the metric
468
469 @throws AssertionError if more than one metric or no metric found
470 """
471 return await self._client.single_metric(path, labels=labels)
472
473 @_wrap_client_error
474 async def metrics_raw(
475 self,
476 output_format: str,
477 *,
478 path: typing.Optional[str] = None,
479 prefix: typing.Optional[str] = None,
480 labels: typing.Optional[typing.Dict[str, str]] = None,
481 ) -> typing.Dict[str, Metric]:
482 """
483 Low level function that returns metrics in a specific format.
484 Use `metrics` and `single_metric` instead if possible.
485
486 @param output_format Metric output format. See
487 server::handlers::ServerMonitor for a list of supported formats.
488 @param path Optional full metric path
489 @param prefix Optional prefix on which the metric paths should start
490 @param labels Optional dictionary of labels that must be in the metric
491 """
492 return await self._client.metrics_raw(
493 output_format=output_format,
494 path=path,
495 prefix=prefix,
496 labels=labels,
497 )
498
499 @_wrap_client_error
500 async def get_metrics(self, prefix=None):
501 """
502 @deprecated Use metrics() or single_metric() instead
503 """
504 return await self._client.get_metrics(prefix=prefix)
505
506 @_wrap_client_error
507 async def get_metric(self, metric_name):
508 """
509 @deprecated Use metrics() or single_metric() instead
510 """
511 return await self._client.get_metric(metric_name)
512
513
515 """
516 A helper class for computing metric differences.
517
518 @see ClientMonitor.metrics_diff
519 @ingroup userver_testsuite
520 """
521
522 # @cond
523 def __init__(
524 self,
525 _client: ClientMonitor,
526 _path: typing.Optional[str],
527 _prefix: typing.Optional[str],
528 _labels: typing.Optional[typing.Dict[str, str]],
529 _diff_gauge: bool,
530 ):
531 self._client = _client
532 self._path = _path
533 self._prefix = _prefix
534 self._labels = _labels
535 self._diff_gauge = _diff_gauge
536 self._baseline: typing.Optional[metric_module.MetricsSnapshot] = None
537 self._current: typing.Optional[metric_module.MetricsSnapshot] = None
538 self._diff: typing.Optional[metric_module.MetricsSnapshot] = None
539
540 # @endcond
541
542 @property
543 def baseline(self) -> metric_module.MetricsSnapshot:
544 assert self._baseline is not None
545 return self._baseline
546
547 @baseline.setter
548 def baseline(self, value: metric_module.MetricsSnapshot) -> None:
549 self._baseline = value
550 if self._current is not None:
551 self._diff = _subtract_metrics_snapshots(
552 self._current, self._baseline, self._diff_gauge,
553 )
554
555 @property
556 def current(self) -> metric_module.MetricsSnapshot:
557 assert self._current is not None, 'Set self.current first'
558 return self._current
559
560 @current.setter
561 def current(self, value: metric_module.MetricsSnapshot) -> None:
562 self._current = value
563 assert self._baseline is not None, 'Set self.baseline first'
564 self._diff = _subtract_metrics_snapshots(
565 self._current, self._baseline, self._diff_gauge,
566 )
567
568 @property
569 def diff(self) -> metric_module.MetricsSnapshot:
570 assert self._diff is not None, 'Set self.current first'
571 return self._diff
572
574 self,
575 subpath: typing.Optional[str] = None,
576 add_labels: typing.Optional[typing.Dict] = None,
577 *,
578 default: typing.Optional[float] = None,
579 ) -> float:
580 """
581 Returns a single metric value at the specified path, prepending
582 the path provided at construction. If a dict of labels is provided,
583 does en exact match of labels, prepending the labels provided
584 at construction.
585
586 @param subpath Suffix of the metric path; the path provided
587 at construction is prepended
588 @param add_labels Labels that the metric must have in addition
589 to the labels provided at construction
590 @param default An optional default value in case the metric is missing
591 @throws AssertionError if not one metric by path
592 """
593 base_path = self._path or self._prefix
594 if base_path and subpath:
595 path = f'{base_path}.{subpath}'
596 else:
597 assert base_path or subpath, 'No path provided'
598 path = base_path or subpath or ''
599 labels: typing.Optional[dict] = None
600 if self._labels is not None or add_labels is not None:
601 labels = {**(self._labels or {}), **(add_labels or {})}
602 return self.diff.value_at(path, labels, default=default)
603
604 async def fetch(self) -> metric_module.MetricsSnapshot:
605 """
606 Fetches metric values from the service.
607 """
608 return await self._client.metrics(
609 path=self._path, prefix=self._prefix, labels=self._labels,
610 )
611
612 async def __aenter__(self) -> 'MetricsDiffer':
613 self._baseline = await self.fetch()
614 self._current = None
615 return self
616
617 async def __aexit__(self, exc_type, exc, tb) -> None:
618 self.currentcurrentcurrent = await self.fetch()
619
620
621# @cond
622
623
def _subtract_metrics_snapshots(
        current: metric_module.MetricsSnapshot,
        initial: metric_module.MetricsSnapshot,
        diff_gauge: bool,
) -> metric_module.MetricsSnapshot:
    """
    Build a snapshot with the same paths as `current`, in which every
    metric is replaced by the result of `_subtract_metrics` against
    `initial`.
    """
    diff_by_path = {}
    for path, current_group in current.items():
        diff_by_path[path] = {
            _subtract_metrics(path, current_metric, initial, diff_gauge)
            for current_metric in current_group
        }
    return metric_module.MetricsSnapshot(diff_by_path)
638
639
def _subtract_metrics(
        path: str,
        current_metric: metric_module.Metric,
        initial: metric_module.MetricsSnapshot,
        diff_gauge: bool,
) -> metric_module.Metric:
    """
    Subtract from `current_metric` the value of the metric in `initial`
    that has the same path and equal labels.

    If `initial` has no such metric, `current_metric` is returned as is.
    """
    # NOTE(review): `ClientMonitor.metrics_diff` defaults `diff_gauge` to
    # False, which this assertion rejects - confirm the intended default.
    assert diff_gauge, 'diff_gauge=False is unimplemented'

    initial_group = initial.get(path, None)
    if initial_group is None:
        return current_metric

    for candidate in initial_group:
        if candidate.labels == current_metric.labels:
            # The first metric with equal labels is the one subtracted.
            return metric_module.Metric(
                labels=current_metric.labels,
                value=current_metric.value - candidate.value,
            )
    return current_metric
661
662
class AiohttpClient(service_client.AiohttpClient):
    """
    aiohttp-based service client extended with testsuite actions.

    Unless `testsuite_skip_prepare` is passed, every request first sends
    the pending state update (see `_prepare`) to the service.
    """

    PeriodicTaskFailed = PeriodicTaskFailed
    TestsuiteActionFailed = TestsuiteActionFailed
    TestsuiteTaskNotFound = TestsuiteTaskNotFound
    TestsuiteTaskConflict = TestsuiteTaskConflict
    TestsuiteTaskFailed = TestsuiteTaskFailed

    def __init__(
        self,
        base_url: str,
        *,
        config: TestsuiteClientConfig,
        mocked_time,
        log_capture_fixture,
        testpoint,
        testpoint_control,
        cache_invalidation_state,
        span_id_header=None,
        api_coverage_report=None,
        periodic_tasks_state: typing.Optional[PeriodicTasksState] = None,
        **kwargs,
    ):
        """
        @param base_url Passed to the base client
        @param config Paths of the service handlers used by this client
        @param mocked_time, testpoint, testpoint_control,
        cache_invalidation_state Handed to the internal `_StateManager`
        @param log_capture_fixture Object providing `start_capture()`
        @param span_id_header Passed to the base client
        @param api_coverage_report Optional object that receives usage
        statistics for every request
        @param periodic_tasks_state Optional state needed by the
        suspend/resume periodic task methods
        """
        super().__init__(base_url, span_id_header=span_id_header, **kwargs)
        self._config = config
        self._periodic_tasks = periodic_tasks_state
        self._testpoint = testpoint
        self._log_capture_fixture = log_capture_fixture
        self._state_manager = _StateManager(
            mocked_time=mocked_time,
            testpoint=self._testpoint,
            testpoint_control=testpoint_control,
            invalidation_state=cache_invalidation_state,
        )
        self._api_coverage_report = api_coverage_report

    async def run_periodic_task(self, name):
        """
        Run the 'run_periodic_task' action.

        @throws PeriodicTaskFailed if the response 'status' is falsy
        """
        response = await self._testsuite_action('run_periodic_task', name=name)
        if not response['status']:
            raise self.PeriodicTaskFailed(f'Periodic task {name} failed')

    async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
        """Add `names` to the tasks to suspend and sync with the service."""
        if not self._periodic_tasks:
            raise ConfigurationError('No periodic_tasks_state given')
        self._periodic_tasks.tasks_to_suspend.update(names)
        await self._suspend_periodic_tasks()

    async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
        """Remove `names` from the tasks to suspend and sync."""
        if not self._periodic_tasks:
            raise ConfigurationError('No periodic_tasks_state given')
        self._periodic_tasks.tasks_to_suspend.difference_update(names)
        await self._suspend_periodic_tasks()

    async def resume_all_periodic_tasks(self) -> None:
        """Clear the set of tasks to suspend and sync with the service."""
        if not self._periodic_tasks:
            raise ConfigurationError('No periodic_tasks_state given')
        self._periodic_tasks.tasks_to_suspend.clear()
        await self._suspend_periodic_tasks()

    async def write_cache_dumps(
        self, names: typing.List[str], *, testsuite_skip_prepare=False,
    ) -> None:
        """Run the 'write_cache_dumps' action for the given names."""
        await self._testsuite_action(
            'write_cache_dumps',
            names=names,
            testsuite_skip_prepare=testsuite_skip_prepare,
        )

    async def read_cache_dumps(
        self, names: typing.List[str], *, testsuite_skip_prepare=False,
    ) -> None:
        """Run the 'read_cache_dumps' action for the given names."""
        await self._testsuite_action(
            'read_cache_dumps',
            names=names,
            testsuite_skip_prepare=testsuite_skip_prepare,
        )

    async def run_distlock_task(self, name: str) -> None:
        """Run the testsuite task named `distlock/<name>`."""
        await self.run_task(f'distlock/{name}')

    async def reset_metrics(self) -> None:
        """Run the 'reset_metrics' action."""
        await self._testsuite_action('reset_metrics')

    async def metrics_portability(
        self, *, prefix: typing.Optional[str] = None,
    ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
        """Run the 'metrics_portability' action and return its JSON."""
        return await self._testsuite_action(
            'metrics_portability', prefix=prefix,
        )

    async def list_tasks(self) -> typing.List[str]:
        """Return the 'tasks' field of the 'tasks_list' action response."""
        response = await self._do_testsuite_action('tasks_list')
        async with response:
            response.raise_for_status()
            body = await response.json(content_type=None)
            return body['tasks']

    async def run_task(self, name: str) -> None:
        """
        Run the 'task_run' action for `name`; the response is validated by
        `_task_check_response`.
        """
        response = await self._do_testsuite_action(
            'task_run', json={'name': name},
        )
        await _task_check_response(name, response)

    @contextlib.asynccontextmanager
    async def spawn_task(self, name: str):
        """
        Async context manager: spawn the task on entry and stop it on
        exit, including when the body raises.
        """
        task_id = await self._task_spawn(name)
        try:
            yield
        finally:
            await self._task_stop_spawned(task_id)

    async def _task_spawn(self, name: str) -> str:
        """Run 'task_spawn' and return the 'task_id' of the response."""
        response = await self._do_testsuite_action(
            'task_spawn', json={'name': name},
        )
        data = await _task_check_response(name, response)
        return data['task_id']

    async def _task_stop_spawned(self, task_id: str) -> None:
        """Run 'task_stop' for a task started by `_task_spawn`."""
        response = await self._do_testsuite_action(
            'task_stop', json={'task_id': task_id},
        )
        await _task_check_response(task_id, response)

    async def http_allowed_urls_extra(
        self, http_allowed_urls_extra: typing.List[str],
    ) -> None:
        """
        Run the 'http_allowed_urls_extra' action with the given URL list,
        without the automatic state update.
        """
        response = await self._do_testsuite_action(
            'http_allowed_urls_extra',
            json={'allowed_urls_extra': http_allowed_urls_extra},
            testsuite_skip_prepare=True,
        )
        # Release the response instead of dropping it. The status is
        # still not inspected, as before.
        async with response:
            pass

    @contextlib.asynccontextmanager
    async def capture_logs(self):
        """
        Async context manager yielding a log capture object.

        'socket_logging_duplication' is switched on through the
        'log_capture' action on entry and switched off again on exit.
        """
        async with self._log_capture_fixture.start_capture() as capture:
            await self._testsuite_action(
                'log_capture', socket_logging_duplication=True,
            )
            try:
                yield capture
            finally:
                await self._testsuite_action(
                    'log_capture', socket_logging_duplication=False,
                )

    async def invalidate_caches(
        self,
        *,
        clean_update: bool = True,
        cache_names: typing.Optional[typing.List[str]] = None,
    ) -> None:
        """Shortcut for `tests_control` with cache invalidation enabled."""
        await self.tests_control(
            invalidate_caches=True,
            clean_update=clean_update,
            cache_names=cache_names,
        )

    async def tests_control(
        self,
        *,
        invalidate_caches: bool = True,
        clean_update: bool = True,
        cache_names: typing.Optional[typing.List[str]] = None,
        http_allowed_urls_extra=None,
    ) -> typing.Dict[str, typing.Any]:
        """
        Send a 'control' request combining the pending state update with
        the explicitly requested cache invalidation.

        @param invalidate_caches Whether to request cache invalidation
        @param clean_update True requests a 'full' update, False an
        'incremental' one
        @param cache_names Optional list of caches to limit the update to
        @param http_allowed_urls_extra Optional list sent beforehand via
        `http_allowed_urls_extra`
        @returns Decoded JSON body of the 'control' response
        """
        body: typing.Dict[
            str, typing.Any,
        ] = self._state_manager.get_pending_update()

        if 'invalidate_caches' in body and invalidate_caches:
            if not clean_update or cache_names:
                # The pending invalidation cannot be merged with a partial
                # or incremental request, so it is sent on its own first.
                logger.warning(
                    'Manual cache invalidation leads to indirect initial '
                    'full cache invalidation',
                )
                await self._prepare()
                body = {}

        if invalidate_caches:
            body['invalidate_caches'] = {
                'update_type': ('full' if clean_update else 'incremental'),
            }
            if cache_names:
                body['invalidate_caches']['names'] = cache_names

        if http_allowed_urls_extra is not None:
            await self.http_allowed_urls_extra(http_allowed_urls_extra)

        return await self._tests_control(body)

    async def update_server_state(self) -> None:
        """Send the pending state update, if there is one."""
        await self._prepare()

    async def enable_testpoints(self, *, no_auto_cache_cleanup=False) -> None:
        """
        Make the service aware of the registered testpoints.

        Does nothing when there are no testpoints. With
        `no_auto_cache_cleanup` only the testpoint list is sent; otherwise
        the whole pending state update is sent.
        """
        if not self._testpoint:
            return
        if no_auto_cache_cleanup:
            await self._tests_control(
                {'testpoints': sorted(self._testpoint.keys())},
            )
        else:
            await self.update_server_state()

    async def _tests_control(self, body: dict) -> typing.Dict[str, typing.Any]:
        """
        Post `body` to the 'control' action while the state manager tracks
        the update; a 404 response is reported as ConfigurationError.
        """
        with self._state_manager.updating_state(body):
            async with await self._do_testsuite_action(
                'control', json=body, testsuite_skip_prepare=True,
            ) as response:
                if response.status == 404:
                    raise ConfigurationError(
                        'It seems that testsuite support is not enabled '
                        'for your service',
                    )
                response.raise_for_status()
                return await response.json(content_type=None)

    async def _suspend_periodic_tasks(self):
        """
        Send the set of tasks to suspend if it differs from the set sent
        last time, then remember it as sent.
        """
        if (
            self._periodic_tasks.tasks_to_suspend
            != self._periodic_tasks.suspended_tasks
        ):
            await self._testsuite_action(
                'suspend_periodic_tasks',
                names=sorted(self._periodic_tasks.tasks_to_suspend),
            )
            self._periodic_tasks.suspended_tasks = set(
                self._periodic_tasks.tasks_to_suspend,
            )

    def _do_testsuite_action(self, action, **kwargs):
        """
        Start a POST to the action handler and return the awaitable of
        `self.post`; the caller awaits it and owns the response.

        @throws ConfigurationError if the action path is not configured
        """
        if not self._config.testsuite_action_path:
            raise ConfigurationError(
                'tests-control component is not properly configured',
            )
        path = self._config.testsuite_action_path.format(action=action)
        return self.post(path, **kwargs)

    async def _testsuite_action(
        self, action, *, testsuite_skip_prepare=False, **kwargs,
    ):
        """
        Post `kwargs` as the JSON body of the given action and return the
        decoded JSON response.

        @throws TestsuiteActionFailed if the response status is 500
        """
        async with await self._do_testsuite_action(
            action,
            json=kwargs,
            testsuite_skip_prepare=testsuite_skip_prepare,
        ) as response:
            if response.status == 500:
                raise TestsuiteActionFailed
            response.raise_for_status()
            return await response.json(content_type=None)

    async def _prepare(self) -> None:
        """Send the pending state update unless it is empty."""
        pending_update = self._state_manager.get_pending_update()
        if pending_update:
            await self._tests_control(pending_update)

    async def _request(  # pylint: disable=arguments-differ
        self,
        http_method: str,
        path: str,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        *,
        testsuite_skip_prepare: bool = False,
        **kwargs,
    ) -> aiohttp.ClientResponse:
        """
        Perform the request through the base client, sending the pending
        state update first unless `testsuite_skip_prepare` is set, and
        report the result to the API coverage report when one is given.
        """
        if not testsuite_skip_prepare:
            await self._prepare()

        response = await super()._request(
            http_method, path, headers, bearer, x_real_ip, **kwargs,
        )
        if self._api_coverage_report:
            self._api_coverage_report.update_usage_stat(
                path, http_method, response.status, response.content_type,
            )

        return response
941
942
943# @endcond
944
945
947 """
948 Asyncio userver client, typically retrieved from
949 @ref service_client "plugins.service_client.service_client"
950 fixture.
951
952 Compatible with werkzeug interface.
953
954 @ingroup userver_testsuite
955 """
956
957 PeriodicTaskFailed = PeriodicTaskFailed
958 TestsuiteActionFailed = TestsuiteActionFailed
959 TestsuiteTaskNotFound = TestsuiteTaskNotFound
960 TestsuiteTaskConflict = TestsuiteTaskConflict
961 TestsuiteTaskFailed = TestsuiteTaskFailed
962
963 def _wrap_client_response(
964 self, response: aiohttp.ClientResponse,
965 ) -> typing.Awaitable[http.ClientResponse]:
966 return http.wrap_client_response(
967 response, json_loads=approx.json_loads,
968 )
969
970 @_wrap_client_error
971 async def run_periodic_task(self, name):
972 await self._client.run_periodic_task(name)
973
974 @_wrap_client_error
975 async def suspend_periodic_tasks(self, names: typing.List[str]) -> None:
976 await self._client.suspend_periodic_tasks(names)
977
978 @_wrap_client_error
979 async def resume_periodic_tasks(self, names: typing.List[str]) -> None:
980 await self._client.resume_periodic_tasks(names)
981
982 @_wrap_client_error
983 async def resume_all_periodic_tasks(self) -> None:
984 await self._client.resume_all_periodic_tasks()
985
986 @_wrap_client_error
987 async def write_cache_dumps(
988 self, names: typing.List[str], *, testsuite_skip_prepare=False,
989 ) -> None:
990 await self._client.write_cache_dumps(
991 names=names, testsuite_skip_prepare=testsuite_skip_prepare,
992 )
993
994 @_wrap_client_error
995 async def read_cache_dumps(
996 self, names: typing.List[str], *, testsuite_skip_prepare=False,
997 ) -> None:
998 await self._client.read_cache_dumps(
999 names=names, testsuite_skip_prepare=testsuite_skip_prepare,
1000 )
1001
1002 async def run_task(self, name: str) -> None:
1003 await self._client.run_task(name)
1004
1005 async def run_distlock_task(self, name: str) -> None:
1006 await self._client.run_distlock_task(name)
1007
1008 async def reset_metrics(self) -> None:
1009 """
1010 Calls `ResetMetric(metric);` for each metric that has such C++ function
1011 """
1012 await self._client.reset_metrics()
1013
1015 self, *, prefix: typing.Optional[str] = None,
1016 ) -> typing.Dict[str, typing.List[typing.Dict[str, str]]]:
1017 """
1018 Reports metrics related issues that could be encountered on
1019 different monitoring systems.
1020
1021 @sa @ref utils::statistics::GetPortabilityInfo
1022 """
1023 return await self._client.metrics_portability(prefix=prefix)
1024
1025 def list_tasks(self) -> typing.List[str]:
1026 return self._client.list_tasks()
1027
1028 def spawn_task(self, name: str):
1029 return self._client.spawn_task(name)
1030
1031 def capture_logs(self):
1032 return self._client.capture_logs()
1033
1034 @_wrap_client_error
1036 self,
1037 *,
1038 clean_update: bool = True,
1039 cache_names: typing.Optional[typing.List[str]] = None,
1040 ) -> None:
1041 """
1042 Send request to service to update caches.
1043
1044 @param clean_update if False, service will do a faster incremental
1045 update of caches whenever possible.
1046 @param cache_names which caches specifically should be updated;
1047 update all if None.
1048 """
1049 await self._client.invalidate_caches(
1050 clean_update=clean_update, cache_names=cache_names,
1051 )
1052
1053 @_wrap_client_error
1054 async def tests_control(
1055 self, *args, **kwargs,
1056 ) -> typing.Dict[str, typing.Any]:
1057 return await self._client.tests_control(*args, **kwargs)
1058
1059 @_wrap_client_error
1060 async def update_server_state(self) -> None:
1061 """
1062 Update service-side state through http call to 'tests/control':
1063 - clear dirty (from other tests) caches
1064 - set service-side mocked time,
1065 - resume / suspend periodic tasks
1066 - enable testpoints
1067 If service is up-to-date, does nothing.
1068 """
1069 await self._client.update_server_state()
1070
1071 @_wrap_client_error
1072 async def enable_testpoints(self, *args, **kwargs) -> None:
1073 """
1074 Send list of handled testpoint pats to service. For these paths service
1075 will no more skip http calls from TESTPOINT(...) macro.
1076
1077 @param no_auto_cache_cleanup prevent automatic cache cleanup.
1078 When calling service client first time in scope of current test, client
1079 makes additional http call to `tests/control` to update caches, to get
1080 rid of data from previous test.
1081 """
1082 await self._client.enable_testpoints(*args, **kwargs)
1083
1084
@dataclasses.dataclass
class _State:
    """Reflects the (supposed) current service state."""

    # Cache invalidation state as the service is believed to have it.
    invalidation_state: caches.InvalidationState
    # Mocked time last sent as 'mock_now'; starts as the "unknown"
    # placeholder so that it differs from any desired value.
    now: typing.Optional[str] = _UNKNOWN_STATE
    # Testpoints last sent to the service; starts as a set holding only
    # the "unknown" placeholder.
    testpoints: typing.FrozenSet[str] = frozenset([_UNKNOWN_STATE])
1092
1093
1095 """
1096 Used for computing the requests that we need to automatically align
1097 the service state with the test fixtures state.
1098 """
1099
1100 def __init__(
1101 self,
1102 *,
1103 mocked_time,
1104 testpoint,
1105 testpoint_control,
1106 invalidation_state: caches.InvalidationState,
1107 ):
1108 self._state = _State(
1109 invalidation_state=copy.deepcopy(invalidation_state),
1110 )
1111 self._mocked_time = mocked_time
1112 self._testpoint = testpoint
1113 self._testpoint_control = testpoint_control
1114 self._invalidation_state = invalidation_state
1115
1116 @contextlib.contextmanager
1117 def updating_state(self, body: typing.Dict[str, typing.Any]):
1118 """
1119 Whenever `tests_control` handler is invoked
1120 (by the client itself during `prepare` or manually by the user),
1121 we need to synchronize `_state` with the (supposed) service state.
1122 The state update is decoded from the request body.
1123 """
1124 saved_state = copy.deepcopy(self._state)
1125 try:
1126 self._update_state(body)
1127 self._apply_new_state()
1128 yield
1129 except Exception: # noqa
1130 self._state = saved_state
1131 self._apply_new_state()
1132 raise
1133
1134 def get_pending_update(self) -> typing.Dict[str, typing.Any]:
1135 """
1136 Compose the body of the `tests_control` request required to completely
1137 synchronize the service state with the state of test fixtures.
1138 """
1139 body: typing.Dict[str, typing.Any] = {}
1140
1141 if self._invalidation_state.has_caches_to_update:
1142 body['invalidate_caches'] = {'update_type': 'full'}
1143 if not self._invalidation_state.should_update_all_caches:
1144 body['invalidate_caches']['names'] = list(
1145 self._invalidation_state.caches_to_update,
1146 )
1147
1148 desired_testpoints = self._testpoint.keys()
1149 if self._state.testpoints != frozenset(desired_testpoints):
1150 body['testpoints'] = sorted(desired_testpoints)
1151
1152 desired_now = self._get_desired_now()
1153 if self._state.now != desired_now:
1154 body['mock_now'] = desired_now
1155
1156 return body
1157
1158 def _update_state(self, body: typing.Dict[str, typing.Any]) -> None:
1159 body_invalidate_caches = body.get('invalidate_caches', {})
1160 update_type = body_invalidate_caches.get('update_type', 'full')
1161 body_cache_names = body_invalidate_caches.get('names', None)
1162 # An incremental update is considered insufficient to bring a cache
1163 # to a known state.
1164 if body_invalidate_caches and update_type == 'full':
1165 if body_cache_names is None:
1166 self._state.invalidation_state.on_all_caches_updated()
1167 else:
1168 self._state.invalidation_state.on_caches_updated(
1169 body_cache_names,
1170 )
1171
1172 if 'mock_now' in body:
1173 self._state.now = body['mock_now']
1174
1175 testpoints: typing.Optional[typing.List[str]] = body.get(
1176 'testpoints', None,
1177 )
1178 if testpoints is not None:
1179 self._state.testpoints = frozenset(testpoints)
1180
1182 """Apply new state to related components."""
1183 self._testpoint_control.enabled_testpoints = self._state.testpoints
1184 self._invalidation_state.assign_copy(self._state.invalidation_state)
1185
1186 def _get_desired_now(self) -> typing.Optional[str]:
1187 if self._mocked_time.is_enabled:
1188 return utils.timestring(self._mocked_time.now())
1189 return None
1190
1191
async def _task_check_response(name: str, response) -> dict:
    """
    Validate the response of a testsuite task action and return its
    decoded JSON body.

    @param name Task name (or id) used in the error messages
    @param response Response object; it is used as an async context manager
    @throws TestsuiteTaskNotFound on status 404
    @throws TestsuiteTaskConflict on status 409
    @throws AssertionError on any other status except 200
    @throws TestsuiteTaskFailed if the body has a falsy 'status' field
    """
    async with response:
        status = response.status
        if status == 404:
            raise TestsuiteTaskNotFound(f'Testsuite task {name!r} not found')
        if status == 409:
            raise TestsuiteTaskConflict(f'Testsuite task {name!r} conflict')
        assert status == 200
        payload = await response.json()
        # A body without a 'status' field counts as success.
        if payload.get('status', True):
            return payload
        raise TestsuiteTaskFailed(name, payload['reason'])