userver: /home/antonyzhilin/arcadia/taxi/uservices/userver/testsuite/pytest_plugins/pytest_userver/plugins/dumps.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
dumps.py
1# pylint: disable=redefined-outer-name
2import pathlib
3import shutil
4import typing
5
6import pytest
7
# Names of the fixtures in this module that produce service config patchers;
# here it lists `_userver_config_dumps`.
# NOTE(review): presumably collected by the pytest_userver plugin machinery
# to apply the hooks - confirm against the plugin that reads this constant.
USERVER_CONFIG_HOOKS = ['_userver_config_dumps']
9
10
@pytest.fixture(scope='session')
def userver_dumps_root(tmp_path_factory) -> pathlib.Path:
    """
    Session-wide directory that the service uses for cache dumps.

    Each component stores its dumps under
    `{userver_dumps_root}/{component-name}/{datetime}-v{format-version}`

    The directory is created once per test session via `tmp_path_factory`
    (with a fixed, non-numbered name) and returned as a resolved path.

    @see userver::dump::Dumper
    @ingroup userver_testsuite_fixtures
    """
    return tmp_path_factory.mktemp(
        'userver-cache-dumps', numbered=False,
    ).resolve()
23
24
@pytest.fixture
def read_latest_dump(userver_dumps_root):
    """
    Read the latest dump produced by a specified dumper.

    The fixture returns a function `read(dumper_name)` that looks into
    `{userver_dumps_root}/{dumper_name}` and returns the contents of the
    regular file with the greatest path as `bytes`. It returns `None` if that
    directory does not exist or contains no regular files.

    @see userver::dump::Dumper
    @ingroup userver_testsuite_fixtures
    """

    def read(dumper_name: str) -> typing.Optional[bytes]:
        specific_dir = userver_dumps_root.joinpath(dumper_name)
        if not specific_dir.is_dir():
            return None
        # iterdir() already yields paths that include specific_dir, so they
        # are used as-is: joining them with specific_dir once more would
        # build a wrong path whenever userver_dumps_root is relative.
        # Dump file names start with a datetime, so the greatest path is
        # taken as the most recent dump.
        latest_dump = max(
            (entry for entry in specific_dir.iterdir() if entry.is_file()),
            default=None,
        )
        if latest_dump is None:
            return None
        return latest_dump.read_bytes()

    return read
48
49
@pytest.fixture
def cleanup_userver_dumps(userver_dumps_root, request):
    """
    To avoid leaking dumps between tests, cache_dump_dir must be cleaned after
    each test. To observe the dumps, add a final `time.sleep(1000000)` to your
    test locally. The returned function may also be used to clean dumps
    manually as appropriate.

    The cleanup removes the whole `userver_dumps_root` tree and recreates it
    as an empty directory. It is registered as a finalizer of the requesting
    test and is also returned to the caller.

    @see userver::dump::Dumper
    @ingroup userver_testsuite_fixtures
    """

    def cleanup() -> None:
        shutil.rmtree(userver_dumps_root, ignore_errors=True)
        # Removal errors are ignored above, so the directory may still exist
        # after a partial removal; exist_ok keeps the cleanup best-effort
        # instead of failing the teardown with FileExistsError.
        userver_dumps_root.mkdir(exist_ok=True)

    request.addfinalizer(cleanup)
    return cleanup
68
69
@pytest.fixture(scope='session')
def _userver_config_dumps(pytestconfig, userver_dumps_root):
    """
    Produce a config hook that points the service at the testsuite dumps
    directory.

    The returned function writes `userver-dumps-root` into `config_vars` and,
    unless the `service_runner_mode` pytest option is set, also sets
    `userver-dumps-periodic` to `False`.
    """

    def patch_config(_config, config_vars) -> None:
        dumps_root = str(userver_dumps_root)
        config_vars['userver-dumps-root'] = dumps_root
        if pytestconfig.option.service_runner_mode:
            # In service runner mode the periodic dumps setting is left as is.
            return
        config_vars['userver-dumps-periodic'] = False

    return patch_config
75 config_vars['userver-dumps-periodic'] = False
76
77 return patch_config