userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/dynconf.py Source File
Loading...
Searching...
No Matches
dynconf.py
1"""
2Python module that provides classes and constants
3that may be useful when working with the
4@ref dynamic_config_testsuite "dynamic config in testsuite".
5
6@ingroup userver_testsuite
7"""
8
9from __future__ import annotations
10
11from collections.abc import Iterable
12from collections.abc import Iterator
13from collections.abc import Set
14import contextlib
15import copy
16import dataclasses
17import datetime
18from typing import Any
19from typing import TypeAlias
20
21from pytest_userver.plugins import caches
22
23
class BaseError(Exception):
    """Common base class for the exceptions raised by this module."""
26
27
29 """Config parameter was not found and no default was provided"""
30
31
33 """
34 Calling `dynamic_config.get` before defaults are fetched from the service.
35 Try adding a dependency on `service_client` in your fixture.
36 """
37
38
40 """Dynamic config defaults action returned invalid response"""
41
42
44 """Invalid dynamic config name in `@pytest.mark.config`"""
45
46
47ConfigValuesDict: TypeAlias = dict[str, Any]
48
49
51 pass
52
53
54_REMOVE_KEY = _RemoveKey()
55
56
58 pass
59
60
61_MISSING = _Missing()
62
63
64@dataclasses.dataclass(frozen=True)
66 value: Any
67 static_default_preferred: bool
68
69
70_ConfigDict: TypeAlias = dict[str, _ConfigEntry | _RemoveKey]
71
72
def _create_config_dict(values: ConfigValuesDict, kill_switches_disabled: Set[str] | None = None) -> _ConfigDict:
    """
    Wrap plain config values into `_ConfigEntry` objects.

    Each entry is flagged as "static default preferred" when its key is
    listed in `kill_switches_disabled`; passing `None` means no key is.
    """
    disabled = set() if kill_switches_disabled is None else kill_switches_disabled
    return {key: _ConfigEntry(value, key in disabled) for key, value in values.items()}
82
83
84@dataclasses.dataclass
86 timestamp: str
87 dirty_keys: set[str]
88 state: _ConfigDict
89 prev_state: _ConfigDict
90
91 @classmethod
92 def new(
93 cls,
94 *,
95 previous: _ChangelogEntry | None,
96 timestamp: str,
97 ) -> _ChangelogEntry:
98 if previous:
99 prev_state = previous.state
100 else:
101 prev_state = {}
102 return cls(
103 timestamp=timestamp,
104 dirty_keys=set(),
105 state=prev_state.copy(),
106 prev_state=prev_state,
107 )
108
109 @property
110 def has_changes(self) -> bool:
111 return bool(self.dirty_keys)
112
113 def update(self, values: _ConfigDict):
114 for key, value in values.items():
115 if value == self.prev_state.get(key, _MISSING):
116 self.dirty_keys.discard(key)
117 else:
118 self.dirty_keys.add(key)
119 self.state.update(values)
120
121
122@dataclasses.dataclass(frozen=True)
124 timestamp: str
125 values: ConfigValuesDict
126 removed: list[str]
127 kill_switches_disabled: list[str]
128
129 def is_empty(self) -> bool:
130 return not self.values and not self.removed
131
132
134 timestamp: datetime.datetime
135 committed_entries: list[_ChangelogEntry]
136 staged_entry: _ChangelogEntry
137
138 def __init__(self):
139 self.timestamp = datetime.datetime.fromtimestamp(
140 0,
141 datetime.timezone.utc,
142 )
143 self.committed_entries = []
144 self.staged_entry = _ChangelogEntry.new(
145 timestamp=self.service_timestamp(),
146 previous=None,
147 )
148
149 def service_timestamp(self) -> str:
150 return self.timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
151
152 def next_timestamp(self) -> str:
153 self.timestamp += datetime.timedelta(seconds=1)
154 return self.service_timestamp()
155
156 def commit(self) -> _ChangelogEntry:
157 """Commit staged changed if any and return last committed entry."""
158 entry = self.staged_entry
159 if entry.has_changes or not self.committed_entries:
160 self.staged_entry = _ChangelogEntry.new(
161 timestamp=self.next_timestamp(),
162 previous=entry,
163 )
164 self.committed_entries.append(entry)
165 return self.committed_entries[-1]
166
167 def get_updated_since(
168 self,
169 config_dict: _ConfigDict,
170 updated_since: str,
171 ids: list[str] | None = None,
172 ) -> _Updates:
173 entry = self.commit()
174 config_dict, removed = self._get_updated_since(config_dict, updated_since)
175 if ids:
176 config_dict = {name: config_dict[name] for name in ids if name in config_dict}
177 removed = [name for name in removed if name in ids]
178
179 values = {}
180 kill_switches_disabled = []
181 for name, config_entry in config_dict.items():
182 values[name] = config_entry.value
183 if config_entry.static_default_preferred:
184 kill_switches_disabled.append(name)
185
186 return _Updates(
187 timestamp=entry.timestamp,
188 values=values,
189 removed=removed,
190 kill_switches_disabled=kill_switches_disabled,
191 )
192
193 def _get_updated_since(
194 self,
195 config_dict: _ConfigDict,
196 updated_since: str,
197 ) -> tuple[_ConfigDict, list[str]]:
198 if not updated_since:
199 return config_dict, []
200 dirty_keys = set()
201 last_known_state = {}
202 for entry in reversed(self.committed_entries):
203 if entry.timestamp > updated_since:
204 dirty_keys.update(entry.dirty_keys)
205 else:
206 if entry.timestamp == updated_since:
207 last_known_state = entry.state
208 break
209 # We don't want to send them again
210 result = {}
211 removed = []
212 for key in dirty_keys:
213 config_entry = config_dict.get(key, _REMOVE_KEY)
214 if last_known_state.get(key, _MISSING) != config_entry:
215 if config_entry is _REMOVE_KEY:
216 removed.append(key)
217 else:
218 result[key] = config_entry
219 return result, removed
220
221 def add_entries(self, config_dict: _ConfigDict) -> None:
222 self.staged_entry.update(config_dict)
223
224 @contextlib.contextmanager
225 def rollback(self, defaults: ConfigValuesDict) -> Iterator[None]:
226 try:
227 yield
228 finally:
229 self._do_rollback(defaults)
230
231 def _do_rollback(self, defaults: ConfigValuesDict) -> None:
232 if not self.committed_entries:
233 return
234
235 maybe_dirty = set()
236 for entry in self.committed_entries:
237 maybe_dirty.update(entry.dirty_keys)
238
239 last = self.committed_entries[-1]
240 last_state = last.state
241 config_dict = _create_config_dict(defaults)
242 dirty_keys = set()
243 reverted = {}
244 for key in maybe_dirty:
245 original = config_dict.get(key, _REMOVE_KEY)
246 if last_state[key] != original:
247 dirty_keys.add(key)
248 reverted[key] = original
249
250 entry = _ChangelogEntry(
251 timestamp=last.timestamp,
252 state=last.state,
253 dirty_keys=dirty_keys,
254 prev_state={},
255 )
256 self.committed_entries = [entry]
258 timestamp=self.staged_entry.timestamp,
259 dirty_keys=dirty_keys.copy(),
260 state=reverted,
261 prev_state=entry.state,
262 )
263
264
266 """
267 @brief Simple dynamic config backend.
268
269 @see @ref pytest_userver.plugins.dynamic_config.dynamic_config "dynamic_config"
270 """
271
272 def __init__(
273 self,
274 *,
275 initial_values: ConfigValuesDict,
276 defaults: ConfigValuesDict | None,
277 config_cache_components: Iterable[str],
278 cache_invalidation_state: caches.InvalidationState,
279 changelog: _Changelog,
280 ) -> None:
281 self._values = initial_values.copy()
282 self._kill_switches_disabled = set()
283 # Defaults are only there for convenience, to allow accessing them
284 # in tests using dynamic_config.get. They are not sent to the service.
285 self._defaults = defaults
286 self._cache_invalidation_state = cache_invalidation_state
287 self._config_cache_components = config_cache_components
288 self._changelog = changelog
289
290 def set_values(self, values: ConfigValuesDict) -> None:
291 self.set_values_unsafe(copy.deepcopy(values))
292
293 def set_values_unsafe(self, values: ConfigValuesDict) -> None:
294 self._values.update(values)
295 for key in values:
296 self._kill_switches_disabled.discard(key)
297
298 config_dict = _create_config_dict(values)
299 self._changelog.add_entries(config_dict)
300 self._sync_with_service()
301
302 def set(self, **values) -> None:
303 self.set_values(values)
304
305 def switch_to_static_default(self, *keys: str) -> None:
306 for key in keys:
307 self._kill_switches_disabled.add(key)
308
309 config_dict = _create_config_dict(
310 values={key: self._values.get(key, None) for key in keys},
311 kill_switches_disabled=set(keys),
312 )
313 self._changelog.add_entries(config_dict)
314 self._sync_with_service()
315
316 def switch_to_dynamic_value(self, *keys: str) -> None:
317 for key in keys:
318 self._kill_switches_disabled.discard(key)
319
320 config_dict = _create_config_dict(values={key: self._values[key] for key in keys if key in self._values})
321 self._changelog.add_entries(config_dict)
322 self._sync_with_service()
323
324 def get_values_unsafe(self) -> ConfigValuesDict:
325 return self._values
326
327 def get_kill_switches_disabled_unsafe(self) -> Set[str]:
328 return self._kill_switches_disabled
329
330 def get(self, key: str, default: Any = None) -> Any:
331 if key in self._values and key not in self._kill_switches_disabled:
332 return copy.deepcopy(self._values[key])
333 if self._defaults is not None and key in self._defaults:
334 return copy.deepcopy(self._defaults[key])
335 if default is not None:
336 return default
337 if self._defaults is None:
339 f'Defaults for config {key!r} have not yet been fetched '
340 'from the service. Options:\n'
341 '1. add a dependency on service_client in your fixture;\n'
342 '2. pass `default` parameter to `dynamic_config.get`',
343 )
344 raise DynamicConfigNotFoundError(f'Config {key!r} is not found')
345
346 def remove_values(self, keys: Iterable[str]) -> None:
347 extra_keys = set(keys).difference(self._values.keys())
348 if extra_keys:
350 f'Attempting to remove nonexistent configs: {extra_keys}',
351 )
352 for key in keys:
353 self._values.pop(key)
354 self._kill_switches_disabled.discard(key)
355
356 self._changelog.add_entries({key: _REMOVE_KEY for key in keys})
357 self._sync_with_service()
358
359 def remove(self, key: str) -> None:
360 return self.remove_values([key])
361
362 @contextlib.contextmanager
363 def modify(self, key: str) -> Any:
364 value = self.get(key)
365 yield value
366 self.set_values({key: value})
367
368 @contextlib.contextmanager
369 def modify_many(
370 self,
371 *keys: tuple[str, ...],
372 ) -> tuple[Any, ...]:
373 values = tuple(self.get(key) for key in keys)
374 yield values
375 self.set_values(dict(zip(keys, values, strict=True)))
376
377 def _sync_with_service(self) -> None:
378 self._cache_invalidation_state.invalidate(
380 )
381
382
384 pass
385
386
387USE_STATIC_DEFAULT = UseStaticDefault()