A system cannot achieve high availability if it does not tolerate failures. In microservices and other distributed network architectures, the hardware and the network themselves are sources of failures.
Luckily, userver provides a network proxy that simulates network errors and whose behavior can be controlled from functional tests. In other words, it is a tool for chaos testing.
To implement such tests, pytest_userver.chaos.TcpGate or pytest_userver.chaos.UdpGate is placed between the client and the server. The proxy can then be used to test the fault tolerance of both the client and the server. The following network conditions can be simulated:
| Network condition | Possible reasons for the behavior in production |
|---|---|
| Server doesn't accept new connections | Server is overloaded; network is misconfigured |
| Client/server does not read data from socket | Client/server is overloaded; deadlock in client/server thread; OS lost the data on socket due to overload |
| Client/server reads data, but does not respond | Client/server is overloaded; deadlock in client/server thread |
| Client/server closes connections | Client/server restart; client/server is overloaded and shuts down connections |
| Client/server closes the socket when it receives data | Client/server congestion control in action |
| Client/server corrupts the response data | Stack overflow or other memory corruption on the client/server side |
| Network works with send/receive delays | Poorly performing network; interference in the network |
| Limited bandwidth | Poorly performing network |
| Connections close on timeout | Timeout on client/server/network interactions from client/server provider |
| Data slicing | Interference in the network; poor router configuration |
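
Some of the conditions in the table can be pictured as plain transformations of the byte stream that passes through a proxy. The sketch below is not userver code: `corrupt` and `slice_data` are hypothetical helpers that only illustrate what "corrupts the data" and "data slicing" could mean for a payload.

```python
from typing import Iterator


def corrupt(data: bytes) -> bytes:
    """Flip every bit, so the receiver sees garbage of the same length."""
    return bytes(b ^ 0xFF for b in data)


def slice_data(data: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield the payload in chunks of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError('chunk_size must be positive')
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]


payload = b'SELECT 1'
chunks = list(slice_data(payload, 3))
# The receiver gets the same bytes, only split across several reads.
assert b''.join(chunks) == payload
# Corruption keeps the length but changes the content.
assert len(corrupt(payload)) == len(payload)
assert corrupt(payload) != payload
```

A fault-tolerant peer is expected to reassemble sliced data correctly and to reject or survive corrupted data without crashing.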
Keep in mind the following picture:
For client requests:
Client -> chaos.TcpGate -> Server
For server responses:
Client <- chaos.TcpGate <- Server
Gates allow you to simulate various network conditions on the client and server sides independently. For example, to corrupt data that goes from Client to Server, use gate.to_server_corrupt_data(). To corrupt data that goes from Server to Client, use gate.to_client_corrupt_data().
To restore the normal, failure-free behavior, use gate.to_server_pass() and gate.to_client_pass() respectively.
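Forgetting to restore pass-through mode can leak a fault into the next test. One possible way to guard against that is a small context manager, sketched below. The `fault` helper and the `RecordingGate` stand-in are hypothetical and not part of pytest_userver; the sketch also assumes the gate methods are plain synchronous calls, as they appear in this document.

```python
from contextlib import contextmanager


@contextmanager
def fault(gate, method_name: str, *args):
    """Apply one gate fault inside a `with` block, then restore pass-through."""
    getattr(gate, method_name)(*args)
    try:
        yield gate
    finally:
        # Restore both directions even if the test body raised.
        gate.to_server_pass()
        gate.to_client_pass()


class RecordingGate:
    """Stand-in for a real gate; it only records which methods were called."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Called only for attributes that do not exist, i.e. the gate methods.
        def method(*args):
            self.calls.append(name)
        return method


gate = RecordingGate()
with fault(gate, 'to_server_corrupt_data'):
    pass  # requests made here would reach the server corrupted
assert gate.calls == [
    'to_server_corrupt_data', 'to_server_pass', 'to_client_pass',
]
```

With a real gate, the body of the `with` block would hold the requests that are expected to fail.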
chaos.TcpGate
Use chaos.TcpGate when you need to deal with messages over the TCP protocol (for example, HTTP). This gate supports multiple connections from different clients to one server. TcpGate accepts incoming connections, and each accepted connection triggers the spawning of a new client connection to the server.
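To make the per-connection behavior concrete, here is a toy asyncio proxy. It is not the TcpGate implementation: `ToyGate`, `pump`, `echo` and `demo` are hypothetical names introduced for illustration only. The sketch shows two ideas from this page: every accepted client gets its own connection to the server, and each direction has its own handler that can be switched independently.

```python
import asyncio


async def pump(reader, writer, transform):
    """Copy bytes from reader to writer, passing each chunk through transform."""
    try:
        while data := await reader.read(4096):
            writer.write(transform(data))
            await writer.drain()
    finally:
        writer.close()


class ToyGate:
    """Hypothetical stand-in, not pytest_userver.chaos.TcpGate."""

    def __init__(self, server_host, server_port):
        self.server_host = server_host
        self.server_port = server_port
        self.to_server = lambda data: data  # pass-through by default
        self.to_client = lambda data: data

    async def handle_client(self, client_reader, client_writer):
        # Every accepted client gets its own fresh connection to the server.
        server_reader, server_writer = await asyncio.open_connection(
            self.server_host, self.server_port,
        )
        # The lambdas look up the handlers on every chunk, so a handler
        # changed in the middle of a connection takes effect immediately.
        await asyncio.gather(
            pump(client_reader, server_writer, lambda d: self.to_server(d)),
            pump(server_reader, client_writer, lambda d: self.to_client(d)),
        )


async def echo(reader, writer):
    """Tiny echo server that plays the Server role."""
    while data := await reader.read(4096):
        writer.write(data)
        await writer.drain()
    writer.close()


async def demo():
    server = await asyncio.start_server(echo, '127.0.0.1', 0)
    server_port = server.sockets[0].getsockname()[1]
    gate = ToyGate('127.0.0.1', server_port)
    proxy = await asyncio.start_server(gate.handle_client, '127.0.0.1', 0)
    proxy_port = proxy.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', proxy_port)
    writer.write(b'ping')
    await writer.drain()
    first = await reader.readexactly(4)

    # Alter only the Server -> Client direction; requests still pass intact.
    gate.to_client = lambda data: data.upper()
    writer.write(b'ping')
    await writer.drain()
    second = await reader.readexactly(4)

    writer.close()
    await writer.wait_closed()
    proxy.close()
    server.close()
    return first, second


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The first reply comes back unchanged, while the second one is altered on its way to the client only, which mirrors the split between the to_server and to_client method families.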
chaos.UdpGate
Use chaos.UdpGate when you work with the UDP protocol (for example, DNS). UdpGate forwards data between only one client and one server. If you wish to connect more than one Client to the Server, use a separate instance of UdpGate for every client.
Usage Sample
Import chaos to use chaos.TcpGate:
from pytest_userver import chaos
The proxy should usually be started before the service and should keep working between the service test runs. To achieve that, make a fixture with the session scope and add it to the service fixture dependencies.
An example of how to set up the proxy to stand between the service and a PostgreSQL database:
import pytest

# Import paths below assume the yandex-taxi-testsuite package layout.
from testsuite.databases.pgsql import connection
from testsuite.databases.pgsql import discover


@pytest.fixture(scope='session')
def pgsql_local(service_source_dir, pgsql_local_create):
    databases = discover.find_schemas(
        'pg', [service_source_dir.joinpath('schemas/postgresql')],
    )
    return pgsql_local_create(list(databases.values()))


@pytest.fixture(scope='session')
async def _gate_started(loop, pgsql_local):
    # Route from the gate to the real PostgreSQL instance.
    gate_config = chaos.GateRoute(
        name='postgres proxy',
        host_to_server=pgsql_local['key_value'].host,
        port_to_server=pgsql_local['key_value'].port,
    )
    async with chaos.TcpGate(gate_config, loop) as proxy:
        yield proxy


@pytest.fixture
def extra_client_deps(_gate_started):
    # Adds the gate to the service dependencies, so it starts first.
    pass


@pytest.fixture(name='userver_config_testsuite_support', scope='session')
def _userver_config_testsuite_support(userver_config_testsuite_support):
    def patch_config(config_yaml, config_vars):
        userver_config_testsuite_support(config_yaml, config_vars)
        components: dict = config_yaml['components_manager']['components']
        testsuite_support = components['testsuite-support']
        # Remove the testsuite PostgreSQL overrides from the config.
        testsuite_support.pop('testsuite-pg-execute-timeout')
        testsuite_support.pop('testsuite-pg-statement-timeout')
        testsuite_support.pop('testsuite-pg-readonly-master-expected')

    return patch_config


@pytest.fixture(scope='session')
def userver_pg_config(pgsql_local, _gate_started):
    def _hook_db_config(config_yaml, config_vars):
        # Point the service at the gate instead of the database itself.
        host, port = _gate_started.get_sockname_for_clients()

        db_info = pgsql_local['key_value']
        db_chaos_gate = connection.PgConnectionInfo(
            host=host,
            port=port,
            user=db_info.user,
            password=db_info.password,
            options=db_info.options,
            sslmode=db_info.sslmode,
            dbname=db_info.dbname,
        )

        components = config_yaml['components_manager']['components']
        db = components['key-value-database']
        db['dbconnection'] = db_chaos_gate.get_uri()

    return _hook_db_config
Between test runs, the proxy should be reset to its default state, in which connections are accepted and data passes from client to server and from server to client:
@pytest.fixture(name='gate')
async def _gate_ready(service_client, _gate_started):
    _gate_started.to_server_pass()
    _gate_started.to_client_pass()
    _gate_started.start_accepting()

    await _gate_started.wait_for_connections()
    yield _gate_started
After that, everything is ready to write chaos tests:
async def test_pg_congestion_control(service_client, gate):
    gate.to_server_close_on_data()
    gate.to_client_close_on_data()

    for _ in range(gate.connections_count()):
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert response.status == 500

    await _check_that_restores(service_client, gate)
Note that the most important part of the test is to make sure that the service restores its functionality after the failures. Touch the dead connections and make sure that the service recovers:
import asyncio
import logging

SELECT_URL = '/chaos/postgres?type=select'
MAX_POOL_SIZE = 1

logger = logging.getLogger(__name__)


async def consume_dead_db_connections(service_client):
    # Fire more requests than the pool holds to use up the dead connections.
    logger.debug('Starting "consume_dead_db_connections"')
    await asyncio.gather(
        *[service_client.get(SELECT_URL) for _ in range(MAX_POOL_SIZE * 2)],
    )
    logger.debug('End of "consume_dead_db_connections"')

    # After that, every connection in the pool should work again.
    logger.debug('Starting "consume_dead_db_connections" check for 200')
    results_list = await asyncio.gather(
        *[service_client.get(SELECT_URL) for _ in range(MAX_POOL_SIZE)],
    )
    for result in results_list:
        assert result.status_code == 200
    logger.debug('End of "consume_dead_db_connections" check for 200')


async def _check_that_restores(service_client, gate):
    # Switch the gate back to the failure-free mode.
    gate.to_server_pass()
    gate.to_client_pass()
    gate.start_accepting()

    # The helper is called through the `utils` module here; call it
    # directly if it is defined in the same file.
    await utils.consume_dead_db_connections(service_client)

    logger.debug('Starting "_check_that_restores" wait for 200')
    response = await service_client.get(SELECT_URL)
    assert response.status == 200, 'Bad results after connection restore'
    logger.debug('End of "_check_that_restores" wait for 200')
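The helper above sends a fixed number of requests to flush the dead connections. When the number of requests needed for recovery is not known in advance, a bounded polling loop is one possible alternative. The sketch below is only an illustration: `wait_until_ok` and `FlakyService` are hypothetical names, and the fake service stands in for a real `service_client`.

```python
import asyncio


async def wait_until_ok(make_request, attempts: int = 10, delay: float = 0.01) -> int:
    """Call make_request until it returns 200; return the attempts used."""
    for attempt in range(1, attempts + 1):
        status = await make_request()
        if status == 200:
            return attempt
        await asyncio.sleep(delay)
    raise AssertionError(f'service did not recover after {attempts} attempts')


class FlakyService:
    """Hypothetical service that fails a few times before it recovers."""

    def __init__(self, failures_before_recovery: int):
        self.remaining_failures = failures_before_recovery

    async def get(self) -> int:
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            return 500
        return 200


async def demo() -> int:
    service = FlakyService(failures_before_recovery=2)
    return await wait_until_ok(service.get)


if __name__ == '__main__':
    # Two failed requests, then the third one succeeds.
    assert asyncio.run(demo()) == 3
```

Bounding the number of attempts keeps a service that never recovers from hanging the test run; the test fails with a clear message instead.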
Full Sources