userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/dynconf.py Source File
Loading...
Searching...
No Matches
dynconf.py
1"""
2Python module that provides classes and constants
3that may be useful when working with the
4@ref dynamic_config_testsuite "dynamic config in testsuite".
5
6@ingroup userver_testsuite
7"""
8
9import contextlib
10import copy
11import dataclasses
12import datetime
13from typing import Any
14from typing import Dict
15from typing import Iterable
16from typing import Iterator
17from typing import List
18from typing import Optional
19from typing import Set
20from typing import Tuple
21
22from pytest_userver.plugins import caches
23
24
25class BaseError(Exception):
26 """Base class for exceptions from this module"""
27
28
30 """Config parameter was not found and no default was provided"""
31
32
34 """
35 Calling `dynamic_config.get` before defaults are fetched from the service.
36 Try adding a dependency on `service_client` in your fixture.
37 """
38
39
41 """Dynamic config defaults action returned invalid response"""
42
43
45 """Invalid dynamic config name in `@pytest.mark.config`"""
46
47
48ConfigValuesDict = Dict[str, Any]
49
50
52 pass
53
54
55_REMOVE_KEY = _RemoveKey()
56
57
59 pass
60
61
62_MISSING = _Missing()
63
64
65@dataclasses.dataclass(frozen=True)
67 value: Any
68 static_default_preferred: bool
69
70
71_ConfigDict = Dict[str, _ConfigEntry | _RemoveKey]
72
73
74def _create_config_dict(values: ConfigValuesDict, kill_switches_disabled: Optional[Set[str]] = None) -> _ConfigDict:
75 if kill_switches_disabled is None:
76 kill_switches_disabled = set()
77
78 result = {}
79 for key, value in values.items():
80 static_default_preferred = key in kill_switches_disabled
81 result[key] = _ConfigEntry(value, static_default_preferred)
82 return result
83
84
85@dataclasses.dataclass
87 timestamp: str
88 dirty_keys: Set[str]
89 state: _ConfigDict
90 prev_state: _ConfigDict
91
92 @classmethod
93 def new(
94 cls,
95 *,
96 previous: Optional['_ChangelogEntry'],
97 timestamp: str,
98 ) -> '_ChangelogEntry':
99 if previous:
100 prev_state = previous.state
101 else:
102 prev_state = {}
103 return cls(
104 timestamp=timestamp,
105 dirty_keys=set(),
106 state=prev_state.copy(),
107 prev_state=prev_state,
108 )
109
110 @property
111 def has_changes(self) -> bool:
112 return bool(self.dirty_keysdirty_keys)
113
114 def update(self, values: _ConfigDict):
115 for key, value in values.items():
116 if value == self.prev_state.get(key, _MISSING):
117 self.dirty_keysdirty_keys.discard(key)
118 else:
119 self.dirty_keysdirty_keys.add(key)
120 self.state.update(values)
121
122
123@dataclasses.dataclass(frozen=True)
125 timestamp: str
126 values: ConfigValuesDict
127 removed: List[str]
128 kill_switches_disabled: List[str]
129
130 def is_empty(self) -> bool:
131 return not self.values and not self.removed
132
133
135 timestamp: datetime.datetime
136 committed_entries: List[_ChangelogEntry]
137 staged_entry: _ChangelogEntry
138
139 def __init__(self):
140 self.timestamptimestamp = datetime.datetime.fromtimestamp(
141 0,
142 datetime.timezone.utc,
143 )
145 self.staged_entrystaged_entry = _ChangelogEntry.new(
146 timestamp=self.service_timestamp(),
147 previous=None,
148 )
149
150 def service_timestamp(self) -> str:
151 return self.timestamptimestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
152
153 def next_timestamp(self) -> str:
154 self.timestamptimestamp += datetime.timedelta(seconds=1)
155 return self.service_timestamp()
156
157 def commit(self) -> _ChangelogEntry:
158 """Commit staged changed if any and return last committed entry."""
159 entry = self.staged_entrystaged_entry
160 if entry.has_changes or not self.committed_entriescommitted_entries:
161 self.staged_entrystaged_entry = _ChangelogEntry.new(
162 timestamp=self.next_timestamp(),
163 previous=entry,
164 )
165 self.committed_entriescommitted_entries.append(entry)
167
168 def get_updated_since(
169 self,
170 config_dict: _ConfigDict,
171 updated_since: str,
172 ids: Optional[List[str]] = None,
173 ) -> _Updates:
174 entry = self.commit()
175 config_dict, removed = self._get_updated_since(config_dict, updated_since)
176 if ids:
177 config_dict = {name: config_dict[name] for name in ids if name in config_dict}
178 removed = [name for name in removed if name in ids]
179
180 values = {}
181 kill_switches_disabled = []
182 for name, config_entry in config_dict.items():
183 values[name] = config_entry.value
184 if config_entry.static_default_preferred:
185 kill_switches_disabled.append(name)
186
187 return _Updates(
188 timestamp=entry.timestamp,
189 values=values,
190 removed=removed,
191 kill_switches_disabled=kill_switches_disabled,
192 )
193
194 def _get_updated_since(
195 self,
196 config_dict: _ConfigDict,
197 updated_since: str,
198 ) -> Tuple[_ConfigDict, List[str]]:
199 if not updated_since:
200 return config_dict, []
201 dirty_keys = set()
202 last_known_state = {}
203 for entry in reversed(self.committed_entriescommitted_entries):
204 if entry.timestamp > updated_since:
205 dirty_keys.update(entry.dirty_keys)
206 else:
207 if entry.timestamp == updated_since:
208 last_known_state = entry.state
209 break
210 # We don't want to send them again
211 result = {}
212 removed = []
213 for key in dirty_keys:
214 config_entry = config_dict.get(key, _REMOVE_KEY)
215 if last_known_state.get(key, _MISSING) != config_entry:
216 if config_entry is _REMOVE_KEY:
217 removed.append(key)
218 else:
219 result[key] = config_entry
220 return result, removed
221
222 def add_entries(self, config_dict: _ConfigDict) -> None:
223 self.staged_entrystaged_entry.update(config_dict)
224
225 @contextlib.contextmanager
226 def rollback(self, defaults: ConfigValuesDict) -> Iterator[None]:
227 try:
228 yield
229 finally:
230 self._do_rollback(defaults)
231
232 def _do_rollback(self, defaults: ConfigValuesDict) -> None:
234 return
235
236 maybe_dirty = set()
237 for entry in self.committed_entriescommitted_entries:
238 maybe_dirty.update(entry.dirty_keys)
239
241 last_state = last.state
242 config_dict = _create_config_dict(defaults)
243 dirty_keys = set()
244 reverted = {}
245 for key in maybe_dirty:
246 original = config_dict.get(key, _REMOVE_KEY)
247 if last_state[key] != original:
248 dirty_keys.add(key)
249 reverted[key] = original
250
251 entry = _ChangelogEntry(
252 timestamp=last.timestamp,
253 state=last.state,
254 dirty_keys=dirty_keys,
255 prev_state={},
256 )
259 timestamp=self.staged_entrystaged_entry.timestamp,
260 dirty_keys=dirty_keys.copy(),
261 state=reverted,
262 prev_state=entry.state,
263 )
264
265
267 """
268 @brief Simple dynamic config backend.
269
270 @see @ref pytest_userver.plugins.dynamic_config.dynamic_config "dynamic_config"
271 """
272
273 def __init__(
274 self,
275 *,
276 initial_values: ConfigValuesDict,
277 defaults: Optional[ConfigValuesDict],
278 config_cache_components: Iterable[str],
279 cache_invalidation_state: caches.InvalidationState,
280 changelog: _Changelog,
281 ) -> None:
282 self._values = initial_values.copy()
283 self._kill_switches_disabled = set()
284 # Defaults are only there for convenience, to allow accessing them
285 # in tests using dynamic_config.get. They are not sent to the service.
286 self._defaults = defaults
287 self._cache_invalidation_state = cache_invalidation_state
288 self._config_cache_components = config_cache_components
289 self._changelog = changelog
290
291 def set_values(self, values: ConfigValuesDict) -> None:
292 self.set_values_unsafe(copy.deepcopy(values))
293
294 def set_values_unsafe(self, values: ConfigValuesDict) -> None:
295 self._values.update(values)
296 for key in values:
297 self._kill_switches_disabled.discard(key)
298
299 config_dict = _create_config_dict(values)
300 self._changelog.add_entries(config_dict)
301 self._sync_with_service()
302
303 def set(self, **values) -> None:
304 self.set_values(values)
305
306 def switch_to_static_default(self, *keys: str) -> None:
307 for key in keys:
308 self._kill_switches_disabled.add(key)
309
310 config_dict = _create_config_dict(
311 values={key: self._values.get(key, None) for key in keys}, kill_switches_disabled=set(keys)
312 )
313 self._changelog.add_entries(config_dict)
314 self._sync_with_service()
315
316 def switch_to_dynamic_value(self, *keys: str) -> None:
317 for key in keys:
318 self._kill_switches_disabled.discard(key)
319
320 config_dict = _create_config_dict(values={key: self._values[key] for key in keys if key in self._values})
321 self._changelog.add_entries(config_dict)
322 self._sync_with_service()
323
324 def get_values_unsafe(self) -> ConfigValuesDict:
325 return self._values
326
327 def get_kill_switches_disabled_unsafe(self) -> Set[str]:
328 return self._kill_switches_disabled
329
330 def get(self, key: str, default: Any = None) -> Any:
331 if key in self._values and key not in self._kill_switches_disabled:
332 return copy.deepcopy(self._values[key])
333 if self._defaults is not None and key in self._defaults:
334 return copy.deepcopy(self._defaults[key])
335 if default is not None:
336 return default
337 if self._defaults is None:
339 f'Defaults for config {key!r} have not yet been fetched '
340 'from the service. Options:\n'
341 '1. add a dependency on service_client in your fixture;\n'
342 '2. pass `default` parameter to `dynamic_config.get`',
343 )
344 raise DynamicConfigNotFoundError(f'Config {key!r} is not found')
345
346 def remove_values(self, keys: Iterable[str]) -> None:
347 extra_keys = set(keys).difference(self._values.keys())
348 if extra_keys:
350 f'Attempting to remove nonexistent configs: {extra_keys}',
351 )
352 for key in keys:
353 self._values.pop(key)
354 self._kill_switches_disabled.discard(key)
355
356 self._changelog.add_entries({key: _REMOVE_KEY for key in keys})
357 self._sync_with_service()
358
359 def remove(self, key: str) -> None:
360 return self.remove_values([key])
361
362 @contextlib.contextmanager
363 def modify(self, key: str) -> Any:
364 value = self.get(key)
365 yield value
366 self.set_values({key: value})
367
368 @contextlib.contextmanager
369 def modify_many(
370 self,
371 *keys: Tuple[str, ...],
372 ) -> Tuple[Any, ...]:
373 values = tuple(self.get(key) for key in keys)
374 yield values
375 self.set_values(dict(zip(keys, values)))
376
377 def _sync_with_service(self) -> None:
378 self._cache_invalidation_state.invalidate(
380 )
381
382
384 pass
385
386
387USE_STATIC_DEFAULT = UseStaticDefault()