135 timestamp: datetime.datetime
136 committed_entries: List[_ChangelogEntry]
137 staged_entry: _ChangelogEntry
142 datetime.timezone.utc,
150 def service_timestamp(self) -> str:
153 def next_timestamp(self) -> str:
158 """Commit staged changed if any and return last committed entry."""
168 def get_updated_since(
170 config_dict: _ConfigDict,
172 ids: Optional[List[str]] =
None,
177 config_dict = {name: config_dict[name]
for name
in ids
if name
in config_dict}
178 removed = [name
for name
in removed
if name
in ids]
181 kill_switches_disabled = []
182 for name, config_entry
in config_dict.items():
183 values[name] = config_entry.value
184 if config_entry.static_default_preferred:
185 kill_switches_disabled.append(name)
188 timestamp=entry.timestamp,
191 kill_switches_disabled=kill_switches_disabled,
194 def _get_updated_since(
196 config_dict: _ConfigDict,
198 ) -> Tuple[_ConfigDict, List[str]]:
199 if not updated_since:
200 return config_dict, []
202 last_known_state = {}
204 if entry.timestamp > updated_since:
205 dirty_keys.update(entry.dirty_keys)
207 if entry.timestamp == updated_since:
208 last_known_state = entry.state
213 for key
in dirty_keys:
214 config_entry = config_dict.get(key, _REMOVE_KEY)
215 if last_known_state.get(key, _MISSING) != config_entry:
216 if config_entry
is _REMOVE_KEY:
219 result[key] = config_entry
220 return result, removed
222 def add_entries(self, config_dict: _ConfigDict) ->
None:
225 @contextlib.contextmanager
226 def rollback(self, defaults: ConfigValuesDict) -> Iterator[
None]:
232 def _do_rollback(self, defaults: ConfigValuesDict) ->
None:
238 maybe_dirty.update(entry.dirty_keys)
241 last_state = last.state
242 config_dict = _create_config_dict(defaults)
245 for key
in maybe_dirty:
246 original = config_dict.get(key, _REMOVE_KEY)
247 if last_state[key] != original:
249 reverted[key] = original
252 timestamp=last.timestamp,
254 dirty_keys=dirty_keys,
260 dirty_keys=dirty_keys.copy(),
262 prev_state=entry.state,
268 @brief Simple dynamic config backend.
270 @see @ref pytest_userver.plugins.dynamic_config.dynamic_config "dynamic_config"
276 initial_values: ConfigValuesDict,
277 defaults: Optional[ConfigValuesDict],
278 config_cache_components: Iterable[str],
279 cache_invalidation_state: caches.InvalidationState,
280 changelog: _Changelog,
282 self.
_values = initial_values.copy()
291 def set_values(self, values: ConfigValuesDict) ->
None:
294 def set_values_unsafe(self, values: ConfigValuesDict) ->
None:
299 config_dict = _create_config_dict(values)
303 def set(self, **values) -> None:
306 def switch_to_static_default(self, *keys: str) ->
None:
310 config_dict = _create_config_dict(
311 values={key: self.
_values.get(key,
None)
for key
in keys}, kill_switches_disabled=set(keys)
316 def switch_to_dynamic_value(self, *keys: str) ->
None:
320 config_dict = _create_config_dict(values={key: self.
_values[key]
for key
in keys
if key
in self.
_values})
324 def get_values_unsafe(self) -> ConfigValuesDict:
327 def get_kill_switches_disabled_unsafe(self) -> Set[str]:
330 def get(self, key: str, default: Any =
None) -> Any:
332 return copy.deepcopy(self.
_values[key])
334 return copy.deepcopy(self.
_defaults[key])
335 if default
is not None:
339 f
'Defaults for config {key!r} have not yet been fetched '
340 'from the service. Options:\n'
341 '1. add a dependency on service_client in your fixture;\n'
342 '2. pass `default` parameter to `dynamic_config.get`',
346 def remove_values(self, keys: Iterable[str]) ->
None:
347 extra_keys = set(keys).difference(self.
_values.keys())
350 f
'Attempting to remove nonexistent configs: {extra_keys}',
356 self.
_changelog.add_entries({key: _REMOVE_KEY
for key
in keys})
359 def remove(self, key: str) ->
None:
362 @contextlib.contextmanager
363 def modify(self, key: str) -> Any:
364 value = self.
get(key)
368 @contextlib.contextmanager
371 *keys: Tuple[str, ...],
372 ) -> Tuple[Any, ...]:
373 values = tuple(self.
get(key)
for key
in keys)
377 def _sync_with_service(self) -> None: