2import concurrent.futures
11from testsuite.environment
import shell
13from pytest_userver
import sql
# Use yaml.CLoader when this PyYAML installation exposes it, otherwise fall
# back to the regular yaml.Loader.
_YamlLoader = getattr(yaml, 'CLoader', yaml.Loader)

# Names of the config hook fixtures contributed by this plugin.
USERVER_CONFIG_HOOKS = ['userver_config_ydb']
27def ydb(_ydb_client, _ydb_init) -> client.YdbClient:
31 @ingroup userver_testsuite_fixtures
36@pytest.fixture(scope='session')
37def _ydb_client(_ydb_client_pool):
38 with _ydb_client_pool()
as ydb_client:
42@pytest.fixture(scope='session')
43def _ydb_client_pool(_ydb_service, ydb_service_settings):
44 endpoint =
'{}:{}'.format(
45 ydb_service_settings.host,
46 ydb_service_settings.grpc_port,
50 @contextlib.contextmanager
53 ydb_client = pool.pop()
55 ydb_client = client.YdbClient(
57 ydb_service_settings.database,
62 pool.append(ydb_client)
def pytest_service_register(register_service):
    """
    Register the YDB service factory under the name 'ydb'.

    `register_service` is called once with the service name and
    `service.create_ydb_service`.
    """
    service_name = 'ydb'
    register_service(service_name, service.create_ydb_service)
71@pytest.fixture(scope='session')
72def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
73 if os.environ.get(
'YDB_ENDPOINT')
or pytestconfig.option.ydb_host:
75 ensure_service_started(
'ydb', settings=ydb_service_settings)
78@pytest.fixture(scope='session')
79def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
80 endpoint_from_env = os.environ.get(
'YDB_ENDPOINT')
81 database = os.environ.get(
'YDB_DATABASE',
'local')
84 host, grpc_port = endpoint_from_env.split(
':', 1)
85 return service.ServiceSettings(
93 if pytestconfig.option.ydb_host:
94 return service.ServiceSettings(
95 host=pytestconfig.option.ydb_host,
96 grpc_port=pytestconfig.option.ydb_grpc_port,
97 mon_port=pytestconfig.option.ydb_mon_port,
98 ic_port=pytestconfig.option.ydb_ic_port,
101 return service.get_service_settings()
@pytest.fixture(scope='session')
def _ydb_service_schemas(service_source_dir):
    """
    Schemas found under `ydb/schemas` of the service source directory.

    Returns whatever `discover.find_schemas` produces for that single
    directory.
    """
    schema_dirs = [service_source_dir / 'ydb' / 'schemas']
    return discover.find_schemas(schema_dirs)
110@pytest.fixture(scope='session')
111def ydb_settings_substitute(ydb_service_settings):
112 def secdist_settings(*args, **kwargs):
114 'endpoint':
'{}:{}'.format(
115 ydb_service_settings.host,
116 ydb_service_settings.grpc_port,
118 'database':
'/{}'.format(ydb_service_settings.database),
122 return {
'ydb_settings': secdist_settings}
125@pytest.fixture(scope='session')
@pytest.fixture(scope='session')
def ydb_migration_dir(service_source_dir) -> pathlib.Path:
    """
    Location of the service's YDB migration files.

    Resolves to `ydb/migrations` under `service_source_dir`; the path is
    returned as-is, without checking that it exists.

    @ingroup userver_testsuite_fixtures
    """
    ydb_root = service_source_dir / 'ydb'
    return ydb_root / 'migrations'
# Table name that `_ydb_fetch_table_names` checks for in each line of the
# table listing; presumably goose's own version-tracking table - TODO confirm.
YDB_MIGRATION_TABLE = 'goose_db_version'
148def _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path):
149 if not ydb_migration_dir.exists():
151 if not list(ydb_migration_dir.iterdir()):
154 host = ydb_service_settings.host
155 port = ydb_service_settings.grpc_port
158 str(goose_binary_path),
160 str(ydb_migration_dir),
164 (f
'grpc://{host}:{port}/local?go_query_mode=scripting&go_fake_tx=scripting&go_query_bind=declare,numeric'),
168 shell.execute(command, verbose=
True, command_alias=
'ydb/migrations')
169 except shell.SubprocessFailed
as exc:
170 raise Exception(f
'YDB run migration failed:\n{exc}')
173@pytest.fixture(scope='session')
174def goose_binary_path() -> pathlib.Path:
176 Path to 'goose' migration tool.
178 Override this fixture to change the way 'goose' binary is discovered.
180 @ingroup userver_testsuite_fixtures
185 return yatest.common.runtime.binary_path(
186 'contrib/go/patched/goose/cmd/goose/goose',
192def _ydb_fetch_table_names(ydb_service_settings, ydb_cli) -> list[str]:
194 host = ydb_service_settings.host
195 port = ydb_service_settings.grpc_port
196 output = subprocess.check_output(
200 f
'grpc://{host}:{port}',
211 for line
in output.split(
'\n'):
212 if ' table ' not in line:
216 if YDB_MIGRATION_TABLE
in line:
218 path = line.split(
'│')[6].strip()
221 except subprocess.CalledProcessError
as exc:
222 raise Exception(f
'Could not fetch table names:\n{exc}')
225@pytest.fixture(scope='session')
226def ydb_cli() -> pathlib.Path:
228 Path to YDB CLI executable.
230 Override this fixture to change the way YDB CLI is discovered.
232 @ingroup userver_testsuite_fixtures
237 return yatest.common.runtime.binary_path(
'contrib/ydb/apps/ydb/ydb')
242@pytest.fixture(scope='session')
245 _ydb_service_schemas,
246 ydb_service_settings,
251 if _ydb_service_schemas
and ydb_migration_dir.exists():
253 'Both ydb/schema and ydb/migrations exist, which are mutually exclusive',
257 for schema_path
in _ydb_service_schemas:
258 with open(schema_path)
as fp:
259 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
260 for table_schema
in tables_schemas:
261 client.drop_table(_ydb_client, table_schema[
'path'])
262 client.create_table(_ydb_client, table_schema)
263 _ydb_state.tables.append(table_schema[
'path'])
266 _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path)
268 _ydb_state.init =
True
271@pytest.fixture(scope='session')
272def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings, ydb_cli):
275 *_ydb_fetch_table_names(ydb_service_settings, ydb_cli),
277 return tuple(sorted(tables))
285 ydb_service_settings,
291 def ydb_mark_queries(files=(), queries=()):
294 result_queries.append(load(path))
295 result_queries.extend(queries)
296 return result_queries
298 def drop_table(table):
299 with _ydb_client_pool()
as ydb_client:
300 ydb_client.execute(
'DELETE FROM `{}`'.format(table))
303 with concurrent.futures.ThreadPoolExecutor(
304 max_workers=len(_ydb_tables),
306 executer.map(drop_table, _ydb_tables)
308 for mark
in request.node.iter_markers(
'ydb'):
309 queries = ydb_mark_queries(**mark.kwargs)
310 for query
in queries:
311 _ydb_client.execute(query)
315def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
317 The fixture maintains transaction fault injection state using
320 @see pytest_userver.sql.RegisteredTrx
322 @snippet integration_tests/tests/test_trx_failure.py fault injection
324 @ingroup userver_testsuite_fixtures
327 registered = sql.RegisteredTrx()
329 @testpoint('ydb_trx_commit')
330 def _pg_trx_tp(data):
331 should_fail = registered.is_failure_enabled(data[
'trx_name'])
332 return {
'trx_should_fail': should_fail}
337@pytest.fixture(scope='session')
338def userver_config_ydb(ydb_service_settings):
340 Returns a function that adjusts the static configuration file for testsuite.
342 For all `ydb.databases`, sets `endpoint` and `database` to the local test
345 @ingroup userver_testsuite_fixtures
348 endpoint = f
'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
349 database = (
'' if ydb_service_settings.database.startswith(
'/')
else '/') + ydb_service_settings.database
351 def patch_config(config, config_vars):
352 ydb_component = config[
'components_manager'][
'components'][
'ydb']
353 if isinstance(ydb_component, str):
354 ydb_component = config_vars[ydb_component[1:]]
355 databases = ydb_component[
'databases']
356 for dbconfig
in databases.values():
357 dbconfig[
'endpoint'] = endpoint
358 dbconfig[
'database'] = database