2import concurrent.futures
8from typing
import Optional
13from testsuite.environment
import shell
15from pytest_userver
import sql
20if hasattr(yaml,
'CLoader'):
21 _YamlLoader = yaml.CLoader
23 _YamlLoader = yaml.Loader
25USERVER_CONFIG_HOOKS = [
'userver_config_ydb']
29def ydb(_ydb_client, _ydb_init):
33@pytest.fixture(scope='session')
34def _ydb_client(_ydb_client_pool):
35 with _ydb_client_pool()
as ydb_client:
39@pytest.fixture(scope='session')
40def _ydb_client_pool(_ydb_service, ydb_service_settings):
41 endpoint =
'{}:{}'.format(
42 ydb_service_settings.host, ydb_service_settings.grpc_port,
46 @contextlib.contextmanager
49 ydb_client = pool.pop()
51 ydb_client = client.YdbClient(
52 endpoint, ydb_service_settings.database,
57 pool.append(ydb_client)
62def pytest_service_register(register_service):
63 register_service(
'ydb', service.create_ydb_service)
66@pytest.fixture(scope='session')
67def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
68 if os.environ.get(
'YDB_ENDPOINT')
or pytestconfig.option.ydb_host:
70 ensure_service_started(
'ydb', settings=ydb_service_settings)
73@pytest.fixture(scope='session')
74def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
75 endpoint_from_env = os.environ.get(
'YDB_ENDPOINT')
76 database = os.environ.get(
'YDB_DATABASE',
'local')
79 host, grpc_port = endpoint_from_env.split(
':', 1)
80 return service.ServiceSettings(
88 if pytestconfig.option.ydb_host:
89 return service.ServiceSettings(
90 host=pytestconfig.option.ydb_host,
91 grpc_port=pytestconfig.option.ydb_grpc_port,
92 mon_port=pytestconfig.option.ydb_mon_port,
93 ic_port=pytestconfig.option.ydb_ic_port,
96 return service.get_service_settings()
99@pytest.fixture(scope='session')
100def _ydb_service_schemas(service_source_dir):
101 service_schemas_ydb = service_source_dir /
'ydb' /
'schemas'
102 return discover.find_schemas([service_schemas_ydb])
105@pytest.fixture(scope='session')
106def ydb_settings_substitute(ydb_service_settings):
107 def secdist_settings(*args, **kwargs):
109 'endpoint':
'{}:{}'.format(
110 ydb_service_settings.host, ydb_service_settings.grpc_port,
112 'database':
'/{}'.format(ydb_service_settings.database),
116 return {
'ydb_settings': secdist_settings}
119@pytest.fixture(scope='session')
129@pytest.fixture(scope='session')
130def ydb_migrate_dir(service_source_dir) -> pathlib.Path:
131 return service_source_dir /
'ydb' /
'migrations'
134def _ydb_migrate(ydb_service_settings, ydb_migrate_dir):
135 if not ydb_migrate_dir.exists():
137 if not list(ydb_migrate_dir.iterdir()):
143 host = ydb_service_settings.host
144 port = ydb_service_settings.grpc_port
149 str(ydb_migrate_dir),
152 f
'grpc://{host}:{port}/local?go_query_mode=scripting&'
153 'go_fake_tx=scripting&go_query_bind=declare,numeric'
158 shell.execute(command, verbose=
True, command_alias=
'ydb/migrations')
159 except shell.SubprocessFailed
as exc:
160 raise Exception(f
'YDB run migration failed:\n\n{exc}')
163def _get_goose() -> Optional[pathlib.Path]:
167 return yatest.common.runtime.binary_path(
168 'contrib/go/patched/goose/cmd/goose/goose',
174def _ydb_fetch_table_names(ydb_service_settings) -> List[str]:
178 host = ydb_service_settings.host
179 port = ydb_service_settings.grpc_port
180 output = subprocess.check_output(
182 yatest.common.runtime.binary_path(
'contrib/ydb/apps/ydb/ydb'),
184 f
'grpc://{host}:{port}',
195 for line
in output.split(
'\n'):
196 if ' table ' not in line:
200 path = line.split(
'│')[6].strip()
207@pytest.fixture(scope='session')
210 _ydb_service_schemas,
211 ydb_service_settings,
215 if _ydb_service_schemas
and ydb_migrate_dir.exists():
217 'Both ydb/schema and ydb/migrations exist, '
218 'which are mutually exclusive',
222 for schema_path
in _ydb_service_schemas:
223 with open(schema_path)
as fp:
224 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
225 for table_schema
in tables_schemas:
226 client.drop_table(_ydb_client, table_schema[
'path'])
227 client.create_table(_ydb_client, table_schema)
228 _ydb_state.tables.append(table_schema[
'path'])
231 _ydb_migrate(ydb_service_settings, ydb_migrate_dir)
233 _ydb_state.init =
True
236@pytest.fixture(scope='session')
237def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings):
240 *_ydb_fetch_table_names(ydb_service_settings),
242 return tuple(sorted(tables))
250 ydb_service_settings,
256 def ydb_mark_queries(files=(), queries=()):
259 result_queries.append(load(path))
260 result_queries.extend(queries)
261 return result_queries
263 def drop_table(table):
264 with _ydb_client_pool()
as ydb_client:
265 ydb_client.execute(
'DELETE FROM `{}`'.format(table))
268 with concurrent.futures.ThreadPoolExecutor(
269 max_workers=len(_ydb_tables),
271 executer.map(drop_table, _ydb_tables)
273 for mark
in request.node.iter_markers(
'ydb'):
274 queries = ydb_mark_queries(**mark.kwargs)
275 for query
in queries:
276 _ydb_client.execute(query)
280def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
282 The fixture maintains transaction fault injection state using
285 @see pytest_userver.sql.RegisteredTrx
287 @snippet integration_tests/tests/test_trx_failure.py fault injection
289 @ingroup userver_testsuite_fixtures
292 registered = sql.RegisteredTrx()
294 @testpoint('ydb_trx_commit')
295 def _pg_trx_tp(data):
296 should_fail = registered.is_failure_enabled(data[
'trx_name'])
297 return {
'trx_should_fail': should_fail}
302@pytest.fixture(scope='session')
303def userver_config_ydb(ydb_service_settings):
305 Returns a function that adjusts the static configuration file for testsuite.
307 For all `ydb.databases`, sets `endpoint` and `database` to the local test
310 @ingroup userver_testsuite_fixtures
313 endpoint = f
'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
315 '' if ydb_service_settings.database.startswith(
'/')
else '/'
316 ) + ydb_service_settings.database
318 def patch_config(config, config_vars):
319 ydb_component = config[
'components_manager'][
'components'][
'ydb']
320 if isinstance(ydb_component, str):
321 ydb_component = config_vars[ydb_component[1:]]
322 databases = ydb_component[
'databases']
323 for dbname, dbconfig
in databases.items():
324 dbconfig[
'endpoint'] = endpoint
325 dbconfig[
'database'] = database