@pytest.fixture(scope='session')
def pgsql_local(service_source_dir, pgsql_local_create):
databases = discover.find_schemas(
'pg', [service_source_dir.joinpath('schemas/postgresql')],
)
return pgsql_local_create(list(databases.values()))
@pytest.fixture(scope='session')
async def _gate_started(loop, pgsql_local):
gate_config = chaos.GateRoute(
name='postgres proxy',
host_to_server=pgsql_local['key_value'].host,
port_to_server=pgsql_local['key_value'].port,
)
async with chaos.TcpGate(gate_config, loop) as proxy:
yield proxy
@pytest.fixture
def extra_client_deps(_gate_started):
pass
@pytest.fixture(name='userver_config_testsuite_support', scope='session')
def _userver_config_testsuite_support(userver_config_testsuite_support):
def patch_config(config_yaml, config_vars):
userver_config_testsuite_support(config_yaml, config_vars)
components: dict = config_yaml['components_manager']['components']
testsuite_support = components['testsuite-support']
testsuite_support.pop('testsuite-pg-execute-timeout')
testsuite_support.pop('testsuite-pg-statement-timeout')
testsuite_support.pop('testsuite-pg-readonly-master-expected')
return patch_config
@pytest.fixture(scope='session')
def userver_pg_config(pgsql_local, _gate_started):
def _hook_db_config(config_yaml, config_vars):
host, port = _gate_started.get_sockname_for_clients()
db_info = pgsql_local['key_value']
db_chaos_gate = connection.PgConnectionInfo(
host=host,
port=port,
user=db_info.user,
password=db_info.password,
options=db_info.options,
sslmode=db_info.sslmode,
dbname=db_info.dbname,
)
components = config_yaml['components_manager']['components']
db = components['key-value-database']
db['dbconnection'] = db_chaos_gate.get_uri()
return _hook_db_config