userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
Loading...
Searching...
No Matches
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7
8import pytest
9import yaml
10
11from testsuite.environment import shell
12
13from pytest_userver import sql
14from . import client
15from . import discover
16from . import service
17
18if hasattr(yaml, 'CLoader'):
19 _YamlLoader = yaml.CLoader # type: ignore
20else:
21 _YamlLoader = yaml.Loader # type: ignore
22
23USERVER_CONFIG_HOOKS = ['userver_config_ydb']
24
25
26@pytest.fixture
27def ydb(_ydb_client, _ydb_init) -> client.YdbClient:
28 """
29 YDB client fixture
30
31 @ingroup userver_testsuite_fixtures
32 """
33 return _ydb_client
34
35
36@pytest.fixture(scope='session')
37def _ydb_client(_ydb_client_pool):
38 with _ydb_client_pool() as ydb_client:
39 yield ydb_client
40
41
42@pytest.fixture(scope='session')
43def _ydb_client_pool(_ydb_service, ydb_service_settings):
44 endpoint = '{}:{}'.format(
45 ydb_service_settings.host,
46 ydb_service_settings.grpc_port,
47 )
48 pool = []
49
50 @contextlib.contextmanager
51 def get_client():
52 try:
53 ydb_client = pool.pop()
54 except IndexError:
55 ydb_client = client.YdbClient(
56 endpoint,
57 ydb_service_settings.database,
58 )
59 try:
60 yield ydb_client
61 finally:
62 pool.append(ydb_client)
63
64 return get_client
65
66
67def pytest_service_register(register_service):
68 register_service('ydb', service.create_ydb_service)
69
70
71@pytest.fixture(scope='session')
72def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
73 if os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host:
74 return
75 ensure_service_started('ydb', settings=ydb_service_settings)
76
77
78@pytest.fixture(scope='session')
79def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
80 endpoint_from_env = os.environ.get('YDB_ENDPOINT')
81 database = os.environ.get('YDB_DATABASE', 'local')
82
83 if endpoint_from_env:
84 host, grpc_port = endpoint_from_env.split(':', 1)
85 return service.ServiceSettings(
86 host=host,
87 grpc_port=grpc_port,
88 mon_port=None,
89 ic_port=None,
90 database=database,
91 )
92
93 if pytestconfig.option.ydb_host:
94 return service.ServiceSettings(
95 host=pytestconfig.option.ydb_host,
96 grpc_port=pytestconfig.option.ydb_grpc_port,
97 mon_port=pytestconfig.option.ydb_mon_port,
98 ic_port=pytestconfig.option.ydb_ic_port,
99 database=database,
100 )
101 return service.get_service_settings()
102
103
104@pytest.fixture(scope='session')
105def _ydb_service_schemas(service_source_dir):
106 service_schemas_ydb = service_source_dir / 'ydb' / 'schemas'
107 return discover.find_schemas([service_schemas_ydb])
108
109
110@pytest.fixture(scope='session')
111def ydb_settings_substitute(ydb_service_settings):
112 def secdist_settings(*args, **kwargs):
113 return {
114 'endpoint': '{}:{}'.format(
115 ydb_service_settings.host,
116 ydb_service_settings.grpc_port,
117 ),
118 'database': '/{}'.format(ydb_service_settings.database),
119 'token': '',
120 }
121
122 return {'ydb_settings': secdist_settings}
123
124
125@pytest.fixture(scope='session')
126def _ydb_state():
127 class State:
128 def __init__(self):
129 self.init = False
130 self.tables = []
131
132 return State()
133
134
135@pytest.fixture(scope='session')
136def ydb_migration_dir(service_source_dir) -> pathlib.Path:
137 """
138 Directory with migration files
139
140 @ingroup userver_testsuite_fixtures
141 """
142 return service_source_dir / 'ydb' / 'migrations'
143
144
145YDB_MIGRATION_TABLE = 'goose_db_version'
146
147
148def _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path):
149 if not ydb_migration_dir.exists():
150 return
151 if not list(ydb_migration_dir.iterdir()):
152 return
153
154 host = ydb_service_settings.host
155 port = ydb_service_settings.grpc_port
156
157 command = [
158 str(goose_binary_path),
159 '-dir',
160 str(ydb_migration_dir),
161 '-table',
162 YDB_MIGRATION_TABLE,
163 'ydb',
164 (f'grpc://{host}:{port}/local?go_query_mode=scripting&go_fake_tx=scripting&go_query_bind=declare,numeric'),
165 'up',
166 ]
167 try:
168 shell.execute(command, verbose=True, command_alias='ydb/migrations')
169 except shell.SubprocessFailed as exc:
170 raise Exception(f'YDB run migration failed:\n{exc}')
171
172
173@pytest.fixture(scope='session')
174def goose_binary_path() -> pathlib.Path:
175 """
176 Path to 'goose' migration tool.
177
178 Override this fixture to change the way 'goose' binary is discovered.
179
180 @ingroup userver_testsuite_fixtures
181 """
182 try:
183 import yatest
184
185 return yatest.common.runtime.binary_path(
186 'contrib/go/patched/goose/cmd/goose/goose',
187 )
188 except ImportError:
189 return 'goose'
190
191
192def _ydb_fetch_table_names(ydb_service_settings, ydb_cli) -> list[str]:
193 try:
194 host = ydb_service_settings.host
195 port = ydb_service_settings.grpc_port
196 output = subprocess.check_output(
197 [
198 str(ydb_cli),
199 '-e',
200 f'grpc://{host}:{port}',
201 '-d',
202 '/local',
203 'scheme',
204 'ls',
205 '-lR',
206 ],
207 encoding='utf-8',
208 )
209 tables = []
210
211 for line in output.split('\n'):
212 if ' table ' not in line:
213 continue
214 if '.sys' in line:
215 continue
216 if YDB_MIGRATION_TABLE in line:
217 continue
218 path = line.split('│')[6].strip()
219 tables.append(path)
220 return tables
221 except subprocess.CalledProcessError as exc:
222 raise Exception(f'Could not fetch table names:\n{exc}')
223
224
225@pytest.fixture(scope='session')
226def ydb_cli() -> pathlib.Path:
227 """
228 Path to YDB CLI executable.
229
230 Override this fixture to change the way YDB CLI is discovered.
231
232 @ingroup userver_testsuite_fixtures
233 """
234 try:
235 import yatest
236
237 return yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb')
238 except ImportError:
239 return 'ydb'
240
241
242@pytest.fixture(scope='session')
243def _ydb_prepare(
244 _ydb_client,
245 _ydb_service_schemas,
246 ydb_service_settings,
247 _ydb_state,
248 ydb_migration_dir,
249 goose_binary_path,
250):
251 if _ydb_service_schemas and ydb_migration_dir.exists():
252 raise Exception(
253 'Both ydb/schema and ydb/migrations exist, which are mutually exclusive',
254 )
255
256 # testsuite legacy
257 for schema_path in _ydb_service_schemas:
258 with open(schema_path) as fp:
259 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
260 for table_schema in tables_schemas:
261 client.drop_table(_ydb_client, table_schema['path'])
262 client.create_table(_ydb_client, table_schema)
263 _ydb_state.tables.append(table_schema['path'])
264
265 # goose
266 _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path)
267
268 _ydb_state.init = True
269
270
271@pytest.fixture(scope='session')
272def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings, ydb_cli):
273 tables = {
274 *_ydb_state.tables,
275 *_ydb_fetch_table_names(ydb_service_settings, ydb_cli),
276 }
277 return tuple(sorted(tables))
278
279
280@pytest.fixture
281def _ydb_init(
282 request,
283 _ydb_client,
284 _ydb_state,
285 ydb_service_settings,
286 _ydb_prepare,
287 _ydb_tables,
288 _ydb_client_pool,
289 load,
290):
291 def ydb_mark_queries(files=(), queries=()):
292 result_queries = []
293 for path in files:
294 result_queries.append(load(path))
295 result_queries.extend(queries)
296 return result_queries
297
298 def drop_table(table):
299 with _ydb_client_pool() as ydb_client:
300 ydb_client.execute('DELETE FROM `{}`'.format(table))
301
302 if _ydb_tables:
303 with concurrent.futures.ThreadPoolExecutor(
304 max_workers=len(_ydb_tables),
305 ) as executer:
306 executer.map(drop_table, _ydb_tables)
307
308 for mark in request.node.iter_markers('ydb'):
309 queries = ydb_mark_queries(**mark.kwargs)
310 for query in queries:
311 _ydb_client.execute(query)
312
313
314@pytest.fixture
315def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
316 """
317 The fixture maintains transaction fault injection state using
318 RegisteredTrx class.
319
320 @see pytest_userver.sql.RegisteredTrx
321
322 @snippet integration_tests/tests/test_trx_failure.py fault injection
323
324 @ingroup userver_testsuite_fixtures
325 """
326
327 registered = sql.RegisteredTrx()
328
329 @testpoint('ydb_trx_commit')
330 def _pg_trx_tp(data):
331 should_fail = registered.is_failure_enabled(data['trx_name'])
332 return {'trx_should_fail': should_fail}
333
334 return registered
335
336
337@pytest.fixture(scope='session')
338def userver_config_ydb(ydb_service_settings):
339 """
340 Returns a function that adjusts the static configuration file for testsuite.
341
342 For all `ydb.databases`, sets `endpoint` and `database` to the local test
343 YDB instance.
344
345 @ingroup userver_testsuite_fixtures
346 """
347
348 endpoint = f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
349 database = ('' if ydb_service_settings.database.startswith('/') else '/') + ydb_service_settings.database
350
351 def patch_config(config, config_vars):
352 ydb_component = config['components_manager']['components']['ydb']
353 if isinstance(ydb_component, str):
354 ydb_component = config_vars[ydb_component[1:]]
355 databases = ydb_component['databases']
356 for dbconfig in databases.values():
357 dbconfig['endpoint'] = endpoint
358 dbconfig['database'] = database
359
360 return patch_config