userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/sql.py Source File
Loading...
Searching...
No Matches
sql.py
2 """
3 RegisteredTrx maintains transaction fault injection state to test
4 transaction failure code path.
5
6 You may enable specific transaction failure calling `enable_trx_failure`
7 on that transaction name. After that, the transaction's `Commit` method
8 will throw an exception.
9
10 If you don't need a fault injection anymore (e.g. you want to test
11 a successfull retry), you may call `disable_trx_failure` afterwards.
12
13 Example usage:
14 @snippet integration_tests/tests/test_trx_failure.py fault injection
15
16 @ingroup userver_testsuite
17 """
18
19 def __init__(self):
20 self._registered_trx = set()
21
22 def enable_failure(self, name: str) -> None:
23 self._registered_trx.add(name)
24
25 def disable_failure(self, name: str) -> None:
26 if self.is_failure_enabled(name):
27 self._registered_trx.remove(name)
28
29 def is_failure_enabled(self, name: str) -> bool:
30 return name in self._registered_trx