userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7from typing import List
8from typing import Optional
9
10import pytest
11import yaml
12
13from testsuite.environment import shell
14
15from pytest_userver import sql
16from . import client
17from . import discover
18from . import service
19
20if hasattr(yaml, 'CLoader'):
21 _YamlLoader = yaml.CLoader # type: ignore
22else:
23 _YamlLoader = yaml.Loader # type: ignore
24
25USERVER_CONFIG_HOOKS = ['userver_config_ydb']
26
27
28@pytest.fixture
29def ydb(_ydb_client, _ydb_init):
30 return _ydb_client
31
32
33@pytest.fixture(scope='session')
34def _ydb_client(_ydb_client_pool):
35 with _ydb_client_pool() as ydb_client:
36 yield ydb_client
37
38
39@pytest.fixture(scope='session')
40def _ydb_client_pool(_ydb_service, ydb_service_settings):
41 endpoint = '{}:{}'.format(
42 ydb_service_settings.host, ydb_service_settings.grpc_port,
43 )
44 pool = []
45
46 @contextlib.contextmanager
47 def get_client():
48 try:
49 ydb_client = pool.pop()
50 except IndexError:
51 ydb_client = client.YdbClient(
52 endpoint, ydb_service_settings.database,
53 )
54 try:
55 yield ydb_client
56 finally:
57 pool.append(ydb_client)
58
59 return get_client
60
61
62def pytest_service_register(register_service):
63 register_service('ydb', service.create_ydb_service)
64
65
66@pytest.fixture(scope='session')
67def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
68 if os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host:
69 return
70 ensure_service_started('ydb', settings=ydb_service_settings)
71
72
73@pytest.fixture(scope='session')
74def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
75 endpoint_from_env = os.environ.get('YDB_ENDPOINT')
76 database = os.environ.get('YDB_DATABASE', 'local')
77
78 if endpoint_from_env:
79 host, grpc_port = endpoint_from_env.split(':', 1)
80 return service.ServiceSettings(
81 host=host,
82 grpc_port=grpc_port,
83 mon_port=None,
84 ic_port=None,
85 database=database,
86 )
87
88 if pytestconfig.option.ydb_host:
89 return service.ServiceSettings(
90 host=pytestconfig.option.ydb_host,
91 grpc_port=pytestconfig.option.ydb_grpc_port,
92 mon_port=pytestconfig.option.ydb_mon_port,
93 ic_port=pytestconfig.option.ydb_ic_port,
94 database=database,
95 )
96 return service.get_service_settings()
97
98
99@pytest.fixture(scope='session')
100def _ydb_service_schemas(service_source_dir):
101 service_schemas_ydb = service_source_dir / 'ydb' / 'schemas'
102 return discover.find_schemas([service_schemas_ydb])
103
104
105@pytest.fixture(scope='session')
106def ydb_settings_substitute(ydb_service_settings):
107 def secdist_settings(*args, **kwargs):
108 return {
109 'endpoint': '{}:{}'.format(
110 ydb_service_settings.host, ydb_service_settings.grpc_port,
111 ),
112 'database': '/{}'.format(ydb_service_settings.database),
113 'token': '',
114 }
115
116 return {'ydb_settings': secdist_settings}
117
118
119@pytest.fixture(scope='session')
120def _ydb_state():
121 class State:
122 def __init__(self):
123 self.init = False
124 self.tables = []
125
126 return State()
127
128
129@pytest.fixture(scope='session')
130def ydb_migrate_dir(service_source_dir) -> pathlib.Path:
131 return service_source_dir / 'ydb' / 'migrations'
132
133
134def _ydb_migrate(ydb_service_settings, ydb_migrate_dir):
135 if not ydb_migrate_dir.exists():
136 return
137 if not list(ydb_migrate_dir.iterdir()):
138 return
139
140 if not _get_goose():
141 return
142
143 host = ydb_service_settings.host
144 port = ydb_service_settings.grpc_port
145
146 command = [
147 str(_get_goose()),
148 '-dir',
149 str(ydb_migrate_dir),
150 'ydb',
151 (
152 f'grpc://{host}:{port}/local?go_query_mode=scripting&'
153 'go_fake_tx=scripting&go_query_bind=declare,numeric'
154 ),
155 'up',
156 ]
157 try:
158 shell.execute(command, verbose=True, command_alias='ydb/migrations')
159 except shell.SubprocessFailed as exc:
160 raise Exception(f'YDB run migration failed:\n\n{exc}')
161
162
163def _get_goose() -> Optional[pathlib.Path]:
164 try:
165 import yatest
166
167 return yatest.common.runtime.binary_path(
168 'contrib/go/patched/goose/cmd/goose/goose',
169 )
170 except ImportError:
171 return None
172
173
174def _ydb_fetch_table_names(ydb_service_settings) -> List[str]:
175 try:
176 import yatest
177
178 host = ydb_service_settings.host
179 port = ydb_service_settings.grpc_port
180 output = subprocess.check_output(
181 [
182 yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb'),
183 '-e',
184 f'grpc://{host}:{port}',
185 '-d',
186 '/local',
187 'scheme',
188 'ls',
189 '-lR',
190 ],
191 encoding='utf-8',
192 )
193 tables = []
194
195 for line in output.split('\n'):
196 if ' table ' not in line:
197 continue
198 if '.sys' in line:
199 continue
200 path = line.split('│')[6].strip()
201 tables.append(path)
202 return tables
203 except ImportError:
204 return []
205
206
207@pytest.fixture(scope='session')
208def _ydb_prepare(
209 _ydb_client,
210 _ydb_service_schemas,
211 ydb_service_settings,
212 _ydb_state,
213 ydb_migrate_dir,
214):
215 if _ydb_service_schemas and ydb_migrate_dir.exists():
216 raise Exception(
217 'Both ydb/schema and ydb/migrations exist, '
218 'which are mutually exclusive',
219 )
220
221 # testsuite legacy
222 for schema_path in _ydb_service_schemas:
223 with open(schema_path) as fp:
224 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
225 for table_schema in tables_schemas:
226 client.drop_table(_ydb_client, table_schema['path'])
227 client.create_table(_ydb_client, table_schema)
228 _ydb_state.tables.append(table_schema['path'])
229
230 # goose
231 _ydb_migrate(ydb_service_settings, ydb_migrate_dir)
232
233 _ydb_state.init = True
234
235
236@pytest.fixture(scope='session')
237def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings):
238 tables = {
239 *_ydb_state.tables,
240 *_ydb_fetch_table_names(ydb_service_settings),
241 }
242 return tuple(sorted(tables))
243
244
245@pytest.fixture
246def _ydb_init(
247 request,
248 _ydb_client,
249 _ydb_state,
250 ydb_service_settings,
251 _ydb_prepare,
252 _ydb_tables,
253 _ydb_client_pool,
254 load,
255):
256 def ydb_mark_queries(files=(), queries=()):
257 result_queries = []
258 for path in files:
259 result_queries.append(load(path))
260 result_queries.extend(queries)
261 return result_queries
262
263 def drop_table(table):
264 with _ydb_client_pool() as ydb_client:
265 ydb_client.execute('DELETE FROM `{}`'.format(table))
266
267 if _ydb_tables:
268 with concurrent.futures.ThreadPoolExecutor(
269 max_workers=len(_ydb_tables),
270 ) as executer:
271 executer.map(drop_table, _ydb_tables)
272
273 for mark in request.node.iter_markers('ydb'):
274 queries = ydb_mark_queries(**mark.kwargs)
275 for query in queries:
276 _ydb_client.execute(query)
277
278
279@pytest.fixture
280def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
281 """
282 The fixture maintains transaction fault injection state using
283 RegisteredTrx class.
284
285 @see pytest_userver.sql.RegisteredTrx
286
287 @snippet integration_tests/tests/test_trx_failure.py fault injection
288
289 @ingroup userver_testsuite_fixtures
290 """
291
292 registered = sql.RegisteredTrx()
293
294 @testpoint('ydb_trx_commit')
295 def _pg_trx_tp(data):
296 should_fail = registered.is_failure_enabled(data['trx_name'])
297 return {'trx_should_fail': should_fail}
298
299 return registered
300
301
302@pytest.fixture(scope='session')
303def userver_config_ydb(ydb_service_settings):
304 """
305 Returns a function that adjusts the static configuration file for testsuite.
306
307 For all `ydb.databases`, sets `endpoint` and `database` to the local test
308 YDB instance.
309
310 @ingroup userver_testsuite_fixtures
311 """
312
313 endpoint = f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
314 database = (
315 '' if ydb_service_settings.database.startswith('/') else '/'
316 ) + ydb_service_settings.database
317
318 def patch_config(config, config_vars):
319 ydb_component = config['components_manager']['components']['ydb']
320 if isinstance(ydb_component, str):
321 ydb_component = config_vars[ydb_component[1:]]
322 databases = ydb_component['databases']
323 for dbname, dbconfig in databases.items():
324 dbconfig['endpoint'] = endpoint
325 dbconfig['database'] = database
326
327 return patch_config