userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/caches.py Source File
Loading...
Searching...
No Matches
caches.py
1"""
2Fixtures for controlling userver caches.
3"""
4
5from collections.abc import Callable
6from collections.abc import Iterable
7from collections.abc import Sequence
8from collections.abc import Set
9import copy
10import enum
11import types
12from typing import Any
13
14import pytest
15
16
18 def __init__(self):
19 self._hooks = {}
20
21 @property
22 def userver_cache_control_hooks(self) -> dict[str, str]:
23 return self._hooks
24
25 def pytest_plugin_registered(self, plugin, manager):
26 if not isinstance(plugin, types.ModuleType):
27 return
28 uhooks = getattr(plugin, 'USERVER_CACHE_CONTROL_HOOKS', None)
29 if uhooks is None:
30 return
31 if not isinstance(uhooks, dict):
32 raise RuntimeError(
33 f'USERVER_CACHE_CONTROL_HOOKS must be dictionary: {{cache_name: fixture_name}}, got {uhooks} instead',
34 )
35 for cache_name, fixture_name in uhooks.items():
36 if cache_name in self._hooks:
37 raise RuntimeError(
38 f'USERVER_CACHE_CONTROL_HOOKS: hook already registered for cache {cache_name}',
39 )
40 self._hooks[cache_name] = fixture_name
41
42
44 def __init__(self) -> None:
45 # None means that we should update all caches.
46 # We invalidate all caches at the start of each test.
47 self._invalidated_caches: set[str] | None = None
48
49 def invalidate_all(self) -> None:
50 self._invalidated_caches = None
51
52 def invalidate(self, caches: Iterable[str]) -> None:
53 if self._invalidated_caches is not None:
54 self._invalidated_caches.update(caches)
55
56 @property
57 def should_update_all_caches(self) -> bool:
58 return self._invalidated_caches is None
59
60 @property
61 def caches_to_update(self) -> frozenset[str]:
62 assert self._invalidated_caches is not None
63 return frozenset(self._invalidated_caches)
64
65 @property
66 def has_caches_to_update(self) -> bool:
67 caches = self._invalidated_caches
68 return caches is None or bool(caches)
69
70 def on_caches_updated(self, caches: Iterable[str]) -> None:
71 if self._invalidated_caches is not None:
72 self._invalidated_caches.difference_update(caches)
73
74 def on_all_caches_updated(self) -> None:
75 self._invalidated_caches = set()
76
77 def assign_copy(self, other: 'InvalidationState') -> None:
78 # pylint: disable=protected-access
79 self._invalidated_caches = copy.deepcopy(other._invalidated_caches)
80
81
82class CacheControlAction(enum.Enum):
83 FULL = 0
84 INCREMENTAL = 1
85 EXCLUDE = 2
86
87
89 action = CacheControlAction.FULL
90
91 def exclude(self) -> None:
92 """Exclude cache from update."""
93 self.action = CacheControlAction.EXCLUDE
94
95 def incremental(self) -> None:
96 """Request incremental update instead of full."""
97 self.action = CacheControlAction.INCREMENTAL
98
99
101 def __init__(
102 self,
103 *,
104 enabled: bool,
105 context: dict[Any, Any],
106 fixtures: dict[str, Callable[[...], Any]],
107 caches_disabled: Set[str],
108 ):
109 self._enabled = enabled
110 self._context = context
111 self._fixtures = fixtures
112 self._caches_disabled = caches_disabled
113
115 self,
116 cache_names: list[str] | None,
117 ) -> tuple[dict[Any, Any], list[tuple[str, CacheControlAction]]]:
118 """Query cache control handlers.
119
120 Returns pair (staged, [(cache_name, action), ...])
121 """
122 if not self._enabled:
123 if cache_names is None:
124 cache_names = self._fixtures.keys()
125 return {cache_name: None for cache_name in cache_names}, []
126 staged = {}
127 actions = []
128 for cache_name, fixture in self._fixtures.items():
129 if cache_names and cache_name not in cache_names:
130 continue
131 if cache_name in self._caches_disabled:
132 staged[cache_name] = None
133 continue
134 context = self._context.get(cache_name)
135 request = CacheControlRequest()
136 staged[cache_name] = fixture(request, context)
137 actions.append((cache_name, request.action))
138 return staged, actions
139
140 def commit_staged(self, staged: dict[str, Any]) -> None:
141 """Apply recently committed state."""
142 self._context.update(staged)
143
144
145def pytest_configure(config):
146 config.pluginmanager.register(UserverCachePlugin(), 'userver_cache')
147 config.addinivalue_line(
148 'markers',
149 'userver_cache_control_disabled: disable cache control',
150 )
151
152
153@pytest.fixture
154def cache_invalidation_state() -> InvalidationState:
155 """
156 A fixture for notifying the service of changes in cache data sources.
157
158 Intended to be used by other fixtures that represent those data sources,
159 not by tests directly.
160
161 @ingroup userver_testsuite_fixtures
162 """
163 return InvalidationState()
164
165
166@pytest.fixture(scope='session')
167def _userver_cache_control_context(daemon_scoped_mark) -> dict[Any, Any]:
168 return {}
169
170
171@pytest.fixture
172def _userver_cache_fixtures(
173 pytestconfig,
174 request,
175) -> dict[str, Callable[[...], Any]]:
176 plugin: UserverCachePlugin = pytestconfig.pluginmanager.get_plugin(
177 'userver_cache',
178 )
179 result = {}
180 for cache_name, fixture_name in plugin.userver_cache_control_hooks.items():
181 result[cache_name] = request.getfixturevalue(fixture_name)
182 return result
183
184
185@pytest.fixture
187 _userver_cache_control_context,
188 _userver_cache_fixtures,
189 request,
190) -> CacheControl:
191 """Userver cache control handler.
192
193 To install per cache handler use USERVER_CACHE_CONTROL_HOOKS variable
194 in your pytest plugin:
195
196 @code
197 USERVER_CACHE_CONTROL_HOOKS = {
198 'my-cache-name': 'my_cache_cc',
199 }
200
201 @pytest.fixture
202 def my_cache_cc(my_cache_context):
203 def cache_control(request, state):
204 new_state = my_cache_context.get_state()
205 if state == new_state:
206 # Cache is already up to date, no need to update
207 request.exclude()
208 else:
209 # Request incremental update, if you cache supports it
210 request.incremental()
211 return new_state
212 return cache_control
213 @endcode
214
215 @ingroup userver_testsuite_fixtures
216 """
217 enabled = True
218 caches_disabled = set()
219
220 def userver_cache_control_disabled(caches: Sequence[str] = None, *, reason: str):
221 if caches is not None:
222 caches_disabled.update(caches)
223 return enabled
224 return False
225
226 for mark in request.node.iter_markers('userver_cache_control_disabled'):
227 enabled = userver_cache_control_disabled(*mark.args, **mark.kwargs)
228
229 return CacheControl(
230 context=_userver_cache_control_context,
231 fixtures=_userver_cache_fixtures,
232 enabled=enabled,
233 caches_disabled=caches_disabled,
234 )
235
236
237@pytest.fixture(scope='session')
239 """
240 Allows use of invalidate_caches() without cache_names.
241 """
242 return True