1import logging
    2import socket
    3 
    4import pytest
    5 
    6from pytest_userver import chaos
    7 
    8import utils
    9 
   10 
   11DATA_TRANSMISSION_DELAY = 1
   12BYTES_PER_SECOND_LIMIT = 10
   13CONNECTION_TIME_LIMIT = 0.4
   14CONNECTION_LIMIT_JITTER = 0.004
   15FAILURE_RETRIES = 250
   16DATA_PARTS_MAX_SIZE = 40
   17BYTES_TRANSMISSION_LIMIT = 1024
   18 
   19SELECT_URL = '/chaos/postgres?type=select'
   20SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'
   21 
   22 
   23logger = logging.getLogger(__name__)
   24 
   25 
   26
   27async def _check_that_restores(service_client, gate):
   28    gate.to_server_pass()
   29    gate.to_client_pass()
   30    gate.start_accepting()
   31 
   32    await utils.consume_dead_db_connections(service_client)
   33 
   34    logger.debug('Starting "_check_that_restores" wait for 200')
   35    response = await service_client.get(SELECT_URL)
   36    assert response.status == 200, 'Bad results after connection restore'
   37    logger.debug('End of "_check_that_restores" wait for 200')
   38    
   39 
   40 
   41async def test_pg_fine(service_client, gate):
   42    response = await service_client.get(SELECT_URL)
   43    assert response.status == 200
   44 
   45 
   46async def test_pg_overload_no_accepts(service_client, gate):
   47    
   48    response = await service_client.get(SELECT_URL)
   49    assert response.status == 200
   50 
   51    await gate.stop_accepting()
   52 
   53    
   54    response = await service_client.get(SELECT_URL)
   55    assert response.status == 200
   56 
   57 
   58
   59async def test_pg_congestion_control(service_client, gate):
   60    gate.to_server_close_on_data()
   61    gate.to_client_close_on_data()
   62 
   63    for _ in range(gate.connections_count()):
   64        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
   65        assert response.status == 500
   66 
   67    await _check_that_restores(service_client, gate)
   68    
   69 
   70 
   71async def test_close_to_client_limit(service_client, gate):
   72    for i in range(100, 250, 50):
   73        gate.to_client_limit_bytes(i)
   74        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
   75        assert response.status == 500, i
   76 
   77    gate.to_client_pass()
   78    await _check_that_restores(service_client, gate)
   79 
   80 
   81async def test_close_to_server_limit(service_client, gate):
   82    for i in range(100, 250, 50):
   83        gate.to_server_limit_bytes(i)
   84        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
   85        assert response.status == 500
   86 
   87    gate.to_server_pass()
   88    await _check_that_restores(service_client, gate)
   89 
   90 
   91@pytest.mark.skip(
   92    reason='Rarely breaks the server, and corrupted data can still be valid',
   93)
   94async def test_pg_corrupted_response(service_client, gate):
   95    gate.to_client_corrupt_data()
   96 
   97    for _ in range(gate.connections_count()):
   98        response = await service_client.get(SELECT_URL)
   99        assert response.status == 500
  100 
  101    await _check_that_restores(service_client, gate)
  102 
  103 
  104@pytest.mark.skip(reason='response.status == 200')
  105async def test_network_delay_sends(service_client, gate):
  106    gate.to_server_delay(DATA_TRANSMISSION_DELAY)
  107 
  108    logger.debug('Starting "test_network_delay_sends" check for 500')
  109    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  110    assert response.status == 500
  111    logger.debug('End of "test_network_delay_sends" check for 500')
  112 
  113    await _check_that_restores(service_client, gate)
  114 
  115 
  116@pytest.mark.skip(reason='response.status == 200')
  117async def test_network_delay_recv(service_client, gate):
  118    gate.to_client_delay(DATA_TRANSMISSION_DELAY)
  119 
  120    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  121    assert response.status == 500
  122 
  123    await _check_that_restores(service_client, gate)
  124 
  125 
  126async def test_network_delay(service_client, gate):
  127    gate.to_server_delay(DATA_TRANSMISSION_DELAY)
  128    gate.to_client_delay(DATA_TRANSMISSION_DELAY)
  129 
  130    logger.debug('Starting "test_network_delay" check for 500')
  131    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  132    assert response.status == 500
  133    logger.debug('End of "test_network_delay" check for 500')
  134 
  135    await _check_that_restores(service_client, gate)
  136 
  137 
  138async def test_network_limit_bps_sends(service_client, gate):
  139    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
  140 
  141    logger.debug('Starting "test_network_limit_bps_sends" check for 500')
  142    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  143    assert response.status == 500
  144    logger.debug('End of "test_network_limit_bps_sends" check for 500')
  145 
  146    await _check_that_restores(service_client, gate)
  147 
  148 
  149async def test_network_limit_bps_recv(service_client, gate):
  150    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
  151 
  152    logger.debug('Starting "test_network_limit_bps_recv" check for 500')
  153    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  154    assert response.status == 500
  155    logger.debug('End of "test_network_limit_bps_recv" check for 500')
  156 
  157    await _check_that_restores(service_client, gate)
  158 
  159 
  160async def test_network_limit_bps(service_client, gate):
  161    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
  162    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
  163 
  164    logger.debug('Starting "test_network_limit_bps" check for 500')
  165    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  166    assert response.status == 500
  167    logger.debug('End of "test_network_limit_bps" check for 500')
  168 
  169    await _check_that_restores(service_client, gate)
  170 
  171 
  172async def test_network_limit_time_sends(service_client, gate):
  173    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  174 
  175    logger.debug('Starting "test_network_limit_time_sends" check for 500')
  176    got_error = False
  177    for _ in range(FAILURE_RETRIES):
  178        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  179        if response.status != 200:
  180            got_error = True
  181            break
  182    logger.debug('End of "test_network_limit_time_sends" check for 500')
  183 
  184    assert got_error, 'Previous steps unexpectedly finished with success'
  185 
  186    await _check_that_restores(service_client, gate)
  187 
  188 
  189async def test_network_limit_time_recv(service_client, gate):
  190    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  191 
  192    logger.debug('Starting "test_network_limit_time_recv" check for 500')
  193    got_error = False
  194    for _ in range(FAILURE_RETRIES):
  195        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  196        if response.status != 200:
  197            got_error = True
  198            break
  199    logger.debug('End of "test_network_limit_time_recv" check for 500')
  200 
  201    assert got_error, 'Previous steps unexpectedly finished with success'
  202 
  203    await _check_that_restores(service_client, gate)
  204 
  205 
  206async def test_network_limit_time(service_client, gate):
  207    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  208    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
  209 
  210    logger.debug('Starting "test_network_limit_time" check for 500')
  211    got_error = False
  212    for _ in range(FAILURE_RETRIES):
  213        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  214        if response.status != 200:
  215            got_error = True
  216            break
  217    logger.debug('End of "test_network_limit_time" check for 500')
  218 
  219    assert got_error, 'Previous steps unexpectedly finished with success'
  220 
  221    await _check_that_restores(service_client, gate)
  222 
  223 
  224async def test_network_smaller_parts_sends(service_client, gate):
  225    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
  226 
  227    response = await service_client.get(SELECT_URL)
  228    assert response.status == 200
  229 
  230 
  231async def test_network_smaller_parts_recv(service_client, gate):
  232    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
  233 
  234    response = await service_client.get(SELECT_URL)
  235    assert response.status == 200
  236 
  237 
  238async def test_network_smaller_parts(service_client, gate):
  239    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
  240    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
  241 
  242    response = await service_client.get(SELECT_URL)
  243    assert response.status == 200
  244 
  245 
  246
  247async def test_network_limit_bytes_sends(service_client, gate):
  248    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  249 
  250    logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
  251    got_error = False
  252    for _ in range(FAILURE_RETRIES):
  253        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  254        if response.status != 200:
  255            got_error = True
  256            break
  257    logger.debug('End of "test_network_limit_bytes_sends" check for 500')
  258 
  259    assert got_error, 'Previous steps unexpectedly finished with success'
  260 
  261    await _check_that_restores(service_client, gate)
  262 
  263 
  264async def test_network_limit_bytes_recv(service_client, gate):
  265    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  266 
  267    logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
  268    got_error = False
  269    for _ in range(FAILURE_RETRIES):
  270        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  271        if response.status != 200:
  272            got_error = True
  273            break
  274    logger.debug('End of "test_network_limit_bytes_recv" check for 500')
  275 
  276    assert got_error, 'Previous steps unexpectedly finished with success'
  277 
  278    await _check_that_restores(service_client, gate)
  279 
  280 
  281async def test_network_limit_bytes(service_client, gate):
  282    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  283    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
  284 
  285    logger.debug('Starting "test_network_limit_bytes" check for 500')
  286    got_error = False
  287    for _ in range(FAILURE_RETRIES):
  288        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  289        if response.status != 200:
  290            got_error = True
  291            break
  292    logger.debug('End of "test_network_limit_bytes" check for 500')
  293 
  294    assert got_error, 'Previous steps unexpectedly finished with success'
  295 
  296    await _check_that_restores(service_client, gate)
  297 
  298 
  299async def _intercept_server_terminated(
  300        loop, socket_from: socket.socket, socket_to: socket.socket,
  301) -> None:
  302    error_msg = (
  303        b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
  304        b'Mterminating connection due to administrator command\x00'
  305        b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
  306    )
  307    ready_for_query = b'Z\x00\x00\x00\x05'
  308 
  309    
  310    
  311    
  312    data = b''
  313    n = -1
  314    while n < 0:
  315        data += await loop.sock_recv(socket_from, 4096)
  316        n = data.find(ready_for_query)
  317    await loop.sock_sendall(socket_to, data[:n])
  318    await loop.sock_sendall(socket_to, error_msg)
  319    raise chaos.GateInterceptException('Closing socket after error')
  320 
  321 
  322async def test_close_with_error(service_client, gate, testpoint):
  323    should_close = False
  324 
  325    @testpoint('after_trx_begin')
  326    async def _hook(_data):
  327        if should_close:
  328            gate.set_to_client_interceptor(_intercept_server_terminated)
  329 
  330    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  331    assert response.status == 200
  332 
  333    should_close = True
  334    gate.set_to_client_interceptor(_intercept_server_terminated)
  335 
  336    response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
  337    assert response.status == 500
  338 
  339    should_close = False
  340    await _check_that_restores(service_client, gate)