userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
Loading...
Searching...
No Matches
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7from typing import List
8
9import pytest
10import yaml
11
12from testsuite.environment import shell
13
14from pytest_userver import sql
15from . import client
16from . import discover
17from . import service
18
# Prefer PyYAML's C-accelerated loader when the installed build provides it;
# otherwise use the pure-Python loader.
_YamlLoader = getattr(yaml, 'CLoader', yaml.Loader)  # type: ignore

# NOTE(review): presumably the names of config-patching fixtures that the
# userver testsuite picks up from this plugin -- confirm against
# pytest_userver.
USERVER_CONFIG_HOOKS = ['userver_config_ydb']
25
26
@pytest.fixture
def ydb(_ydb_client, _ydb_init):
    """
    YDB client for use in tests.

    Requesting `_ydb_init` alongside the client means the per-test
    preparation done by that fixture has finished before the test body
    receives the client.
    """
    return _ydb_client
30
31
@pytest.fixture(scope='session')
def _ydb_client(_ydb_client_pool):
    """
    Client borrowed from the pool and held for the whole test session.
    """
    with _ydb_client_pool() as session_client:
        yield session_client
36
37
@pytest.fixture(scope='session')
def _ydb_client_pool(_ydb_service, ydb_service_settings):
    """
    Returns a context-manager factory that lends out `client.YdbClient`
    instances.

    An idle client is reused when one is available; otherwise a new client
    is created for the configured endpoint and database. When the context
    exits, the client is put back so that later callers can reuse it.
    """
    endpoint = f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
    idle_clients = []

    @contextlib.contextmanager
    def borrow_client():
        # pop()/except instead of "check, then pop": the pool is also used
        # from worker threads, so taking a client must be a single step.
        try:
            borrowed = idle_clients.pop()
        except IndexError:
            borrowed = client.YdbClient(
                endpoint, ydb_service_settings.database,
            )
        try:
            yield borrowed
        finally:
            idle_clients.append(borrowed)

    return borrow_client
59
60
def pytest_service_register(register_service):
    """
    Registers `service.create_ydb_service` under the service name 'ydb'.
    """
    register_service('ydb', service.create_ydb_service)
63
64
@pytest.fixture(scope='session')
def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
    """
    Starts the 'ydb' service unless an already running instance is configured.

    Nothing is started when the `YDB_ENDPOINT` environment variable is set
    or the `ydb_host` option is given.
    """
    uses_external_instance = (
        os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host
    )
    if not uses_external_instance:
        ensure_service_started('ydb', settings=ydb_service_settings)
70
71
@pytest.fixture(scope='session')
def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
    """
    Connection settings of the YDB instance used by the tests.

    Sources, in priority order:

    1. the `YDB_ENDPOINT` environment variable in `host:port` form
       (`mon_port` and `ic_port` are left as None);
    2. the `ydb_host` option together with the `ydb_grpc_port`,
       `ydb_mon_port` and `ydb_ic_port` options;
    3. `service.get_service_settings()`.

    In the first two cases the database name is taken from the
    `YDB_DATABASE` environment variable, 'local' when it is not set.
    """
    database = os.environ.get('YDB_DATABASE', 'local')
    endpoint_from_env = os.environ.get('YDB_ENDPOINT')

    if endpoint_from_env:
        # Split on the first colon only; the port is passed on as a string.
        host, grpc_port = endpoint_from_env.split(':', 1)
        return service.ServiceSettings(
            host=host,
            grpc_port=grpc_port,
            mon_port=None,
            ic_port=None,
            database=database,
        )

    options = pytestconfig.option
    if not options.ydb_host:
        return service.get_service_settings()

    return service.ServiceSettings(
        host=options.ydb_host,
        grpc_port=options.ydb_grpc_port,
        mon_port=options.ydb_mon_port,
        ic_port=options.ydb_ic_port,
        database=database,
    )
96
97
@pytest.fixture(scope='session')
def _ydb_service_schemas(service_source_dir):
    """
    Schemas found by `discover.find_schemas` under
    `<service_source_dir>/ydb/schemas`.
    """
    schemas_dir = service_source_dir / 'ydb' / 'schemas'
    return discover.find_schemas([schemas_dir])
102
103
@pytest.fixture(scope='session')
def ydb_settings_substitute(ydb_service_settings):
    """
    Returns a mapping with a single 'ydb_settings' entry.

    The entry is a callable that accepts (and ignores) any arguments and
    returns the endpoint, the database prefixed with '/', and an empty
    token for the test YDB instance.
    """

    def secdist_settings(*args, **kwargs):
        host = ydb_service_settings.host
        grpc_port = ydb_service_settings.grpc_port
        return {
            'endpoint': f'{host}:{grpc_port}',
            'database': f'/{ydb_service_settings.database}',
            'token': '',
        }

    return {'ydb_settings': secdist_settings}
116
117
@pytest.fixture(scope='session')
def _ydb_state():
    """
    Mutable session-wide bookkeeping shared between the YDB fixtures.
    """

    class State:
        def __init__(self):
            # Paths of the tables created from legacy schema files.
            self.tables = []
            # Becomes True once the session-level preparation has finished.
            self.init = False

    state = State()
    return state
126
127
@pytest.fixture(scope='session')
def ydb_migration_dir(service_source_dir) -> pathlib.Path:
    """
    Directory with migration files: `<service_source_dir>/ydb/migrations`.

    @ingroup userver_testsuite_fixtures
    """
    ydb_dir = service_source_dir / 'ydb'
    return ydb_dir / 'migrations'
136
137
138YDB_MIGRATION_TABLE = 'goose_db_version'
139
140
141def _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path):
142 if not ydb_migration_dir.exists():
143 return
144 if not list(ydb_migration_dir.iterdir()):
145 return
146
147 host = ydb_service_settings.host
148 port = ydb_service_settings.grpc_port
149
150 command = [
151 str(goose_binary_path),
152 '-dir',
153 str(ydb_migration_dir),
154 '-table',
155 YDB_MIGRATION_TABLE,
156 'ydb',
157 (
158 f'grpc://{host}:{port}/local?go_query_mode=scripting&'
159 'go_fake_tx=scripting&go_query_bind=declare,numeric'
160 ),
161 'up',
162 ]
163 try:
164 shell.execute(command, verbose=True, command_alias='ydb/migrations')
165 except shell.SubprocessFailed as exc:
166 raise Exception(f'YDB run migration failed:\n{exc}')
167
168
@pytest.fixture(scope='session')
def goose_binary_path() -> pathlib.Path:
    """
    Path to 'goose' migration tool.

    When `yatest` can be imported, the binary is resolved through
    `yatest.common.runtime.binary_path`; otherwise the bare command name
    'goose' is returned.

    Override this fixture to change the way 'goose' binary is discovered.

    @ingroup userver_testsuite_fixtures
    """
    try:
        import yatest

        goose = yatest.common.runtime.binary_path(
            'contrib/go/patched/goose/cmd/goose/goose',
        )
    except ImportError:
        goose = 'goose'
    return goose
186
187
def _ydb_fetch_table_names(ydb_service_settings, ydb_cli) -> List[str]:
    """
    Lists table paths in the test database using the YDB CLI.

    Runs `<ydb_cli> -e grpc://<host>:<port> -d /<database> scheme ls -lR`
    and collects the name column of every row that describes a table.
    Rows mentioning '.sys' and the goose bookkeeping table
    (`YDB_MIGRATION_TABLE`) are skipped.

    Raises `Exception` (chained to the `subprocess.CalledProcessError`)
    when the CLI exits with a non-zero status.
    """
    host = ydb_service_settings.host
    port = ydb_service_settings.grpc_port
    # Use the configured database rather than a hard-coded '/local'. The
    # name may or may not carry a leading slash.
    database = '/' + ydb_service_settings.database.lstrip('/')

    command = [
        str(ydb_cli),
        '-e',
        f'grpc://{host}:{port}',
        '-d',
        database,
        'scheme',
        'ls',
        '-lR',
    ]
    # Only the CLI call can raise CalledProcessError, so only it is guarded.
    try:
        output = subprocess.check_output(command, encoding='utf-8')
    except subprocess.CalledProcessError as exc:
        raise Exception(f'Could not fetch table names:\n{exc}') from exc

    tables = []
    for line in output.split('\n'):
        if ' table ' not in line:
            continue
        if '.sys' in line or YDB_MIGRATION_TABLE in line:
            continue
        # Rows are '│'-separated; the path is taken from the field at
        # index 6.
        tables.append(line.split('│')[6].strip())
    return tables
219
220
@pytest.fixture(scope='session')
def ydb_cli() -> pathlib.Path:
    """
    Path to YDB CLI executable.

    When `yatest` can be imported, the binary is resolved through
    `yatest.common.runtime.binary_path`; otherwise the bare command name
    'ydb' is returned.

    Override this fixture to change the way YDB CLI is discovered.

    @ingroup userver_testsuite_fixtures
    """
    try:
        import yatest

        cli = yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb')
    except ImportError:
        cli = 'ydb'
    return cli
236
237
@pytest.fixture(scope='session')
def _ydb_prepare(
    _ydb_client,
    _ydb_service_schemas,
    ydb_service_settings,
    _ydb_state,
    ydb_migration_dir,
    goose_binary_path,
):
    """
    One-time database preparation for the test session.

    Either recreates tables from legacy YAML schema files (recording their
    paths in `_ydb_state.tables`) or applies goose migrations; having both
    schema files and a migrations directory is an error. Sets
    `_ydb_state.init` to True when done.

    Raises `Exception` when both ydb/schemas and ydb/migrations are present.
    """
    if _ydb_service_schemas and ydb_migration_dir.exists():
        raise Exception(
            'Both ydb/schemas and ydb/migrations exist, '
            'which are mutually exclusive',
        )

    # testsuite legacy
    for schema_path in _ydb_service_schemas:
        # Read with an explicit encoding so the result does not depend on
        # the locale of the machine running the tests.
        with open(schema_path, encoding='utf-8') as fp:
            # An empty YAML document loads as None; treat it as "no tables".
            tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader) or []
        for table_schema in tables_schemas:
            table_path = table_schema['path']
            client.drop_table(_ydb_client, table_path)
            client.create_table(_ydb_client, table_schema)
            _ydb_state.tables.append(table_path)

    # goose
    _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path)

    _ydb_state.init = True
266
267
@pytest.fixture(scope='session')
def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings, ydb_cli):
    """
    Sorted tuple of unique table paths: the ones recorded in
    `_ydb_state.tables` plus the ones reported by the YDB CLI.
    """
    known_tables = set(_ydb_state.tables)
    known_tables.update(_ydb_fetch_table_names(ydb_service_settings, ydb_cli))
    return tuple(sorted(known_tables))
275
276
@pytest.fixture
def _ydb_init(
    request,
    _ydb_client,
    _ydb_state,
    ydb_service_settings,
    _ydb_prepare,
    _ydb_tables,
    _ydb_client_pool,
    load,
):
    """
    Per-test database reset.

    Deletes all rows from every known table (one worker thread per table,
    each with its own pooled client), then executes the queries requested
    through `ydb` markers on the test: first the contents of the listed
    `files` (read with `load`), then the inline `queries`.

    A failure to clear any table is raised here instead of being dropped.
    """

    def ydb_mark_queries(files=(), queries=()):
        return [*(load(path) for path in files), *queries]

    def clear_table(table):
        # Removes the rows only; the table itself is kept.
        with _ydb_client_pool() as ydb_client:
            ydb_client.execute('DELETE FROM `{}`'.format(table))

    if _ydb_tables:
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(_ydb_tables),
        ) as executor:
            # Executor.map() re-raises a worker's exception only when its
            # result is retrieved, so the iterator has to be consumed for
            # cleanup errors to surface.
            list(executor.map(clear_table, _ydb_tables))

    for mark in request.node.iter_markers('ydb'):
        for query in ydb_mark_queries(**mark.kwargs):
            _ydb_client.execute(query)
309
310
@pytest.fixture
def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
    """
    The fixture maintains transaction fault injection state using
    RegisteredTrx class.

    A handler for the 'ydb_trx_commit' testpoint is installed; it answers
    with 'trx_should_fail' according to the state registered for the
    transaction name received in the testpoint data.

    @see pytest_userver.sql.RegisteredTrx

    @snippet integration_tests/tests/test_trx_failure.py fault injection

    @ingroup userver_testsuite_fixtures
    """

    trx_registry = sql.RegisteredTrx()

    @testpoint('ydb_trx_commit')
    def _ydb_trx_tp(data):
        trx_name = data['trx_name']
        return {'trx_should_fail': trx_registry.is_failure_enabled(trx_name)}

    return trx_registry
332
333
@pytest.fixture(scope='session')
def userver_config_ydb(ydb_service_settings):
    """
    Returns a function that adjusts the static configuration file for testsuite.

    For all `ydb.databases`, sets `endpoint` and `database` to the local test
    YDB instance.

    @ingroup userver_testsuite_fixtures
    """

    endpoint = f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
    configured_db = ydb_service_settings.database
    # The patched database path always begins with '/'.
    if configured_db.startswith('/'):
        database = configured_db
    else:
        database = '/' + configured_db

    def patch_config(config, config_vars):
        components = config['components_manager']['components']
        ydb_component = components['ydb']
        if isinstance(ydb_component, str):
            # A string value refers to an entry of config_vars; the key is
            # the string without its first character.
            ydb_component = config_vars[ydb_component[1:]]
        for dbconfig in ydb_component['databases'].values():
            dbconfig['endpoint'] = endpoint
            dbconfig['database'] = database

    return patch_config
359 return patch_config