2import concurrent.futures
12from testsuite.environment
import shell
14from pytest_userver
import sql
19if hasattr(yaml,
'CLoader'):
20 _YamlLoader = yaml.CLoader
22 _YamlLoader = yaml.Loader
24USERVER_CONFIG_HOOKS = [
'userver_config_ydb']
28def ydb(_ydb_client, _ydb_init) -> client.YdbClient:
32 @ingroup userver_testsuite_fixtures
37@pytest.fixture(scope='session')
38def _ydb_client(_ydb_client_pool):
39 with _ydb_client_pool()
as ydb_client:
43@pytest.fixture(scope='session')
44def _ydb_client_pool(_ydb_service, ydb_service_settings):
45 endpoint =
'{}:{}'.format(
46 ydb_service_settings.host,
47 ydb_service_settings.grpc_port,
51 @contextlib.contextmanager
54 ydb_client = pool.pop()
56 ydb_client = client.YdbClient(
58 ydb_service_settings.database,
63 pool.append(ydb_client)
68def pytest_service_register(register_service):
69 register_service(
'ydb', service.create_ydb_service)
72@pytest.fixture(scope='session')
73def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
74 if os.environ.get(
'YDB_ENDPOINT')
or pytestconfig.option.ydb_host:
76 ensure_service_started(
'ydb', settings=ydb_service_settings)
79@pytest.fixture(scope='session')
80def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
81 endpoint_from_env = os.environ.get(
'YDB_ENDPOINT')
82 database = os.environ.get(
'YDB_DATABASE',
'local')
85 host, grpc_port = endpoint_from_env.split(
':', 1)
86 return service.ServiceSettings(
94 if pytestconfig.option.ydb_host:
95 return service.ServiceSettings(
96 host=pytestconfig.option.ydb_host,
97 grpc_port=pytestconfig.option.ydb_grpc_port,
98 mon_port=pytestconfig.option.ydb_mon_port,
99 ic_port=pytestconfig.option.ydb_ic_port,
102 return service.get_service_settings()
105@pytest.fixture(scope='session')
106def _ydb_service_schemas(service_source_dir):
107 service_schemas_ydb = service_source_dir /
'ydb' /
'schemas'
108 return discover.find_schemas([service_schemas_ydb])
111@pytest.fixture(scope='session')
112def ydb_settings_substitute(ydb_service_settings):
113 def secdist_settings(*args, **kwargs):
115 'endpoint':
'{}:{}'.format(
116 ydb_service_settings.host,
117 ydb_service_settings.grpc_port,
119 'database':
'/{}'.format(ydb_service_settings.database),
123 return {
'ydb_settings': secdist_settings}
126@pytest.fixture(scope='session')
136@pytest.fixture(scope='session')
137def ydb_migration_dir(service_source_dir) -> pathlib.Path:
139 Directory with migration files
141 @ingroup userver_testsuite_fixtures
143 return service_source_dir /
'ydb' /
'migrations'
146YDB_MIGRATION_TABLE =
'goose_db_version'
149def _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path):
150 if not ydb_migration_dir.exists():
152 if not list(ydb_migration_dir.iterdir()):
155 host = ydb_service_settings.host
156 port = ydb_service_settings.grpc_port
159 str(goose_binary_path),
161 str(ydb_migration_dir),
165 (f
'grpc://{host}:{port}/local?go_query_mode=scripting&go_fake_tx=scripting&go_query_bind=declare,numeric'),
169 shell.execute(command, verbose=
True, command_alias=
'ydb/migrations')
170 except shell.SubprocessFailed
as exc:
171 raise Exception(f
'YDB run migration failed:\n{exc}')
174@pytest.fixture(scope='session')
175def goose_binary_path() -> pathlib.Path:
177 Path to 'goose' migration tool.
179 Override this fixture to change the way 'goose' binary is discovered.
181 @ingroup userver_testsuite_fixtures
186 return yatest.common.runtime.binary_path(
187 'contrib/go/patched/goose/cmd/goose/goose',
193def _ydb_fetch_table_names(ydb_service_settings, ydb_cli) -> List[str]:
195 host = ydb_service_settings.host
196 port = ydb_service_settings.grpc_port
197 output = subprocess.check_output(
201 f
'grpc://{host}:{port}',
212 for line
in output.split(
'\n'):
213 if ' table ' not in line:
217 if YDB_MIGRATION_TABLE
in line:
219 path = line.split(
'│')[6].strip()
222 except subprocess.CalledProcessError
as exc:
223 raise Exception(f
'Could not fetch table names:\n{exc}')
226@pytest.fixture(scope='session')
227def ydb_cli() -> pathlib.Path:
229 Path to YDB CLI executable.
231 Override this fixture to change the way YDB CLI is discovered.
233 @ingroup userver_testsuite_fixtures
238 return yatest.common.runtime.binary_path(
'contrib/ydb/apps/ydb/ydb')
243@pytest.fixture(scope='session')
246 _ydb_service_schemas,
247 ydb_service_settings,
252 if _ydb_service_schemas
and ydb_migration_dir.exists():
254 'Both ydb/schema and ydb/migrations exist, which are mutually exclusive',
258 for schema_path
in _ydb_service_schemas:
259 with open(schema_path)
as fp:
260 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
261 for table_schema
in tables_schemas:
262 client.drop_table(_ydb_client, table_schema[
'path'])
263 client.create_table(_ydb_client, table_schema)
264 _ydb_state.tables.append(table_schema[
'path'])
267 _ydb_migrate(ydb_service_settings, ydb_migration_dir, goose_binary_path)
269 _ydb_state.init =
True
272@pytest.fixture(scope='session')
273def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings, ydb_cli):
276 *_ydb_fetch_table_names(ydb_service_settings, ydb_cli),
278 return tuple(sorted(tables))
286 ydb_service_settings,
292 def ydb_mark_queries(files=(), queries=()):
295 result_queries.append(load(path))
296 result_queries.extend(queries)
297 return result_queries
299 def drop_table(table):
300 with _ydb_client_pool()
as ydb_client:
301 ydb_client.execute(
'DELETE FROM `{}`'.format(table))
304 with concurrent.futures.ThreadPoolExecutor(
305 max_workers=len(_ydb_tables),
307 executer.map(drop_table, _ydb_tables)
309 for mark
in request.node.iter_markers(
'ydb'):
310 queries = ydb_mark_queries(**mark.kwargs)
311 for query
in queries:
312 _ydb_client.execute(query)
316def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
318 The fixture maintains transaction fault injection state using
321 @see pytest_userver.sql.RegisteredTrx
323 @snippet integration_tests/tests/test_trx_failure.py fault injection
325 @ingroup userver_testsuite_fixtures
328 registered = sql.RegisteredTrx()
330 @testpoint('ydb_trx_commit')
331 def _pg_trx_tp(data):
332 should_fail = registered.is_failure_enabled(data[
'trx_name'])
333 return {
'trx_should_fail': should_fail}
338@pytest.fixture(scope='session')
339def userver_config_ydb(ydb_service_settings):
341 Returns a function that adjusts the static configuration file for testsuite.
343 For all `ydb.databases`, sets `endpoint` and `database` to the local test
346 @ingroup userver_testsuite_fixtures
349 endpoint = f
'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
350 database = (
'' if ydb_service_settings.database.startswith(
'/')
else '/') + ydb_service_settings.database
352 def patch_config(config, config_vars):
353 ydb_component = config[
'components_manager'][
'components'][
'ydb']
354 if isinstance(ydb_component, str):
355 ydb_component = config_vars[ydb_component[1:]]
356 databases = ydb_component[
'databases']
357 for dbname, dbconfig
in databases.items():
358 dbconfig[
'endpoint'] = endpoint
359 dbconfig[
'database'] = database