userver: /data/code/service_template/third_party/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
Loading...
Searching...
No Matches
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7from typing import List
8from typing import Optional
9
10import pytest
11import yaml
12
13from testsuite.environment import shell
14
15from pytest_userver import sql
16from . import client
17from . import discover
18from . import service
19
# Use PyYAML's C-accelerated loader when this PyYAML build exposes it,
# otherwise fall back to the pure-Python loader.
_YamlLoader = getattr(yaml, 'CLoader', yaml.Loader)  # type: ignore
24
25
@pytest.fixture
def ydb(_ydb_client, _ydb_init):
    """
    Hand the YDB client to a test.

    ``_ydb_init`` is requested only so that the per-test initialization
    fixture runs before the test receives the client; its value is unused.
    """
    ydb_client = _ydb_client
    return ydb_client
29
30
@pytest.fixture(scope='session')
def _ydb_client(_ydb_client_pool):
    """
    Borrow one client from the pool and yield it for the whole session.

    The client goes back to the pool when the session-scoped fixture is
    torn down and the ``with`` block exits.
    """
    with _ydb_client_pool() as session_client:
        yield session_client
35
36
@pytest.fixture(scope='session')
def _ydb_client_pool(_ydb_service, _ydb_service_settings):
    """
    Build a context-manager factory that lends out reusable YDB clients.

    Each ``with get_client() as c:`` takes an idle client if one is
    available, creates a new ``client.YdbClient`` otherwise, and puts the
    client back on exit so later callers can reuse it.
    """
    host = _ydb_service_settings.host
    grpc_port = _ydb_service_settings.grpc_port
    endpoint = f'{host}:{grpc_port}'
    idle_clients = []

    @contextlib.contextmanager
    def get_client():
        # Pop-and-catch (rather than check-then-pop) stays correct when
        # several threads borrow clients at the same time.
        try:
            borrowed = idle_clients.pop()
        except IndexError:
            borrowed = client.YdbClient(
                endpoint, _ydb_service_settings.database,
            )
        try:
            yield borrowed
        finally:
            # Always hand the client back, even if the caller raised.
            idle_clients.append(borrowed)

    return get_client
58
59
def pytest_service_register(register_service):
    """
    Register the YDB service factory under the name ``'ydb'``.

    @param register_service callable taking a service name and a factory
    """
    ydb_factory = service.create_ydb_service
    register_service('ydb', ydb_factory)
62
63
@pytest.fixture(scope='session')
def _ydb_service(pytestconfig, ensure_service_started, _ydb_service_settings):
    """
    Start the local ``ydb`` service unless an external endpoint is set.

    An endpoint counts as external when the ``YDB_ENDPOINT`` environment
    variable is non-empty or the ``ydb_host`` pytest option is set; in
    that case nothing is started.
    """
    external_endpoint = (
        os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host
    )
    if not external_endpoint:
        ensure_service_started('ydb', settings=_ydb_service_settings)
69
70
@pytest.fixture(scope='session')
def ydb_service_settings(_ydb_service_settings):
    """
    Public alias for the private ``_ydb_service_settings`` fixture.
    """
    settings = _ydb_service_settings
    return settings
74
75
@pytest.fixture(scope='session')
def _ydb_service_settings(pytestconfig):
    """
    Resolve connection settings for the YDB instance used by the tests.

    Sources are tried in this order:
      1. the ``YDB_ENDPOINT`` environment variable, as ``host:port``
         (monitoring and interconnect ports are left as ``None``);
      2. the ``ydb_host`` / ``ydb_*_port`` pytest options;
      3. ``service.get_service_settings()``.

    For the first two sources the database name comes from the
    ``YDB_DATABASE`` environment variable and defaults to ``'local'``.
    """
    database = os.environ.get('YDB_DATABASE', 'local')
    env_endpoint = os.environ.get('YDB_ENDPOINT')

    if env_endpoint:
        # Split on the first ':' only; the port is kept as a string.
        host, grpc_port = env_endpoint.split(':', 1)
        connection = dict(
            host=host, grpc_port=grpc_port, mon_port=None, ic_port=None,
        )
    elif pytestconfig.option.ydb_host:
        options = pytestconfig.option
        connection = dict(
            host=options.ydb_host,
            grpc_port=options.ydb_grpc_port,
            mon_port=options.ydb_mon_port,
            ic_port=options.ydb_ic_port,
        )
    else:
        return service.get_service_settings()

    return service.ServiceSettings(database=database, **connection)
100
101
@pytest.fixture(scope='session')
def _ydb_service_schemas(service_source_dir):
    """
    Discover schema files under ``<service_source_dir>/ydb/schemas``.

    Returns whatever ``discover.find_schemas`` returns for that directory.
    """
    schema_dirs = [service_source_dir / 'ydb' / 'schemas']
    return discover.find_schemas(schema_dirs)
106
107
@pytest.fixture(scope='session')
def ydb_settings_substitute(_ydb_service_settings):
    """
    Provide the substitution callback for the ``ydb_settings`` key.

    The returned dict maps ``'ydb_settings'`` to a callable that ignores
    its arguments and returns the endpoint, database path and an empty
    token for the YDB instance used by the tests.
    """

    def secdist_settings(*args, **kwargs):
        host = _ydb_service_settings.host
        grpc_port = _ydb_service_settings.grpc_port
        database = _ydb_service_settings.database
        return {
            'endpoint': f'{host}:{grpc_port}',
            'database': f'/{database}',
            'token': '',
        }

    substitutes = {'ydb_settings': secdist_settings}
    return substitutes
120
121
@pytest.fixture(scope='session')
def _ydb_state():
    """
    Create the mutable session-wide state object for the YDB fixtures.

    The object starts with ``init = False`` and an empty ``tables`` list.
    """

    class State:
        def __init__(self):
            # Per-instance list, so the state is never shared by accident.
            self.tables = []
            self.init = False

    state = State()
    return state
130
131
@pytest.fixture(scope='session')
def ydb_migrate_dir(service_source_dir) -> pathlib.Path:
    """
    Path of the migrations directory: ``<service_source_dir>/ydb/migrations``.
    """
    ydb_dir = service_source_dir / 'ydb'
    return ydb_dir / 'migrations'
135
136
137def _ydb_migrate(_ydb_service_settings, ydb_migrate_dir):
138 if not ydb_migrate_dir.exists():
139 return
140 if not list(ydb_migrate_dir.iterdir()):
141 return
142
143 if not _get_goose():
144 return
145
146 host = _ydb_service_settings.host
147 port = _ydb_service_settings.grpc_port
148
149 command = [
150 str(_get_goose()),
151 '-dir',
152 str(ydb_migrate_dir),
153 'ydb',
154 (
155 f'grpc://{host}:{port}/local?go_query_mode=scripting&'
156 'go_fake_tx=scripting&go_query_bind=declare,numeric'
157 ),
158 'up',
159 ]
160 try:
161 shell.execute(command, verbose=True, command_alias='ydb/migrations')
162 except shell.SubprocessFailed as exc:
163 raise Exception(f'YDB run migration failed:\n\n{exc}')
164
165
166def _get_goose() -> Optional[pathlib.Path]:
167 try:
168 import yatest
169
170 return yatest.common.runtime.binary_path(
171 'contrib/go/patched/goose/cmd/goose/goose',
172 )
173 except ImportError:
174 return None
175
176
177def _ydb_fetch_table_names(_ydb_service_settings) -> List[str]:
178 try:
179 import yatest
180
181 host = _ydb_service_settings.host
182 port = _ydb_service_settings.grpc_port
183 output = subprocess.check_output(
184 [
185 yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb'),
186 '-e',
187 f'grpc://{host}:{port}',
188 '-d',
189 '/local',
190 'scheme',
191 'ls',
192 '-lR',
193 ],
194 encoding='utf-8',
195 )
196 tables = []
197
198 for line in output.split('\n'):
199 if ' table ' not in line:
200 continue
201 if '.sys' in line:
202 continue
203 path = line.split('│')[6].strip()
204 tables.append(path)
205 return tables
206 except ImportError:
207 return []
208
209
@pytest.fixture(scope='session')
def _ydb_prepare(
    _ydb_client,
    _ydb_service_schemas,
    _ydb_service_settings,
    _ydb_state,
    ydb_migrate_dir,
):
    """
    Prepare the database schema once per session.

    Tables described by the YAML schema files are dropped and re-created
    and their paths are appended to ``_ydb_state.tables``. Goose
    migrations are then applied, and ``_ydb_state.init`` is set to
    ``True``.

    @throws Exception when both schema files and a migrations directory
        are present, since only one of the two mechanisms may be used
    """
    if _ydb_service_schemas and ydb_migrate_dir.exists():
        raise Exception(
            'Both ydb/schemas and ydb/migrations exist, '
            'which are mutually exclusive',
        )

    # testsuite legacy
    for schema_path in _ydb_service_schemas:
        # Explicit encoding: do not depend on the machine's locale.
        with open(schema_path, encoding='utf-8') as fp:
            # NOTE(review): yaml.load with a full loader; schema files are
            # assumed to be trusted project sources - confirm.
            tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
        # An empty YAML document loads as None; treat it as "no tables".
        for table_schema in tables_schemas or ():
            client.drop_table(_ydb_client, table_schema['path'])
            client.create_table(_ydb_client, table_schema)
            _ydb_state.tables.append(table_schema['path'])

    # goose
    _ydb_migrate(_ydb_service_settings, ydb_migrate_dir)

    _ydb_state.init = True
237
238
@pytest.fixture(scope='session')
def _ydb_tables(_ydb_state, _ydb_prepare, _ydb_service_settings):
    """
    Sorted, de-duplicated tuple of all known table paths.

    Combines the paths recorded in ``_ydb_state.tables`` with those
    reported by ``_ydb_fetch_table_names``. ``_ydb_prepare`` is requested
    only so that schema preparation runs first.
    """
    known_tables = set(_ydb_state.tables)
    known_tables.update(_ydb_fetch_table_names(_ydb_service_settings))
    return tuple(sorted(known_tables))
246
247
@pytest.fixture
def _ydb_init(
    request,
    _ydb_client,
    _ydb_state,
    _ydb_service_settings,
    _ydb_prepare,
    _ydb_tables,
    _ydb_client_pool,
    load,
):
    """
    Per-test initialization: empty every known table, then run the
    queries attached to the test through ``ydb`` markers.

    Tables are cleared concurrently, one worker thread per table, each
    with its own client from the pool. A marker may give ``files``
    (loaded with the ``load`` fixture) and/or ``queries`` (used as is);
    file queries run before inline ones.

    A failure while clearing a table is raised to the caller instead of
    being dropped.
    """

    def ydb_mark_queries(files=(), queries=()):
        return [*(load(path) for path in files), *queries]

    def clear_table(table):
        with _ydb_client_pool() as ydb_client:
            ydb_client.execute('DELETE FROM `{}`'.format(table))

    if _ydb_tables:
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(_ydb_tables),
        ) as executer:
            # Executor.map() re-raises a worker's exception only when its
            # result is retrieved, so the iterator must be consumed or
            # failed DELETEs would go unnoticed.
            for _ in executer.map(clear_table, _ydb_tables):
                pass

    for mark in request.node.iter_markers('ydb'):
        for query in ydb_mark_queries(**mark.kwargs):
            _ydb_client.execute(query)
280
281
@pytest.fixture
def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
    """
    The fixture keeps the transaction fault injection state in a
    RegisteredTrx object and answers the 'ydb_trx_commit' testpoint
    from that state.

    @see pytest_userver.sql.RegisteredTrx

    @snippet integration_tests/tests/test_trx_failure.py  fault injection

    @ingroup userver_testsuite_fixtures
    """

    trx_registry = sql.RegisteredTrx()

    @testpoint('ydb_trx_commit')
    def _pg_trx_tp(data):
        # Tell the service whether the named transaction must fail.
        trx_name = data['trx_name']
        return {'trx_should_fail': trx_registry.is_failure_enabled(trx_name)}

    return trx_registry
298 def _pg_trx_tp(data):
299 should_fail = registered.is_failure_enabled(data['trx_name'])
300 return {'trx_should_fail': should_fail}
301
302 return registered