2import concurrent.futures
 
    8from typing 
import Optional
 
   13from testsuite.environment 
import shell
 
   15from pytest_userver 
import sql
 
   20if hasattr(yaml, 
'CLoader'):
 
   21    _YamlLoader = yaml.CLoader  
 
   23    _YamlLoader = yaml.Loader  
 
   25USERVER_CONFIG_HOOKS = [
'userver_config_ydb']
 
   29def ydb(_ydb_client, _ydb_init):
 
   33@pytest.fixture(scope='session') 
   34def _ydb_client(_ydb_client_pool):
 
   35    with _ydb_client_pool() 
as ydb_client:
 
   39@pytest.fixture(scope='session') 
   40def _ydb_client_pool(_ydb_service, ydb_service_settings):
 
   41    endpoint = 
'{}:{}'.format(
 
   42        ydb_service_settings.host, ydb_service_settings.grpc_port,
 
   46    @contextlib.contextmanager 
   49            ydb_client = pool.pop()
 
   51            ydb_client = client.YdbClient(
 
   52                endpoint, ydb_service_settings.database,
 
   57            pool.append(ydb_client)
 
   62def pytest_service_register(register_service):
 
   63    register_service(
'ydb', service.create_ydb_service)
 
   66@pytest.fixture(scope='session') 
   67def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
 
   68    if os.environ.get(
'YDB_ENDPOINT') 
or pytestconfig.option.ydb_host:
 
   70    ensure_service_started(
'ydb', settings=ydb_service_settings)
 
   73@pytest.fixture(scope='session') 
   74def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
 
   75    endpoint_from_env = os.environ.get(
'YDB_ENDPOINT')
 
   76    database = os.environ.get(
'YDB_DATABASE', 
'local')
 
   79        host, grpc_port = endpoint_from_env.split(
':', 1)
 
   80        return service.ServiceSettings(
 
   88    if pytestconfig.option.ydb_host:
 
   89        return service.ServiceSettings(
 
   90            host=pytestconfig.option.ydb_host,
 
   91            grpc_port=pytestconfig.option.ydb_grpc_port,
 
   92            mon_port=pytestconfig.option.ydb_mon_port,
 
   93            ic_port=pytestconfig.option.ydb_ic_port,
 
   96    return service.get_service_settings()
 
   99@pytest.fixture(scope='session') 
  100def _ydb_service_schemas(service_source_dir):
 
  101    service_schemas_ydb = service_source_dir / 
'ydb' / 
'schemas' 
  102    return discover.find_schemas([service_schemas_ydb])
 
  105@pytest.fixture(scope='session') 
  106def ydb_settings_substitute(ydb_service_settings):
 
  107    def secdist_settings(*args, **kwargs):
 
  109            'endpoint': 
'{}:{}'.format(
 
  110                ydb_service_settings.host, ydb_service_settings.grpc_port,
 
  112            'database': 
'/{}'.format(ydb_service_settings.database),
 
  116    return {
'ydb_settings': secdist_settings}
 
  119@pytest.fixture(scope='session') 
  129@pytest.fixture(scope='session') 
  130def ydb_migrate_dir(service_source_dir) -> pathlib.Path:
 
  131    return service_source_dir / 
'ydb' / 
'migrations' 
  134def _ydb_migrate(ydb_service_settings, ydb_migrate_dir):
 
  135    if not ydb_migrate_dir.exists():
 
  137    if not list(ydb_migrate_dir.iterdir()):
 
  143    host = ydb_service_settings.host
 
  144    port = ydb_service_settings.grpc_port
 
  149        str(ydb_migrate_dir),
 
  152            f
'grpc://{host}:{port}/local?go_query_mode=scripting&' 
  153            'go_fake_tx=scripting&go_query_bind=declare,numeric' 
  158        shell.execute(command, verbose=
True, command_alias=
'ydb/migrations')
 
  159    except shell.SubprocessFailed 
as exc:
 
  160        raise Exception(f
'YDB run migration failed:\n\n{exc}')
 
  163def _get_goose() -> Optional[pathlib.Path]:
 
  167        return yatest.common.runtime.binary_path(
 
  168            'contrib/go/patched/goose/cmd/goose/goose',
 
  174def _ydb_fetch_table_names(ydb_service_settings) -> List[str]:
 
  178        host = ydb_service_settings.host
 
  179        port = ydb_service_settings.grpc_port
 
  180        output = subprocess.check_output(
 
  182                yatest.common.runtime.binary_path(
'contrib/ydb/apps/ydb/ydb'),
 
  184                f
'grpc://{host}:{port}',
 
  195        for line 
in output.split(
'\n'):
 
  196            if ' table ' not in line:
 
  200            path = line.split(
'│')[6].strip()
 
  207@pytest.fixture(scope='session') 
  210        _ydb_service_schemas,
 
  211        ydb_service_settings,
 
  215    if _ydb_service_schemas 
and ydb_migrate_dir.exists():
 
  217            'Both ydb/schema and ydb/migrations exist, ' 
  218            'which are mutually exclusive',
 
  222    for schema_path 
in _ydb_service_schemas:
 
  223        with open(schema_path) 
as fp:
 
  224            tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
 
  225        for table_schema 
in tables_schemas:
 
  226            client.drop_table(_ydb_client, table_schema[
'path'])
 
  227            client.create_table(_ydb_client, table_schema)
 
  228            _ydb_state.tables.append(table_schema[
'path'])
 
  231    _ydb_migrate(ydb_service_settings, ydb_migrate_dir)
 
  233    _ydb_state.init = 
True 
  236@pytest.fixture(scope='session') 
  237def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings):
 
  240        *_ydb_fetch_table_names(ydb_service_settings),
 
  242    return tuple(sorted(tables))
 
  250        ydb_service_settings,
 
  256    def ydb_mark_queries(files=(), queries=()):
 
  259            result_queries.append(load(path))
 
  260        result_queries.extend(queries)
 
  261        return result_queries
 
  263    def drop_table(table):
 
  264        with _ydb_client_pool() 
as ydb_client:
 
  265            ydb_client.execute(
'DELETE FROM `{}`'.format(table))
 
  268        with concurrent.futures.ThreadPoolExecutor(
 
  269                max_workers=len(_ydb_tables),
 
  271            executer.map(drop_table, _ydb_tables)
 
  273    for mark 
in request.node.iter_markers(
'ydb'):
 
  274        queries = ydb_mark_queries(**mark.kwargs)
 
  275        for query 
in queries:
 
  276            _ydb_client.execute(query)
 
  280def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
 
  282    The fixture maintains transaction fault injection state using 
  285    @see pytest_userver.sql.RegisteredTrx 
  287    @snippet integration_tests/tests/test_trx_failure.py  fault injection 
  289    @ingroup userver_testsuite_fixtures 
  292    registered = sql.RegisteredTrx()
 
  294    @testpoint('ydb_trx_commit') 
  295    def _pg_trx_tp(data):
 
  296        should_fail = registered.is_failure_enabled(data[
'trx_name'])
 
  297        return {
'trx_should_fail': should_fail}
 
  302@pytest.fixture(scope='session') 
  303def userver_config_ydb(ydb_service_settings):
 
  305    Returns a function that adjusts the static configuration file for testsuite. 
  307    For all `ydb.databases`, sets `endpoint` and `database` to the local test 
  310    @ingroup userver_testsuite_fixtures 
  313    endpoint = f
'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}' 
  315        '' if ydb_service_settings.database.startswith(
'/') 
else '/' 
  316    ) + ydb_service_settings.database
 
  318    def patch_config(config, config_vars):
 
  319        ydb_component = config[
'components_manager'][
'components'][
'ydb']
 
  320        if isinstance(ydb_component, str):
 
  321            ydb_component = config_vars[ydb_component[1:]]
 
  322        databases = ydb_component[
'databases']
 
  323        for dbname, dbconfig 
in databases.items():
 
  324            dbconfig[
'endpoint'] = endpoint
 
  325            dbconfig[
'database'] = database