1import asyncio
    2import logging
    3import socket
    4 
    5import pytest
    6from pytest_userver import chaos
    7 
    8import utils
    9 
   10 
# Value handed to gate.to_server_delay / gate.to_client_delay
# (presumably seconds -- TODO confirm against pytest_userver.chaos).
DATA_TRANSMISSION_DELAY = 1
# Value handed to gate.to_server_limit_bps / gate.to_client_limit_bps.
BYTES_PER_SECOND_LIMIT = 10
# First and second arguments of gate.to_*_limit_time, in this order
# (presumably seconds -- TODO confirm).
CONNECTION_TIME_LIMIT = 0.4
CONNECTION_LIMIT_JITTER = 0.004
# Upper bound on requests issued by the retry loops that wait for a
# non-200 answer.
FAILURE_RETRIES = 250
# Value handed to gate.to_server_smaller_parts / gate.to_client_smaller_parts.
DATA_PARTS_MAX_SIZE = 40
# Value handed to gate.to_server_limit_bytes / gate.to_client_limit_bytes
# by the test_network_limit_bytes* tests.
BYTES_TRANSMISSION_LIMIT = 1024

# Service handler URLs requested through service_client.get().
SELECT_URL = '/chaos/postgres?type=select'
SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'


# Module-level logger used for the debug markers around slow checks.
logger = logging.getLogger(__name__)
   24 
   25 
   26
   27async def _check_that_restores(service_client, gate):
   28    gate.to_server_pass()
   29    gate.to_client_pass()
   30    gate.start_accepting()
   31 
   32    await utils.consume_dead_db_connections(service_client)
   33 
   34    logger.debug('Starting "_check_that_restores" wait for 200')
   35    response = await service_client.get(SELECT_URL)
   36    assert response.status == 200, 'Bad results after connection restore'
   37    logger.debug('End of "_check_that_restores" wait for 200')
   38    
   39 
   40 
   41async def test_pg_fine(service_client, gate):
   42    response = await service_client.get(SELECT_URL)
   43    assert response.status == 200
   44 
   45 
   46async def test_pg_overload_no_accepts(service_client, gate):
   47    
   48    response = await service_client.get(SELECT_URL)
   49    assert response.status == 200
   50 
   51    await gate.stop_accepting()
   52 
   53    
   54    response = await service_client.get(SELECT_URL)
   55    assert response.status == 200
   56 
   57 
   58
   59async def test_pg_congestion_control(service_client, gate):
   60    gate.to_server_close_on_data()
   61    gate.to_client_close_on_data()
   62 
   63    for _ in range(gate.connections_count()):
   64        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
   65        assert response.status == 500
   66 
   67    await _check_that_restores(service_client, gate)
   68    
   69 
   70 
   71async def test_close_to_client_limit(service_client, gate):
   72    for i in range(100, 250, 50):
   73        gate.to_client_limit_bytes(i)
   74        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
   75        assert response.status == 500, i
   76 
   77    gate.to_client_pass()
   78    await _check_that_restores(service_client, gate)
   79 
   80 
   81async def test_close_to_server_limit(service_client, gate):
   82    for i in range(100, 250, 50):
   83        gate.to_server_limit_bytes(i)
   84        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
   85        assert response.status == 500
   86 
   87    gate.to_server_pass()
   88    await _check_that_restores(service_client, gate)
   89 
   90 
   91@pytest.mark.skip(
   92    reason='Rarely breaks the server, and corrupted data can still be valid',
   93)
   94async def test_pg_corrupted_response(service_client, gate):
   95    gate.to_client_corrupt_data()
   96 
   97    for _ in range(gate.connections_count()):
   98        response = await service_client.get(SELECT_URL)
   99        assert response.status == 500
  100 
  101    await _check_that_restores(service_client, gate)
  102 
  103 
  104@pytest.mark.skip(reason='response.status == 200')
  105async def test_network_delay_sends(service_client, gate):
  106    gate.to_server_delay(DATA_TRANSMISSION_DELAY)
  107 
  108    logger.debug('Starting "test_network_delay_sends" check for 500')
  109    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  110    assert response.status == 500
  111    logger.debug('End of "test_network_delay_sends" check for 500')
  112 
  113    await _check_that_restores(service_client, gate)
  114 
  115 
  116@pytest.mark.skip(reason='response.status == 200')
  117async def test_network_delay_recv(service_client, gate):
  118    gate.to_client_delay(DATA_TRANSMISSION_DELAY)
  119 
  120    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  121    assert response.status == 500
  122 
  123    await _check_that_restores(service_client, gate)
  124 
  125 
  126async def test_network_delay(service_client, gate):
  127    gate.to_server_delay(DATA_TRANSMISSION_DELAY)
  128    gate.to_client_delay(DATA_TRANSMISSION_DELAY)
  129 
  130    logger.debug('Starting "test_network_delay" check for 500')
  131    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  132    assert response.status == 500
  133    logger.debug('End of "test_network_delay" check for 500')
  134 
  135    await _check_that_restores(service_client, gate)
  136 
  137 
  138async def test_network_limit_bps_sends(service_client, gate):
  139    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
  140 
  141    logger.debug('Starting "test_network_limit_bps_sends" check for 500')
  142    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  143    assert response.status == 500
  144    logger.debug('End of "test_network_limit_bps_sends" check for 500')
  145 
  146    await _check_that_restores(service_client, gate)
  147 
  148 
  149async def test_network_limit_bps_recv(service_client, gate):
  150    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
  151 
  152    logger.debug('Starting "test_network_limit_bps_recv" check for 500')
  153    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  154    assert response.status == 500
  155    logger.debug('End of "test_network_limit_bps_recv" check for 500')
  156 
  157    await _check_that_restores(service_client, gate)
  158 
  159 
  160async def test_network_limit_bps(service_client, gate):
  161    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
  162    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
  163 
  164    logger.debug('Starting "test_network_limit_bps" check for 500')
  165    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  166    assert response.status == 500
  167    logger.debug('End of "test_network_limit_bps" check for 500')
  168 
  169    await _check_that_restores(service_client, gate)
  170 
  171 
  172@pytest.mark.skip(reason='flacky')
  173async def test_network_limit_time_sends(service_client, gate):
  174    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  175 
  176    logger.debug('Starting "test_network_limit_time_sends" check for 500')
  177    got_error = False
  178    for _ in range(FAILURE_RETRIES):
  179        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  180        if response.status != 200:
  181            got_error = True
  182            break
  183    logger.debug('End of "test_network_limit_time_sends" check for 500')
  184 
  185    assert got_error, 'Previous steps unexpectedly finished with success'
  186 
  187    await _check_that_restores(service_client, gate)
  188 
  189 
  190@pytest.mark.skip(reason='flacky')
  191async def test_network_limit_time_recv(service_client, gate):
  192    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  193 
  194    logger.debug('Starting "test_network_limit_time_recv" check for 500')
  195    got_error = False
  196    for _ in range(FAILURE_RETRIES):
  197        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  198        if response.status != 200:
  199            got_error = True
  200            break
  201    logger.debug('End of "test_network_limit_time_recv" check for 500')
  202 
  203    assert got_error, 'Previous steps unexpectedly finished with success'
  204 
  205    await _check_that_restores(service_client, gate)
  206 
  207 
  208@pytest.mark.skip(reason='flacky')
  209async def test_network_limit_time(service_client, gate):
  210    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  211    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  212 
  213    logger.debug('Starting "test_network_limit_time" check for 500')
  214    got_error = False
  215    for _ in range(FAILURE_RETRIES):
  216        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  217        if response.status != 200:
  218            got_error = True
  219            break
  220    logger.debug('End of "test_network_limit_time" check for 500')
  221 
  222    assert got_error, 'Previous steps unexpectedly finished with success'
  223 
  224    await _check_that_restores(service_client, gate)
  225 
  226 
  227async def test_network_smaller_parts_sends(service_client, gate):
  228    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
  229 
  230    response = await service_client.get(SELECT_URL)
  231    assert response.status == 200
  232 
  233 
  234async def test_network_smaller_parts_recv(service_client, gate):
  235    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
  236 
  237    response = await service_client.get(SELECT_URL)
  238    assert response.status == 200
  239 
  240 
  241async def test_network_smaller_parts(service_client, gate):
  242    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
  243    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
  244 
  245    response = await service_client.get(SELECT_URL)
  246    assert response.status == 200
  247 
  248 
  249
  250async def test_network_limit_bytes_sends(service_client, gate):
  251    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  252 
  253    logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
  254    got_error = False
  255    for _ in range(FAILURE_RETRIES):
  256        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  257        if response.status != 200:
  258            got_error = True
  259            break
  260    logger.debug('End of "test_network_limit_bytes_sends" check for 500')
  261 
  262    assert got_error, 'Previous steps unexpectedly finished with success'
  263 
  264    await _check_that_restores(service_client, gate)
  265 
  266 
  267async def test_network_limit_bytes_recv(service_client, gate):
  268    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  269 
  270    logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
  271    got_error = False
  272    for _ in range(FAILURE_RETRIES):
  273        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  274        if response.status != 200:
  275            got_error = True
  276            break
  277    logger.debug('End of "test_network_limit_bytes_recv" check for 500')
  278 
  279    assert got_error, 'Previous steps unexpectedly finished with success'
  280 
  281    await _check_that_restores(service_client, gate)
  282 
  283 
  284async def test_network_limit_bytes(service_client, gate):
  285    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  286    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  287 
  288    logger.debug('Starting "test_network_limit_bytes" check for 500')
  289    got_error = False
  290    for _ in range(FAILURE_RETRIES):
  291        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  292        if response.status != 200:
  293            got_error = True
  294            break
  295    logger.debug('End of "test_network_limit_bytes" check for 500')
  296 
  297    assert got_error, 'Previous steps unexpectedly finished with success'
  298 
  299    await _check_that_restores(service_client, gate)
  300 
  301 
  302async def _intercept_server_terminated(
  303        loop, socket_from: socket.socket, socket_to: socket.socket,
  304) -> None:
  305    error_msg = (
  306        b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
  307        b'Mterminating connection due to administrator command\x00'
  308        b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
  309    )
  310    ready_for_query = b'Z\x00\x00\x00\x05'
  311 
  312    
  313    
  314    
  315    data = b''
  316    n_bytes = -1
  317    while n_bytes < 0:
  318        data += await loop.sock_recv(socket_from, 4096)
  319        n_bytes = data.find(ready_for_query)
  320    await loop.sock_sendall(socket_to, data[:n_bytes])
  321    await loop.sock_sendall(socket_to, error_msg)
  322    raise chaos.GateInterceptException('Closing socket after error')
  323 
  324 
  325async def test_close_with_error(service_client, gate, testpoint):
  326    should_close = False
  327 
  328    @testpoint('after_trx_begin')
  329    async def _hook(_data):
  330        if should_close:
  331            gate.set_to_client_interceptor(_intercept_server_terminated)
  332 
  333    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  334    assert response.status == 200
  335 
  336    should_close = True
  337    gate.set_to_client_interceptor(_intercept_server_terminated)
  338 
  339    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  340    assert response.status == 500
  341 
  342    should_close = False
  343    await _check_that_restores(service_client, gate)
  344 
  345 
  346@pytest.mark.config(
  347    POSTGRES_CONNECTION_POOL_SETTINGS={
  348        '__default__': {'max_pool_size': 1, 'min_pool_size': 1},
  349    },
  350)
  351async def test_prepared_statement_already_exists(
  352        service_client, gate, testpoint,
  353):
  354    first = {1: True}
  355 
  356    @testpoint('after_trx_begin')
  357    async def _hook(_data):
  358        if first[1]:
  359            gate.to_client_delay(0.9)
  360        first[1] = False
  361 
  362    @testpoint('pg_cleanup')
  363    async def pg_cleanup_hook(_data):
  364        gate.to_client_pass()
  365 
  366    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  367    assert response.status == 500
  368 
  369    logger.debug('after slow select')
  370 
  371    gate.to_client_pass()
  372    await asyncio.sleep(2)
  373 
  374    logger.debug('after sleep')
  375 
  376    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  377    assert response.status == 500
  378 
  379    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  380    assert response.status == 200
  381 
  382    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  383    assert response.status == 200