userver
C++ Async Framework
Loading...
Searching...
No Matches
sql_coverage.py
1
"""
2
Plugin that imports the required fixtures for checking SQL/YQL coverage. See
3
@ref sql_coverage_test_info "SQL coverage tests" for more info.
4
5
@ingroup userver_testsuite_fixtures
6
"""
7
8
from
typing
import
Set
9
10
import
pytest
11
12
from
.
import
coverage
13
14
_SQL_COVERAGE_TEST_NAME =
'test_sql_coverage'
15
16
17
@pytest.fixture
18
def
on_uncovered
():
19
"""
20
Called when the coverage is incomplete.
21
22
Override this fixture to change the way uncovered statements are reported or to ignore some of the statements
23
from coverage report.
24
25
See @ref sql_coverage_test_info "SQL coverage tests" for more info.
26
27
@ingroup userver_testsuite_fixtures
28
"""
29
30
def
_on_uncovered(uncovered_statements: Set[str]):
31
msg = f
'Uncovered SQL/YQL statements: {uncovered_statements}'
32
raise
coverage.UncoveredError
(msg)
33
34
return
_on_uncovered
35
36
37
class
Coverage
:
38
"""
39
Contains data about the current coverage of statements.
40
"""
41
42
def
__init__(self, files: Set[str]):
43
self.
covered_statements
: Set[str] = set()
44
self.
uncovered_statements
: Set[str] = files
45
self.
extra_covered_statements
: Set[str] = set()
46
47
def
cover(self, statement: str) ->
None
:
48
if
statement
in
self.
uncovered_statements
:
49
self.
covered_statements
.add(statement)
50
self.
uncovered_statements
.remove(statement)
51
elif
statement
not
in
self.
covered_statements
:
52
self.
extra_covered_statements
.add(statement)
53
54
def
validate(self, uncovered_callback: callable) ->
None
:
55
if
self.
uncovered_statements
:
56
uncovered_callback(self.
uncovered_statements
)
57
58
59
@pytest.fixture(scope='session')
60
def
sql_coverage(sql_files) -> Coverage:
61
"""
62
Returns data about the current coverage of statements.
63
64
See @ref sql_coverage_test_info "SQL coverage tests" for more info.
65
66
@ingroup userver_testsuite_fixtures
67
"""
68
return
Coverage
(set(sql_files))
69
70
71
@pytest.fixture(scope='function', autouse=True)
72
async def
sql_statement_hook
(testpoint, sql_coverage):
73
"""
74
Hook that accepts requests from the testpoint with information on PostgreSQL statements coverage.
75
76
See @ref sql_coverage_test_info "SQL coverage tests" for more info.
77
78
@ingroup userver_testsuite_fixtures
79
"""
80
81
@testpoint('sql_statement')
82
def
_hook(request):
83
sql_coverage.cover(request[
'name'
])
84
85
yield
_hook
86
87
88
@pytest.fixture(scope='function', autouse=True)
89
async def
yql_statement_hook
(testpoint, sql_coverage):
90
"""
91
Hook that accepts requests from the testpoint with information on YDB statements coverage.
92
93
See @ref sql_coverage_test_info "SQL coverage tests" for more info.
94
95
@ingroup userver_testsuite_fixtures
96
"""
97
98
@testpoint('yql_statement')
99
def
_hook(request):
100
sql_coverage.cover(request[
'name'
])
101
102
yield
_hook
103
104
105
@pytest.hookimpl(hookwrapper=True)
106
def
pytest_collection_modifyitems(config, items):
107
yield
108
if
not
items:
109
return
110
111
coverage.collection_modifyitems(_SQL_COVERAGE_TEST_NAME, config, items)
testsuite
pytest_plugins
pytest_userver
plugins
sql_coverage.py
Generated on Tue Sep 2 2025 12:40:37 for userver by
Doxygen
1.13.2