userver: Chaos Testing
Chaos Testing

A system cannot achieve high availability unless it tolerates failures. In microservices and other distributed network architectures, the hardware and the network themselves are sources of failures.

Luckily, userver provides a network proxy that simulates network errors and whose behavior can be controlled from functional tests. In other words, it is a tool for chaos testing.

To implement the tests, pytest_userver.chaos.TcpGate or pytest_userver.chaos.UdpGate is placed between the client and the server. The proxy can then be used to test the fault tolerance of both the client and the server. The following network conditions can be simulated:

| Network condition | Possible reasons for the behavior in production |
| --- | --- |
| Server doesn't accept new connections | Server is overloaded; network misconfigured |
| Client/server does not read data from socket | Client/server is overloaded; deadlock in client/server thread; OS lost the data on socket due to overload |
| Client/server reads data, but does not respond | Client/server is overloaded; deadlock in client/server thread |
| Client/server closes connections | Client/server restart; client/server is overloaded and shuts down connections |
| Client/server closes the socket when it receives data | Client/server congestion control in action |
| Client/server corrupts the response data | Stack overflow or other memory corruption at client/server side |
| Network works with send/receive delays | Poorly performing network; interference in the network |
| Limited bandwidth | Poorly performing network |
| Connections close on timeout | Timeout on client/server/network interactions from client/server provider |
| Data slicing | Interference in the network; poor router configuration |
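
To make two of the conditions above concrete, here is a minimal standalone sketch of what "data slicing" and "corrupted data" can mean at the byte level. The helpers slice_data and corrupt_data are hypothetical names invented for this illustration; they are not part of pytest_userver, and the real gates may implement these conditions differently.

```python
import random


def slice_data(data: bytes, max_size: int) -> list[bytes]:
    """Split a payload into parts of at most max_size bytes ("data slicing")."""
    if max_size <= 0:
        raise ValueError('max_size must be positive')
    return [data[i:i + max_size] for i in range(0, len(data), max_size)]


def corrupt_data(data: bytes, rng: random.Random) -> bytes:
    """Flip one random bit of a non-empty payload ("corrupted data")."""
    if not data:
        return data
    index = rng.randrange(len(data))
    bit = 1 << rng.randrange(8)
    # Rebuild the payload with exactly one byte changed.
    return data[:index] + bytes([data[index] ^ bit]) + data[index + 1:]
```

A receiver that reassembles the sliced parts gets the original payload back, while a corrupted payload keeps its length but differs in one bit; a fault-tolerant client or server is expected to cope with both.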

Keep in mind the following picture:

For client requests:

Client -> chaos.TcpGate -> Server

For server responses:

Client <- chaos.TcpGate <- Server

Gates allow you to simulate various network conditions on the client and server sides independently. For example, to corrupt data that goes from Client to Server, use gate.to_server_corrupt_data(). To corrupt data that goes from Server to Client, use gate.to_client_corrupt_data().

To restore the normal, failure-free behavior, use gate.to_client_pass() and gate.to_server_pass() respectively.
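
The idea of independent per-direction behavior can be sketched with plain asyncio. ToyGate, passthrough, corrupt, echo and tcp_demo below are hypothetical names used for illustration only and do not reflect how chaos.TcpGate is implemented; the sketch only shows that the Client -> Server and Server -> Client directions can be switched separately.

```python
import asyncio


def passthrough(data: bytes) -> bytes:
    return data


def corrupt(data: bytes) -> bytes:
    # Invert every byte so that the corruption is easy to spot.
    return bytes(b ^ 0xFF for b in data)


class ToyGate:
    """Hypothetical stand-in for a gate; not the pytest_userver implementation."""

    def __init__(self, server_host: str, server_port: int) -> None:
        self._server = (server_host, server_port)
        self.to_server = passthrough  # applied to Client -> Server data
        self.to_client = passthrough  # applied to Server -> Client data

    async def _pump(self, reader, writer, direction: str) -> None:
        try:
            while data := await reader.read(4096):
                # Look the filter up per chunk so a test can switch it any time.
                writer.write(getattr(self, direction)(data))
                await writer.drain()
        finally:
            writer.close()

    async def handle(self, client_reader, client_writer) -> None:
        server_reader, server_writer = await asyncio.open_connection(*self._server)
        await asyncio.gather(
            self._pump(client_reader, server_writer, 'to_server'),
            self._pump(server_reader, client_writer, 'to_client'),
        )


async def echo(reader, writer) -> None:
    while data := await reader.read(4096):
        writer.write(data)
        await writer.drain()
    writer.close()


async def tcp_demo() -> tuple[bytes, bytes]:
    server = await asyncio.start_server(echo, '127.0.0.1', 0)
    gate = ToyGate('127.0.0.1', server.sockets[0].getsockname()[1])
    proxy = await asyncio.start_server(gate.handle, '127.0.0.1', 0)
    proxy_port = proxy.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', proxy_port)
    writer.write(b'ping')
    await writer.drain()
    clean = await reader.readexactly(4)

    gate.to_client = corrupt  # break only the Server -> Client direction
    writer.write(b'ping')
    await writer.drain()
    corrupted = await reader.readexactly(4)

    writer.close()
    await writer.wait_closed()
    proxy.close()
    server.close()
    return clean, corrupted
```

Running asyncio.run(tcp_demo()) returns the echoed payload once unchanged and once with every byte inverted, while the request direction stays intact throughout.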

chaos.TcpGate

Use chaos.TcpGate when you need to deal with messages over the TCP protocol (for example, HTTP). This gate supports multiple connections from different clients to one server. TcpGate accepts incoming connections, and each accepted connection triggers the spawning of a new client connected to the server.

chaos.UdpGate

Use chaos.UdpGate when you work with the UDP protocol (for example, DNS). UdpGate passes data between only one client and one server. To connect more than one client to the server, use a separate UdpGate instance for every client.
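
One way to picture the one-client limitation: UDP has no connections, so a relay has to remember where server replies should go. The sketch below assumes a relay that keeps a single return address; this is an illustrative assumption, and the actual chaos.UdpGate internals may differ. ToyUdpRelay, UpperEcho, OneShotClient and udp_demo are hypothetical names.

```python
import asyncio


class ToyUdpRelay(asyncio.DatagramProtocol):
    """Hypothetical one-client UDP relay; not the pytest_userver implementation."""

    def __init__(self, server_addr) -> None:
        self._server_addr = server_addr
        self._client_addr = None  # only one return address is remembered
        self._transport = None

    def connection_made(self, transport) -> None:
        self._transport = transport

    def datagram_received(self, data, addr) -> None:
        if addr == self._server_addr:
            # Reply from the server: forward it to the only known client.
            if self._client_addr is not None:
                self._transport.sendto(data, self._client_addr)
        else:
            # Datagram from a client: remember the sender, forward to the server.
            self._client_addr = addr
            self._transport.sendto(data, self._server_addr)


class UpperEcho(asyncio.DatagramProtocol):
    """Test server that answers every datagram with its upper-cased copy."""

    def connection_made(self, transport) -> None:
        self._transport = transport

    def datagram_received(self, data, addr) -> None:
        self._transport.sendto(data.upper(), addr)


class OneShotClient(asyncio.DatagramProtocol):
    """Test client that stores the first reply in a future."""

    def __init__(self) -> None:
        self.reply = asyncio.get_running_loop().create_future()

    def datagram_received(self, data, addr) -> None:
        if not self.reply.done():
            self.reply.set_result(data)


async def udp_demo() -> bytes:
    loop = asyncio.get_running_loop()
    server, _ = await loop.create_datagram_endpoint(
        UpperEcho, local_addr=('127.0.0.1', 0),
    )
    server_addr = server.get_extra_info('sockname')
    relay, _ = await loop.create_datagram_endpoint(
        lambda: ToyUdpRelay(server_addr), local_addr=('127.0.0.1', 0),
    )
    client, proto = await loop.create_datagram_endpoint(
        OneShotClient, remote_addr=relay.get_extra_info('sockname'),
    )
    client.sendto(b'query')
    try:
        return await asyncio.wait_for(proto.reply, timeout=5)
    finally:
        for transport in (client, relay, server):
            transport.close()
```

With a second client, this toy relay would overwrite the remembered address and could misroute replies, which is why one relay instance per client is the simpler arrangement.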

Usage Sample

Import chaos to use chaos.TcpGate:

from pytest_userver import chaos

The proxy should usually be started before the service and should keep working between the service test runs. To achieve that, make a fixture with the session scope and add it to the service fixture dependencies.

An example of how to set up the proxy to stand between the service and a PostgreSQL database:

import pytest

# chaos is imported as shown above; discover and connection come from testsuite.
from testsuite.databases.pgsql import connection
from testsuite.databases.pgsql import discover


@pytest.fixture(name='pgsql_local', scope='session')
def _pgsql_local(service_source_dir, pgsql_local_create):
    databases = discover.find_schemas(
        'pg', [service_source_dir.joinpath('schemas/postgresql')],
    )
    return pgsql_local_create(list(databases.values()))


# Start the proxy once per session, in front of the database.
@pytest.fixture(scope='session')
async def _gate_started(loop, pgsql_local):
    gate_config = chaos.GateRoute(
        name='postgres proxy',
        host_to_server=pgsql_local['key_value'].host,
        port_to_server=pgsql_local['key_value'].port,
    )
    async with chaos.TcpGate(gate_config, loop) as proxy:
        yield proxy


# Make the service client depend on the gate.
@pytest.fixture
def extra_client_deps(_gate_started):
    pass


# Override the testsuite-support PostgreSQL settings for the chaos tests.
@pytest.fixture(name='userver_config_testsuite', scope='session')
def _userver_config_testsuite(userver_config_testsuite):
    def patch_config(config_yaml, config_vars):
        userver_config_testsuite(config_yaml, config_vars)
        components: dict = config_yaml['components_manager']['components']
        testsuite_support = components['testsuite-support']
        testsuite_support['testsuite-pg-execute-timeout'] = '0ms'
        testsuite_support['testsuite-pg-statement-timeout'] = '0ms'
        testsuite_support.pop('testsuite-pg-readonly-master-expected')

    return patch_config


# Point the service's database connection at the gate instead of PostgreSQL.
@pytest.fixture(scope='session')
def userver_pg_config(pgsql_local, _gate_started):
    def _hook_db_config(config_yaml, config_vars):
        host, port = _gate_started.get_sockname_for_clients()

        db_info = pgsql_local['key_value']
        db_chaos_gate = connection.PgConnectionInfo(
            host=host,
            port=port,
            user=db_info.user,
            password=db_info.password,
            options=db_info.options,
            sslmode=db_info.sslmode,
            dbname=db_info.dbname,
        )

        components = config_yaml['components_manager']['components']
        db = components['key-value-database']
        db['dbconnection'] = db_chaos_gate.get_uri()

    return _hook_db_config

The proxy should be reset to its default state between test runs, that is, to the state in which connections are accepted and data passes from client to server and from server to client:

@pytest.fixture(name='gate')
async def _gate_ready(service_client, _gate_started):
    _gate_started.to_server_pass()
    _gate_started.to_client_pass()
    _gate_started.start_accepting()

    await _gate_started.wait_for_connections()
    yield _gate_started

After that, everything is ready to write chaos tests:

async def test_pg_congestion_control(service_client, gate):
    gate.to_server_close_on_data()
    gate.to_client_close_on_data()

    for _ in range(gate.connections_count()):
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert response.status == 500

    await _check_that_restores(service_client, gate)

Note that the most important part of the test is to make sure that the service restores its functionality after the failures. Touch the dead connections and make sure that the service recovers:

import asyncio
import logging

SELECT_URL = '/chaos/postgres?type=select'
MAX_POOL_SIZE = 1  # should be in sync with ./static_config.yaml

logger = logging.getLogger(__name__)


async def consume_dead_db_connections(service_client):
    # Issue more requests than the pool holds so every dead connection is touched.
    logger.debug('Starting "consume_dead_db_connections"')
    await asyncio.gather(*[
        service_client.get(SELECT_URL) for _ in range(MAX_POOL_SIZE * 2)
    ])
    logger.debug('End of "consume_dead_db_connections"')

    logger.debug('Starting "consume_dead_db_connections" check for 200')
    results_list = await asyncio.gather(*[
        service_client.get(SELECT_URL) for _ in range(MAX_POOL_SIZE)
    ])
    for result in results_list:
        assert result.status_code == 200
    logger.debug('End of "consume_dead_db_connections" check for 200')


# The call through utils below implies that the helper above lives in a
# separate utils module imported by the test file.
async def _check_that_restores(service_client, gate):
    gate.to_server_pass()
    gate.to_client_pass()
    gate.start_accepting()

    await utils.consume_dead_db_connections(service_client)

    logger.debug('Starting "_check_that_restores" wait for 200')
    response = await service_client.get(SELECT_URL)
    assert response.status == 200, 'Bad results after connection restore'
    logger.debug('End of "_check_that_restores" wait for 200')
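
As a complement to the helpers above, recovery can also be checked by polling until the service answers successfully. The sketch below is an illustration under stated assumptions, not part of userver: wait_until_ok and make_flaky_probe are hypothetical names, and the probe stands in for something like a request through service_client that returns the response status.

```python
import asyncio


async def wait_until_ok(probe, attempts: int = 10, delay: float = 0.1) -> int:
    """Call the async probe until it returns 200; return the number of calls."""
    for attempt in range(1, attempts + 1):
        if await probe() == 200:
            return attempt
        await asyncio.sleep(delay)
    raise AssertionError(f'service did not recover after {attempts} attempts')


def make_flaky_probe(failures: int):
    """Build a fake probe that returns 500 `failures` times and then 200."""
    remaining = failures

    async def probe() -> int:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            return 500
        return 200

    return probe
```

Bounding the number of attempts keeps a test from hanging forever when the service never recovers, and the returned attempt count can be logged to spot slow recoveries.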

Full Sources