userver: /home/antonyzhilin/arcadia/taxi/uservices/userver/testsuite/pytest_plugins/pytest_userver/plugins/caches.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
caches.py
1"""
2Fixtures for controlling userver caches.
3"""
4
5import copy
6import enum
7import types
8import typing
9
10import pytest
11
12
class UserverCachePlugin:
    """Pytest plugin object that collects cache control hooks.

    Every module-type plugin registered with pytest may define a
    ``USERVER_CACHE_CONTROL_HOOKS`` dictionary mapping a cache name to the
    name of a fixture. All such mappings are merged into one registry that
    is exposed through ``userver_cache_control_hooks``.
    """

    def __init__(self):
        # cache name -> name of the fixture that provides its handler
        self._hooks: typing.Dict[str, str] = {}

    @property
    def userver_cache_control_hooks(self) -> typing.Dict[str, str]:
        """Registered hooks as ``{cache_name: fixture_name}``."""
        return self._hooks

    def pytest_plugin_registered(self, plugin, manager):
        """Pick up ``USERVER_CACHE_CONTROL_HOOKS`` from a registered plugin.

        Plugins that are not modules, and modules that do not define the
        variable, are ignored.

        Raises:
            RuntimeError: the variable is not a ``dict``, or it names a
                cache for which a hook is already registered.
        """
        if not isinstance(plugin, types.ModuleType):
            return
        uhooks = getattr(plugin, 'USERVER_CACHE_CONTROL_HOOKS', None)
        if uhooks is None:
            return
        if not isinstance(uhooks, dict):
            raise RuntimeError(
                'USERVER_CACHE_CONTROL_HOOKS must be dictionary: '
                f'{{cache_name: fixture_name}}, got {uhooks} instead',
            )
        # Look for conflicts before writing anything, so that a rejected
        # module does not leave part of its hooks in the registry.
        for cache_name in uhooks:
            if cache_name in self._hooks:
                raise RuntimeError(
                    'USERVER_CACHE_CONTROL_HOOKS: hook already registered '
                    f'for cache {cache_name}',
                )
        self._hooks.update(uhooks)
37
38
class InvalidationState:
    """Tracks which caches have been invalidated and still need an update.

    The state is either "every cache" (stored as ``None``) or an explicit
    set of cache names.
    """

    def __init__(self):
        # None means that we should update all caches.
        # We invalidate all caches at the start of each test.
        self._invalidated_caches: typing.Optional[typing.Set[str]] = None

    def invalidate_all(self) -> None:
        """Mark every cache as needing an update."""
        self._invalidated_caches = None

    def invalidate(self, caches: typing.Iterable[str]) -> None:
        """Mark the given caches as needing an update.

        Does nothing when all caches are already marked.
        """
        if self._invalidated_caches is not None:
            self._invalidated_caches.update(caches)

    @property
    def should_update_all_caches(self) -> bool:
        """True when every cache is marked as needing an update."""
        return self._invalidated_caches is None

    @property
    def caches_to_update(self) -> typing.FrozenSet[str]:
        """Immutable snapshot of the explicitly invalidated cache names.

        Must only be read when ``should_update_all_caches`` is False.
        """
        assert self._invalidated_caches is not None
        return frozenset(self._invalidated_caches)

    @property
    def has_caches_to_update(self) -> bool:
        """True when all caches, or at least one named cache, need an update."""
        caches = self._invalidated_caches
        return caches is None or bool(caches)

    def on_caches_updated(self, caches: typing.Iterable[str]) -> None:
        """Record that the given caches were updated.

        Does nothing while all caches are marked as invalidated.
        """
        if self._invalidated_caches is not None:
            self._invalidated_caches.difference_update(caches)

    def on_all_caches_updated(self) -> None:
        """Record that every cache was updated; nothing is left pending."""
        self._invalidated_caches = set()

    def assign_copy(self, other: 'InvalidationState') -> None:
        """Replace this state with an independent copy of ``other``'s."""
        # pylint: disable=protected-access
        self._invalidated_caches = copy.deepcopy(other._invalidated_caches)
76
77
class CacheControlAction(enum.Enum):
    """Kind of update a cache control handler can request for its cache."""

    # Full update.
    FULL = 0
    # Incremental update instead of a full one.
    INCREMENTAL = 1
    # Leave the cache out of the update.
    EXCLUDE = 2
82
83
class CacheControlRequest:
    """Request object handed to a cache control handler.

    The handler calls ``exclude()`` or ``incremental()`` to choose how its
    cache is updated; if it calls neither, the action stays ``FULL``.
    """

    # Class-level default; the methods below override it per instance.
    action = CacheControlAction.FULL

    def exclude(self) -> None:
        """Exclude cache from update."""
        self.action = CacheControlAction.EXCLUDE

    def incremental(self) -> None:
        """Request incremental update instead of full."""
        self.action = CacheControlAction.INCREMENTAL
94
95
class CacheControl:
    """Runs the per-cache control handlers and keeps the state they return."""

    def __init__(
        self,
        *,
        enabled: bool,
        context: typing.Dict,
        fixtures: typing.Dict[str, typing.Callable],
        caches_disabled: typing.Set[str],
    ):
        """
        @param enabled when False, no handler is ever called
        @param context mapping cache name -> state last committed for it;
            updated in place by commit_staged()
        @param fixtures mapping cache name -> handler, called as
            ``handler(request, state)``
        @param caches_disabled names of caches whose handlers are skipped
        """
        self._enabled = enabled
        self._context = context
        self._fixtures = fixtures
        self._caches_disabled = caches_disabled

    # NOTE(review): the 'def' line of this method is missing from the
    # listing this file was recovered from; the name 'query_caches' is a
    # reconstruction - confirm against the upstream source.
    def query_caches(
        self,
        cache_names: typing.Optional[typing.List[str]],
    ) -> typing.Tuple[
        typing.Dict,
        typing.List[typing.Tuple[str, 'CacheControlAction']],
    ]:
        """Query cache control handlers.

        @param cache_names caches to query; None means every cache that has
            a handler, an empty list means none

        Returns pair (staged, [(cache_name, action), ...])

        ``staged`` maps each queried cache to the new state returned by its
        handler, or to None when the handler was not called (cache control
        disabled globally or for that cache). ``actions`` only lists caches
        whose handler was actually called.
        """
        if not self._enabled:
            names = self._fixtures if cache_names is None else cache_names
            return dict.fromkeys(names), []
        # Only None selects every cache. A truthiness test here would make
        # an empty list select everything, unlike the disabled branch above.
        requested = None if cache_names is None else set(cache_names)
        staged = {}
        actions = []
        for cache_name, fixture in self._fixtures.items():
            if requested is not None and cache_name not in requested:
                continue
            if cache_name in self._caches_disabled:
                staged[cache_name] = None
                continue
            context = self._context.get(cache_name)
            request = CacheControlRequest()
            staged[cache_name] = fixture(request, context)
            actions.append((cache_name, request.action))
        return staged, actions

    def commit_staged(self, staged: typing.Dict[str, typing.Any]) -> None:
        """Apply recently staged state."""
        self._context.update(staged)
142
143
def pytest_configure(config):
    """Register the cache plugin object and declare the marker used here."""
    config.pluginmanager.register(UserverCachePlugin(), 'userver_cache')
    config.addinivalue_line(
        'markers',
        'userver_cache_control_disabled: disable cache control',
    )
150
151
@pytest.fixture
def cache_invalidation_state() -> InvalidationState:
    """
    A fixture for notifying the service of changes in cache data sources.

    Intended to be used by other fixtures that represent those data sources,
    not by tests directly.

    @ingroup userver_testsuite_fixtures
    """
    return InvalidationState()
163
164
@pytest.fixture(scope='session')
def _userver_cache_control_context(daemon_scoped_mark) -> typing.Dict:
    """Session-scoped dictionary holding the committed state of each cache.

    Starts out empty.
    """
    # NOTE(review): daemon_scoped_mark is requested but not used in the
    # body; presumably it ties this context to the daemon's lifetime -
    # confirm against the fixture's definition.
    return {}
168
169
@pytest.fixture
def _userver_cache_fixtures(
    pytestconfig,
    request,
) -> typing.Dict[str, typing.Callable]:
    """Resolve every registered cache control hook to its fixture value.

    Returns a mapping ``{cache_name: fixture_value}`` built from the hooks
    collected by the plugin registered as 'userver_cache'.
    """
    plugin: UserverCachePlugin = pytestconfig.pluginmanager.get_plugin(
        'userver_cache',
    )
    return {
        cache_name: request.getfixturevalue(fixture_name)
        for cache_name, fixture_name in (
            plugin.userver_cache_control_hooks.items()
        )
    }
182
183
# NOTE(review): the 'def' line of this fixture is missing from the listing
# this file was recovered from; the name 'userver_cache_control' is a
# reconstruction - confirm against the upstream source.
@pytest.fixture
def userver_cache_control(
    _userver_cache_control_context,
    _userver_cache_fixtures,
    request,
) -> CacheControl:
    """Userver cache control handler.

    To install per cache handler use USERVER_CACHE_CONTROL_HOOKS variable
    in your pytest plugin:

    @code
    USERVER_CACHE_CONTROL_HOOKS = {
        'my-cache-name': 'my_cache_cc',
    }

    @pytest.fixture
    def my_cache_cc(my_cache_context):
        def cache_control(request, state):
            new_state = my_cache_context.get_state()
            if state == new_state:
                # Cache is already up to date, no need to update
                request.exclude()
            else:
                # Request incremental update, if your cache supports it
                request.incremental()
            return new_state
        return cache_control
    @endcode

    Cache control can be switched off with the
    ``userver_cache_control_disabled`` marker: without positional arguments
    it disables cache control entirely, with a list of cache names it
    disables it for those caches only. The ``reason`` keyword is mandatory.

    @ingroup userver_testsuite_fixtures
    """
    enabled = True
    caches_disabled = set()

    def userver_cache_control_disabled(
        caches: typing.Optional[typing.Sequence[str]] = None,
        *,
        reason: str,
    ):
        # Mirrors the marker's signature so that malformed marks (for
        # example a missing 'reason') fail with a TypeError. Returns the
        # new value of the global 'enabled' switch.
        if caches is None:
            return False
        if isinstance(caches, str):
            # A bare string is a single cache name; set.update() would
            # otherwise split it into individual characters.
            caches = [caches]
        caches_disabled.update(caches)
        # A per-cache mark leaves the global switch as it was.
        return enabled

    for mark in request.node.iter_markers('userver_cache_control_disabled'):
        enabled = userver_cache_control_disabled(*mark.args, **mark.kwargs)

    return CacheControl(
        context=_userver_cache_control_context,
        fixtures=_userver_cache_fixtures,
        enabled=enabled,
        caches_disabled=caches_disabled,
    )
238
239
# NOTE(review): the 'def' line of this fixture is missing from the listing
# this file was recovered from; the name and the return annotation are a
# reconstruction - confirm against the upstream source.
@pytest.fixture(scope='session')
def userver_allow_all_caches_invalidation() -> bool:
    """
    Allows use of invalidate_caches() without cache_names.
    """
    return True