userver: /home/antonyzhilin/arcadia/taxi/uservices/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7from typing import List
8
9import pytest
10import yaml
11
12from testsuite.environment import shell
13
14from pytest_userver import sql
15from . import client
16from . import discover
17from . import service
18
19if hasattr(yaml, 'CLoader'):
20 _YamlLoader = yaml.CLoader # type: ignore
21else:
22 _YamlLoader = yaml.Loader # type: ignore
23
24USERVER_CONFIG_HOOKS = ['userver_config_ydb']
25
26
27@pytest.fixture
28def ydb(_ydb_client, _ydb_init) -> client.YdbClient:
29 """
30 YDB client fixture
31
32 @ingroup userver_testsuite_fixtures
33 """
34 return _ydb_client
35
36
37@pytest.fixture(scope='session')
38def _ydb_client(_ydb_client_pool):
39 with _ydb_client_pool() as ydb_client:
40 yield ydb_client
41
42
43@pytest.fixture(scope='session')
44def _ydb_client_pool(_ydb_service, ydb_service_settings):
45 endpoint = '{}:{}'.format(
46 ydb_service_settings.host,
47 ydb_service_settings.grpc_port,
48 )
49 pool = []
50
51 @contextlib.contextmanager
52 def get_client():
53 try:
54 ydb_client = pool.pop()
55 except IndexError:
56 ydb_client = client.YdbClient(
57 endpoint,
58 ydb_service_settings.database,
59 )
60 try:
61 yield ydb_client
62 finally:
63 pool.append(ydb_client)
64
65 return get_client
66
67
68def pytest_service_register(register_service):
69 register_service('ydb', service.create_ydb_service)
70
71
72@pytest.fixture(scope='session')
73def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
74 if os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host:
75 return
76 ensure_service_started('ydb', settings=ydb_service_settings)
77
78
79@pytest.fixture(scope='session')
80def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
81 endpoint_from_env = os.environ.get('YDB_ENDPOINT')
82 database = os.environ.get('YDB_DATABASE', 'local')
83
84 if endpoint_from_env:
85 host, grpc_port = endpoint_from_env.split(':', 1)
86 return service.ServiceSettings(
87 host=host,
88 grpc_port=grpc_port,
89 mon_port=None,
90 ic_port=None,
91 database=database,
92 )
93
94 if pytestconfig.option.ydb_host:
95 return service.ServiceSettings(
96 host=pytestconfig.option.ydb_host,
97 grpc_port=pytestconfig.option.ydb_grpc_port,
98 mon_port=pytestconfig.option.ydb_mon_port,
99 ic_port=pytestconfig.option.ydb_ic_port,
100 database=database,
101 )
102 return service.get_service_settings()
103
104
105@pytest.fixture(scope='session')
106def _ydb_service_schemas(service_source_dir):
107 service_schemas_ydb = service_source_dir / 'ydb' / 'schemas'
108 return discover.find_schemas([service_schemas_ydb])
109
110
111@pytest.fixture(scope='session')
112def ydb_settings_substitute(ydb_service_settings):
113 def secdist_settings(*args, **kwargs):
114 return {
115 'endpoint': '{}:{}'.format(
116 ydb_service_settings.host,
117 ydb_service_settings.grpc_port,
118 ),
119 'database': '/{}'.format(ydb_service_settings.database),
120 'token': '',
121 }
122
123 return {'ydb_settings': secdist_settings}
124
125
126@pytest.fixture(scope='session')
127def _ydb_state():
128 class State:
129 def __init__(self):
130 self.init = False
131 self.tables = []
132
133 return State()
134
135
136@pytest.fixture(scope='session')
137def ydb_migration_dir(service_source_dir) -> pathlib.Path:
138 """
139 Directory with migration files
140
141 @ingroup userver_testsuite_fixtures
142 """
143 return service_source_dir / 'ydb' / 'migrations'
144
145
146YDB_MIGRATION_TABLE = 'goose_db_version'
147
148
149def _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path):
150 if not ydb_migration_dir.exists():
151 return
152 if not list(ydb_migration_dir.iterdir()):
153 return
154
155 host = ydb_service_settings.host
156 port = ydb_service_settings.grpc_port
157
158 command = [
159 str(goose_binary_path),
160 '-dir',
161 str(ydb_migration_dir),
162 '-table',
163 YDB_MIGRATION_TABLE,
164 'ydb',
165 (f'grpc://{host}:{port}/local?go_query_mode=scripting&go_fake_tx=scripting&go_query_bind=declare,numeric'),
166 'up',
167 ]
168 try:
169 shell.execute(command, verbose=True, command_alias='ydb/migrations')
170 except shell.SubprocessFailed as exc:
171 raise Exception(f'YDB run migration failed:\n{exc}')
172
173
174@pytest.fixture(scope='session')
175def goose_binary_path() -> pathlib.Path:
176 """
177 Path to 'goose' migration tool.
178
179 Override this fixture to change the way 'goose' binary is discovered.
180
181 @ingroup userver_testsuite_fixtures
182 """
183 try:
184 import yatest
185
186 return yatest.common.runtime.binary_path(
187 'contrib/go/patched/goose/cmd/goose/goose',
188 )
189 except ImportError:
190 return 'goose'
191
192
193def _ydb_fetch_table_names(ydb_service_settings, ydb_cli) -> List[str]:
194 try:
195 host = ydb_service_settings.host
196 port = ydb_service_settings.grpc_port
197 output = subprocess.check_output(
198 [
199 str(ydb_cli),
200 '-e',
201 f'grpc://{host}:{port}',
202 '-d',
203 '/local',
204 'scheme',
205 'ls',
206 '-lR',
207 ],
208 encoding='utf-8',
209 )
210 tables = []
211
212 for line in output.split('\n'):
213 if ' table ' not in line:
214 continue
215 if '.sys' in line:
216 continue
217 if YDB_MIGRATION_TABLE in line:
218 continue
219 path = line.split('│')[6].strip()
220 tables.append(path)
221 return tables
222 except subprocess.CalledProcessError as exc:
223 raise Exception(f'Could not fetch table names:\n{exc}')
224
225
226@pytest.fixture(scope='session')
227def ydb_cli() -> pathlib.Path:
228 """
229 Path to YDB CLI executable.
230
231 Override this fixture to change the way YDB CLI is discovered.
232
233 @ingroup userver_testsuite_fixtures
234 """
235 try:
236 import yatest
237
238 return yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb')
239 except ImportError:
240 return 'ydb'
241
242
243@pytest.fixture(scope='session')
244def _ydb_prepare(
245 _ydb_client,
246 _ydb_service_schemas,
247 ydb_service_settings,
248 _ydb_state,
249 ydb_migration_dir,
250 goose_binary_path,
251):
252 if _ydb_service_schemas and ydb_migration_dir.exists():
253 raise Exception(
254 'Both ydb/schema and ydb/migrations exist, which are mutually exclusive',
255 )
256
257 # testsuite legacy
258 for schema_path in _ydb_service_schemas:
259 with open(schema_path) as fp:
260 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
261 for table_schema in tables_schemas:
262 client.drop_table(_ydb_client, table_schema['path'])
263 client.create_table(_ydb_client, table_schema)
264 _ydb_state.tables.append(table_schema['path'])
265
266 # goose
267 _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path)
268
269 _ydb_state.init = True
270
271
272@pytest.fixture(scope='session')
273def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings, ydb_cli):
274 tables = {
275 *_ydb_state.tables,
276 *_ydb_fetch_table_names(ydb_service_settings, ydb_cli),
277 }
278 return tuple(sorted(tables))
279
280
281@pytest.fixture
282def _ydb_init(
283 request,
284 _ydb_client,
285 _ydb_state,
286 ydb_service_settings,
287 _ydb_prepare,
288 _ydb_tables,
289 _ydb_client_pool,
290 load,
291):
292 def ydb_mark_queries(files=(), queries=()):
293 result_queries = []
294 for path in files:
295 result_queries.append(load(path))
296 result_queries.extend(queries)
297 return result_queries
298
299 def drop_table(table):
300 with _ydb_client_pool() as ydb_client:
301 ydb_client.execute('DELETE FROM `{}`'.format(table))
302
303 if _ydb_tables:
304 with concurrent.futures.ThreadPoolExecutor(
305 max_workers=len(_ydb_tables),
306 ) as executer:
307 executer.map(drop_table, _ydb_tables)
308
309 for mark in request.node.iter_markers('ydb'):
310 queries = ydb_mark_queries(**mark.kwargs)
311 for query in queries:
312 _ydb_client.execute(query)
313
314
315@pytest.fixture
316def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
317 """
318 The fixture maintains transaction fault injection state using
319 RegisteredTrx class.
320
321 @see pytest_userver.sql.RegisteredTrx
322
323 @snippet integration_tests/tests/test_trx_failure.py fault injection
324
325 @ingroup userver_testsuite_fixtures
326 """
327
328 registered = sql.RegisteredTrx()
329
330 @testpoint('ydb_trx_commit')
331 def _pg_trx_tp(data):
332 should_fail = registered.is_failure_enabled(data['trx_name'])
333 return {'trx_should_fail': should_fail}
334
335 return registered
336
337
338@pytest.fixture(scope='session')
339def userver_config_ydb(ydb_service_settings):
340 """
341 Returns a function that adjusts the static configuration file for testsuite.
342
343 For all `ydb.databases`, sets `endpoint` and `database` to the local test
344 YDB instance.
345
346 @ingroup userver_testsuite_fixtures
347 """
348
349 endpoint = f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
350 database = ('' if ydb_service_settings.database.startswith('/') else '/') + ydb_service_settings.database
351
352 def patch_config(config, config_vars):
353 ydb_component = config['components_manager']['components']['ydb']
354 if isinstance(ydb_component, str):
355 ydb_component = config_vars[ydb_component[1:]]
356 databases = ydb_component['databases']
357 for dbname, dbconfig in databases.items():
358 dbconfig['endpoint'] = endpoint
359 dbconfig['database'] = database
360
361 return patch_config