userver: postgresql/functional_tests/basic_chaos/utils.py
Loading...
Searching...
No Matches
postgresql/functional_tests/basic_chaos/utils.py
1# /// [consume_dead_db_connections]
2import asyncio
3import logging
4
# Handler path (with query string) requested by the helpers in this module.
SELECT_URL: str = '/chaos/postgres?type=select'

# Keep this value in sync with ./static_config.yaml.
MAX_POOL_SIZE: int = 1


# Module-level logger named after this module.
logger: logging.Logger = logging.getLogger(__name__)
10
11
async def consume_dead_db_connections(service_client):
    """Send two rounds of SELECT requests and require the second to succeed.

    The first round issues ``MAX_POOL_SIZE * 2`` concurrent GET requests to
    ``SELECT_URL`` and discards the responses; judging by the function name,
    this is meant to use up connections that died during a chaos scenario
    (NOTE(review): inferred from the name -- confirm against the callers).
    The second round issues ``MAX_POOL_SIZE`` concurrent requests and asserts
    that every response has status code 200.

    Args:
        service_client: object exposing an awaitable ``get(url)`` whose
            result has a ``status_code`` attribute.

    Raises:
        AssertionError: if any response of the second round is not 200.
    """

    def _select_batch(count):
        # Start `count` SELECT requests and combine them into one awaitable.
        pending = [service_client.get(SELECT_URL) for _ in range(count)]
        return asyncio.gather(*pending)

    # Round 1: responses are intentionally ignored.
    logger.debug('Starting "consume_dead_db_connections"')
    await _select_batch(MAX_POOL_SIZE * 2)
    logger.debug('End of "consume_dead_db_connections"')

    # Round 2: every request must now be answered with HTTP 200.
    logger.debug('Starting "consume_dead_db_connections" check for 200')
    responses = await _select_batch(MAX_POOL_SIZE)
    for response in responses:
        assert response.status_code == 200
    logger.debug('End of "consume_dead_db_connections" check for 200')
26 # /// [consume_dead_db_connections]