2import concurrent.futures
8from typing
import Optional
13from testsuite.environment
import shell
15from pytest_userver
import sql
20if hasattr(yaml,
'CLoader'):
21 _YamlLoader = yaml.CLoader
23 _YamlLoader = yaml.Loader
27def ydb(_ydb_client, _ydb_init):
31@pytest.fixture(scope='session')
32def _ydb_client(_ydb_client_pool):
33 with _ydb_client_pool()
as ydb_client:
37@pytest.fixture(scope='session')
38def _ydb_client_pool(_ydb_service, _ydb_service_settings):
39 endpoint =
'{}:{}'.format(
40 _ydb_service_settings.host, _ydb_service_settings.grpc_port,
44 @contextlib.contextmanager
47 ydb_client = pool.pop()
49 ydb_client = client.YdbClient(
50 endpoint, _ydb_service_settings.database,
55 pool.append(ydb_client)
def pytest_service_register(register_service):
    """Register the YDB service factory under the name ``'ydb'``.

    Calls the supplied ``register_service`` callable with the service name
    and ``service.create_ydb_service``.

    NOTE(review): presumably invoked by the testsuite as a plugin hook;
    confirm against the testsuite documentation.
    """
    service_name = 'ydb'
    service_factory = service.create_ydb_service
    register_service(service_name, service_factory)
64@pytest.fixture(scope='session')
65def _ydb_service(pytestconfig, ensure_service_started, _ydb_service_settings):
66 if os.environ.get(
'YDB_ENDPOINT')
or pytestconfig.option.ydb_host:
68 ensure_service_started(
'ydb', settings=_ydb_service_settings)
@pytest.fixture(scope='session')
def ydb_service_settings(_ydb_service_settings):
    """Session-scoped public alias for the ``_ydb_service_settings`` fixture.

    Returns the very same object that ``_ydb_service_settings`` provides,
    without copying or modifying it.
    """
    settings = _ydb_service_settings
    return settings
76@pytest.fixture(scope='session')
77def _ydb_service_settings(pytestconfig):
78 endpoint_from_env = os.environ.get(
'YDB_ENDPOINT')
79 database = os.environ.get(
'YDB_DATABASE',
'local')
82 host, grpc_port = endpoint_from_env.split(
':', 1)
83 return service.ServiceSettings(
91 if pytestconfig.option.ydb_host:
92 return service.ServiceSettings(
93 host=pytestconfig.option.ydb_host,
94 grpc_port=pytestconfig.option.ydb_grpc_port,
95 mon_port=pytestconfig.option.ydb_mon_port,
96 ic_port=pytestconfig.option.ydb_ic_port,
99 return service.get_service_settings()
@pytest.fixture(scope='session')
def _ydb_service_schemas(service_source_dir):
    """Session-scoped fixture with the schemas found for the service.

    Passes a single directory, ``<service_source_dir>/ydb/schemas``, to
    ``discover.find_schemas`` and returns its result as is.
    """
    return discover.find_schemas(
        [service_source_dir / 'ydb' / 'schemas'],
    )
108@pytest.fixture(scope='session')
109def ydb_settings_substitute(_ydb_service_settings):
110 def secdist_settings(*args, **kwargs):
112 'endpoint':
'{}:{}'.format(
113 _ydb_service_settings.host, _ydb_service_settings.grpc_port,
115 'database':
'/{}'.format(_ydb_service_settings.database),
119 return {
'ydb_settings': secdist_settings}
122@pytest.fixture(scope='session')
@pytest.fixture(scope='session')
def ydb_migrate_dir(service_source_dir) -> pathlib.Path:
    """Session-scoped fixture with the path ``<service_source_dir>/ydb/migrations``.

    The path is only built here; this fixture does not check that the
    directory exists.
    """
    ydb_dir = service_source_dir / 'ydb'
    return ydb_dir / 'migrations'
137def _ydb_migrate(_ydb_service_settings, ydb_migrate_dir):
138 if not ydb_migrate_dir.exists():
140 if not list(ydb_migrate_dir.iterdir()):
146 host = _ydb_service_settings.host
147 port = _ydb_service_settings.grpc_port
152 str(ydb_migrate_dir),
155 f
'grpc://{host}:{port}/local?go_query_mode=scripting&'
156 'go_fake_tx=scripting&go_query_bind=declare,numeric'
161 shell.execute(command, verbose=
True, command_alias=
'ydb/migrations')
162 except shell.SubprocessFailed
as exc:
163 raise Exception(f
'YDB run migration failed:\n\n{exc}')
166def _get_goose() -> Optional[pathlib.Path]:
170 return yatest.common.runtime.binary_path(
171 'contrib/go/patched/goose/cmd/goose/goose',
177def _ydb_fetch_table_names(_ydb_service_settings) -> List[str]:
181 host = _ydb_service_settings.host
182 port = _ydb_service_settings.grpc_port
183 output = subprocess.check_output(
185 yatest.common.runtime.binary_path(
'contrib/ydb/apps/ydb/ydb'),
187 f
'grpc://{host}:{port}',
198 for line
in output.split(
'\n'):
199 if ' table ' not in line:
203 path = line.split(
'│')[6].strip()
210@pytest.fixture(scope='session')
213 _ydb_service_schemas,
214 _ydb_service_settings,
218 if _ydb_service_schemas
and ydb_migrate_dir.exists():
220 'Both ydb/schema and ydb/migrations exist, '
221 'which are mutually exclusive',
225 for schema_path
in _ydb_service_schemas:
226 with open(schema_path)
as fp:
227 tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
228 for table_schema
in tables_schemas:
229 client.drop_table(_ydb_client, table_schema[
'path'])
230 client.create_table(_ydb_client, table_schema)
231 _ydb_state.tables.append(table_schema[
'path'])
234 _ydb_migrate(_ydb_service_settings, ydb_migrate_dir)
236 _ydb_state.init =
True
239@pytest.fixture(scope='session')
240def _ydb_tables(_ydb_state, _ydb_prepare, _ydb_service_settings):
243 *_ydb_fetch_table_names(_ydb_service_settings),
245 return tuple(sorted(tables))
253 _ydb_service_settings,
259 def ydb_mark_queries(files=(), queries=()):
262 result_queries.append(load(path))
263 result_queries.extend(queries)
264 return result_queries
266 def drop_table(table):
267 with _ydb_client_pool()
as ydb_client:
268 ydb_client.execute(
'DELETE FROM `{}`'.format(table))
271 with concurrent.futures.ThreadPoolExecutor(
272 max_workers=len(_ydb_tables),
274 executer.map(drop_table, _ydb_tables)
276 for mark
in request.node.iter_markers(
'ydb'):
277 queries = ydb_mark_queries(**mark.kwargs)
278 for query
in queries:
279 _ydb_client.execute(query)
283def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
285 The fixture maintains transaction fault injection state using
288 @see pytest_userver.sql.RegisteredTrx
290 @snippet integration_tests/tests/test_trx_failure.py fault injection
292 @ingroup userver_testsuite_fixtures
295 registered = sql.RegisteredTrx()
297 @testpoint('ydb_trx_commit')
298 def _pg_trx_tp(data):
299 should_fail = registered.is_failure_enabled(data[
'trx_name'])
300 return {
'trx_should_fail': should_fail}