userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
Loading...
Searching...
No Matches
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7from typing import List
8
9import pytest
10import yaml
11
12from testsuite.environment import shell
13
14from pytest_userver import sql
15from . import client
16from . import discover
17from . import service
18
# Prefer yaml.CLoader when this PyYAML build exposes it; otherwise use the
# always-available yaml.Loader.
_YamlLoader = getattr(yaml, 'CLoader', yaml.Loader)  # type: ignore

# Lists the config-patching fixture defined in this module
# (`userver_config_ydb`); presumably picked up by the userver config plugin.
USERVER_CONFIG_HOOKS = ['userver_config_ydb']
25
26
@pytest.fixture
def ydb(_ydb_client, _ydb_init):
    """
    YDB client handed to tests.

    Requesting `_ydb_init` alongside the client means the per-test
    preparation (clearing tables, applying `ydb` marker queries) has
    already run when the test receives the client.
    """
    ydb_client = _ydb_client
    return ydb_client
30
31
@pytest.fixture(scope='session')
def _ydb_client(_ydb_client_pool):
    """
    Session-scoped client borrowed from the pool.

    The client is handed back to the pool when the session-level fixture
    is torn down.
    """
    lease = _ydb_client_pool()
    with lease as ydb_client:
        yield ydb_client
36
37
@pytest.fixture(scope='session')
def _ydb_client_pool(_ydb_service, ydb_service_settings):
    """
    Returns a factory of context managers that lend out YDB clients.

    Entering the context manager yields an idle `client.YdbClient` if one
    is available and creates a new one otherwise; leaving it puts the
    client back so that later borrowers can reuse it.
    """
    settings = ydb_service_settings
    endpoint = f'{settings.host}:{settings.grpc_port}'
    idle_clients = []

    @contextlib.contextmanager
    def get_client():
        # Pop-or-create is done with try/except (not a length check) so that
        # concurrent borrowers cannot race between the check and the pop.
        try:
            borrowed = idle_clients.pop()
        except IndexError:
            borrowed = client.YdbClient(endpoint, settings.database)

        try:
            yield borrowed
        finally:
            idle_clients.append(borrowed)

    return get_client
61
62
def pytest_service_register(register_service):
    """Registers `service.create_ydb_service` under the service name 'ydb'."""
    ydb_service_factory = service.create_ydb_service
    register_service('ydb', ydb_service_factory)
65
66
@pytest.fixture(scope='session')
def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
    """
    Makes sure a YDB service is available for the session.

    Nothing is started when an endpoint is supplied from outside, either
    via the `YDB_ENDPOINT` environment variable or the `ydb_host` pytest
    option; otherwise the 'ydb' service is started with the resolved
    settings.
    """
    externally_provided = (
        os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host
    )
    if not externally_provided:
        ensure_service_started('ydb', settings=ydb_service_settings)
72
73
@pytest.fixture(scope='session')
def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
    """
    Resolves connection settings of the YDB instance used by tests.

    Sources, in order of precedence:

    1. `YDB_ENDPOINT` environment variable in `host:port` form
       (monitoring and interconnect ports are unknown and set to None);
    2. `ydb_host` / `ydb_grpc_port` / `ydb_mon_port` / `ydb_ic_port`
       pytest options;
    3. `service.get_service_settings()`.

    For the first two sources the database name is taken from the
    `YDB_DATABASE` environment variable and defaults to 'local'.

    Raises ValueError if `YDB_ENDPOINT` is set but contains no ':'.
    """
    database = os.environ.get('YDB_DATABASE', 'local')

    endpoint_from_env = os.environ.get('YDB_ENDPOINT')
    if endpoint_from_env:
        # Split on the LAST colon: the port is always the trailing part,
        # while the host itself may contain colons (e.g. '[::1]:2136').
        host, separator, grpc_port = endpoint_from_env.rpartition(':')
        if not separator:
            raise ValueError(
                'YDB_ENDPOINT must have the form "host:port", '
                f'got {endpoint_from_env!r}',
            )
        return service.ServiceSettings(
            host=host,
            grpc_port=grpc_port,
            mon_port=None,
            ic_port=None,
            database=database,
        )

    options = pytestconfig.option
    if options.ydb_host:
        return service.ServiceSettings(
            host=options.ydb_host,
            grpc_port=options.ydb_grpc_port,
            mon_port=options.ydb_mon_port,
            ic_port=options.ydb_ic_port,
            database=database,
        )

    return service.get_service_settings()
98
99
@pytest.fixture(scope='session')
def _ydb_service_schemas(service_source_dir):
    """
    Schemas discovered under `<service_source_dir>/ydb/schemas`.

    Discovery itself is delegated to `discover.find_schemas`.
    """
    schema_dirs = [service_source_dir / 'ydb' / 'schemas']
    return discover.find_schemas(schema_dirs)
104
105
@pytest.fixture(scope='session')
def ydb_settings_substitute(ydb_service_settings):
    """
    Returns a mapping with a single 'ydb_settings' entry whose value is a
    callable producing connection settings of the test YDB instance:
    'endpoint' (`host:grpc_port`), 'database' (always with exactly one
    leading '/') and an empty 'token'.
    """

    def secdist_settings(*args, **kwargs):
        # Arguments are accepted for call compatibility and ignored.
        database = ydb_service_settings.database
        # Add the leading slash only when it is missing, the same way
        # `userver_config_ydb` does, so '/local' does not become '//local'.
        if not database.startswith('/'):
            database = '/' + database
        return {
            'endpoint': '{}:{}'.format(
                ydb_service_settings.host,
                ydb_service_settings.grpc_port,
            ),
            'database': database,
            'token': '',
        }

    return {'ydb_settings': secdist_settings}
119
120
@pytest.fixture(scope='session')
def _ydb_state():
    """
    Session-wide mutable record of database preparation.

    `tables` collects table paths registered during preparation and `init`
    is flipped to True once preparation has finished.
    """

    class State:
        def __init__(self):
            # Paths of tables registered so far.
            self.tables = []
            # Becomes True when preparation completes.
            self.init = False

    state = State()
    return state
129
130
@pytest.fixture(scope='session')
def ydb_migration_dir(service_source_dir) -> pathlib.Path:
    """
    Directory with migration files: `ydb/migrations` inside the service
    source directory.

    @ingroup userver_testsuite_fixtures
    """
    ydb_dir = service_source_dir / 'ydb'
    return ydb_dir / 'migrations'
139
140
# Name of the table passed to goose via `-table`; lines mentioning it are
# also skipped when table names are collected from the `scheme ls` output.
YDB_MIGRATION_TABLE = 'goose_db_version'
142
143
def _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path):
    """
    Applies goose migrations from `ydb_migration_dir` (`goose ... up`).

    Does nothing when the directory does not exist or is empty.

    The target database is taken from `ydb_service_settings.database`, so
    migrations are applied to the same database the client fixtures connect
    to (with the default name 'local' the URL is unchanged).

    Raises Exception, chained to the original `shell.SubprocessFailed`,
    when goose fails.
    """
    if not ydb_migration_dir.exists():
        return
    # any() stops at the first entry instead of listing the whole directory.
    if not any(ydb_migration_dir.iterdir()):
        return

    host = ydb_service_settings.host
    port = ydb_service_settings.grpc_port
    # The name may or may not carry a leading '/'; the URL adds its own.
    database = ydb_service_settings.database.lstrip('/')

    command = [
        str(goose_binary_path),
        '-dir',
        str(ydb_migration_dir),
        '-table',
        YDB_MIGRATION_TABLE,
        'ydb',
        (
            f'grpc://{host}:{port}/{database}'
            '?go_query_mode=scripting&go_fake_tx=scripting'
            '&go_query_bind=declare,numeric'
        ),
        'up',
    ]
    try:
        shell.execute(command, verbose=True, command_alias='ydb/migrations')
    except shell.SubprocessFailed as exc:
        raise Exception(f'YDB run migration failed:\n{exc}') from exc
167
168
@pytest.fixture(scope='session')
def goose_binary_path() -> pathlib.Path:
    """
    Path to 'goose' migration tool.

    When the `yatest` module is importable the binary is resolved through
    it; otherwise the bare name 'goose' is returned. The result is always a
    `pathlib.Path`, as the annotation promises; `str()` of the fallback is
    still 'goose'.

    Override this fixture to change the way 'goose' binary is discovered.

    @ingroup userver_testsuite_fixtures
    """
    try:
        import yatest

        return pathlib.Path(
            yatest.common.runtime.binary_path(
                'contrib/go/patched/goose/cmd/goose/goose',
            ),
        )
    except ImportError:
        return pathlib.Path('goose')
186
187
def _ydb_fetch_table_names(ydb_service_settings, ydb_cli) -> List[str]:
    """
    Lists tables of the test database by running `<ydb_cli> scheme ls -lR`.

    The database is taken from `ydb_service_settings.database`, matching
    the one the client fixtures connect to (with the default name 'local'
    the command is unchanged).

    Only lines containing ' table ' are considered; lines mentioning
    '.sys' or the goose migration table are skipped.

    Raises Exception, chained to the original `CalledProcessError`, when
    the CLI exits with a non-zero status.
    """
    host = ydb_service_settings.host
    port = ydb_service_settings.grpc_port
    # The name may or may not carry a leading '/'; normalise to exactly one.
    database = '/' + ydb_service_settings.database.lstrip('/')

    command = [
        str(ydb_cli),
        '-e',
        f'grpc://{host}:{port}',
        '-d',
        database,
        'scheme',
        'ls',
        '-lR',
    ]
    # Only the CLI call is guarded; parsing errors surface unchanged.
    try:
        output = subprocess.check_output(command, encoding='utf-8')
    except subprocess.CalledProcessError as exc:
        raise Exception(f'Could not fetch table names:\n{exc}') from exc

    tables = []
    for line in output.split('\n'):
        if ' table ' not in line:
            continue
        if '.sys' in line:
            continue
        if YDB_MIGRATION_TABLE in line:
            continue
        # Assumes the table path is the 7th '│'-separated cell of the
        # `scheme ls -l` output - TODO confirm against the CLI version used.
        path = line.split('│')[6].strip()
        tables.append(path)
    return tables
219
220
@pytest.fixture(scope='session')
def ydb_cli() -> pathlib.Path:
    """
    Path to YDB CLI executable.

    When the `yatest` module is importable the binary is resolved through
    it; otherwise the bare name 'ydb' is returned. The result is always a
    `pathlib.Path`, as the annotation promises; `str()` of the fallback is
    still 'ydb'.

    Override this fixture to change the way YDB CLI is discovered.

    @ingroup userver_testsuite_fixtures
    """
    try:
        import yatest

        return pathlib.Path(
            yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb'),
        )
    except ImportError:
        return pathlib.Path('ydb')
236
237
@pytest.fixture(scope='session')
def _ydb_prepare(
    _ydb_client,
    _ydb_service_schemas,
    ydb_service_settings,
    _ydb_state,
    ydb_migration_dir,
    goose_binary_path,
):
    """
    One-time database preparation for the session.

    Two mutually exclusive mechanisms are supported:

    * legacy YAML schemas: every table described in the discovered schema
      files is dropped, re-created and recorded in `_ydb_state.tables`;
    * goose migrations from `ydb_migration_dir`.

    Sets `_ydb_state.init` to True when done.

    Raises Exception if both schemas and a migrations directory are present.
    """
    if _ydb_service_schemas and ydb_migration_dir.exists():
        raise Exception(
            'Both ydb/schemas and ydb/migrations exist, which are mutually exclusive',
        )

    # testsuite legacy
    for schema_path in _ydb_service_schemas:
        # NOTE(review): _YamlLoader is a full (non-safe) loader; schema files
        # are expected to be trusted project sources.
        with open(schema_path, encoding='utf-8') as fp:
            # An empty schema file parses to None; treat it as "no tables"
            # instead of failing with TypeError in the loop below.
            tables_schemas = yaml.load(fp, Loader=_YamlLoader) or []
        for table_schema in tables_schemas:
            table_path = table_schema['path']
            client.drop_table(_ydb_client, table_path)
            client.create_table(_ydb_client, table_schema)
            _ydb_state.tables.append(table_path)

    # goose
    _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path)

    _ydb_state.init = True
265
266
@pytest.fixture(scope='session')
def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings, ydb_cli):
    """
    Sorted tuple of unique table paths: those recorded during preparation
    plus those reported by the YDB CLI.
    """
    table_paths = set(_ydb_state.tables)
    table_paths.update(_ydb_fetch_table_names(ydb_service_settings, ydb_cli))
    return tuple(sorted(table_paths))
274
275
@pytest.fixture
def _ydb_init(
    request,
    _ydb_client,
    _ydb_state,
    ydb_service_settings,
    _ydb_prepare,
    _ydb_tables,
    _ydb_client_pool,
    load,
):
    """
    Per-test database reset.

    First removes all rows from every known table (`DELETE FROM`; the
    tables themselves are kept), one worker thread per table. Then executes
    the queries attached to the test through `ydb` markers: `files` are read
    with the `load` fixture, `queries` are used as given.

    Any error raised while clearing a table is propagated, so a test never
    silently starts on top of leftover data.
    """

    def ydb_mark_queries(files=(), queries=()):
        # Queries loaded from files come first, inline queries after them.
        result_queries = [load(path) for path in files]
        result_queries.extend(queries)
        return result_queries

    def clear_table(table):
        # Each worker borrows its own client from the pool.
        with _ydb_client_pool() as ydb_client:
            ydb_client.execute('DELETE FROM `{}`'.format(table))

    if _ydb_tables:
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(_ydb_tables),
        ) as executor:
            # Executor.map() re-raises a worker's exception only when its
            # result is retrieved, so the iterator must be consumed;
            # otherwise failures would be dropped without a trace.
            list(executor.map(clear_table, _ydb_tables))

    for mark in request.node.iter_markers('ydb'):
        for query in ydb_mark_queries(**mark.kwargs):
            _ydb_client.execute(query)
308
309
@pytest.fixture
def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
    """
    The fixture maintains transaction fault injection state using
    RegisteredTrx class.

    A handler for the 'ydb_trx_commit' testpoint answers whether failure
    is enabled for the transaction named in the testpoint data.

    @see pytest_userver.sql.RegisteredTrx

    @snippet integration_tests/tests/test_trx_failure.py fault injection

    @ingroup userver_testsuite_fixtures
    """

    registry = sql.RegisteredTrx()

    @testpoint('ydb_trx_commit')
    def _ydb_trx_tp(data):
        trx_name = data['trx_name']
        return {'trx_should_fail': registry.is_failure_enabled(trx_name)}

    return registry
331
332
@pytest.fixture(scope='session')
def userver_config_ydb(ydb_service_settings):
    """
    Returns a function that adjusts the static configuration file for testsuite.

    For all `ydb.databases`, sets `endpoint` and `database` to the local test
    YDB instance.

    @ingroup userver_testsuite_fixtures
    """

    settings = ydb_service_settings
    endpoint = '{}:{}'.format(settings.host, settings.grpc_port)

    # The database path always starts with exactly one added '/' at most.
    database = settings.database
    if not database.startswith('/'):
        database = '/' + database

    def patch_config(config, config_vars):
        components = config['components_manager']['components']
        ydb_component = components['ydb']
        if isinstance(ydb_component, str):
            # A string value is looked up in config_vars after dropping its
            # first character (presumably a '$' variable reference).
            ydb_component = config_vars[ydb_component[1:]]
        for dbconfig in ydb_component['databases'].values():
            dbconfig['endpoint'] = endpoint
            dbconfig['database'] = database

    return patch_config