1
    2import asyncio
    3import socket
    4 
    5import pytest
    6from pytest_userver import chaos
    7 
    8DATA = (
    9    b'Once upon a midnight dreary, while I pondered, weak and weary, ',
   10    b'Over many a quaint and curious volume of forgotten lore - ',
   11    b'While I nodded, nearly napping, suddenly there came a tapping, ',
   12    b'As of some one gently rapping, rapping at my chamber door - ',
   13    b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
   14    b'Only this and nothing more."',
   15)
   16DATA_LENGTH = sum(len(x) for x in DATA)
   17 
   18 
   19
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
    """Provide the ``monitor_port`` fixture value: the ``service_port`` value.

    Session-scoped, so the value is resolved once per test session.
    """
    # NOTE(review): presumably this overrides a testsuite default so that
    # monitor requests go to the main service port - confirm.
    return service_port
   23 
   24 
   25async def send_all_data(sock, loop):
   26    for data in DATA:
   27        await loop.sock_sendall(sock, data)
   28 
   29 
   30async def recv_all_data(sock, loop):
   31    answer = b''
   32    while len(answer) < DATA_LENGTH:
   33        answer += await loop.sock_recv(sock, DATA_LENGTH - len(answer))
   34 
   35    assert answer == b''.join(DATA)
   36 
   37 
   38async def test_basic(service_client, loop, monitor_client, tcp_service_port):
   39    await service_client.reset_metrics()
   40 
   41    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   42    await loop.sock_connect(sock, ('localhost', tcp_service_port))
   43 
   44    send_task = asyncio.create_task(send_all_data(sock, loop))
   45    await recv_all_data(sock, loop)
   46    await send_task
   47    metrics = await monitor_client.metrics(prefix='tcp-echo.')
   48    assert metrics.value_at('tcp-echo.sockets.opened') == 1
   49    assert metrics.value_at('tcp-echo.sockets.closed') == 0
   50    assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
   51    
   52 
   53 
   54@pytest.fixture(name='gate', scope='function')
   55async def _gate(loop, tcp_service_port):
   56    gate_config = chaos.GateRoute(
   57        name='tcp proxy',
   58        host_to_server='localhost',
   59        port_to_server=tcp_service_port,
   60    )
   61    async with chaos.TcpGate(gate_config, loop) as proxy:
   62        yield proxy
   63 
   64 
   65async def test_delay_recv(service_client, loop, monitor_client, gate):
   66    await service_client.reset_metrics()
   67    timeout = 10.0
   68 
   69    
   70    gate.to_client_delay(timeout)
   71 
   72    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   73    await loop.sock_connect(sock, gate.get_sockname_for_clients())
   74    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
   75 
   76    recv_task = asyncio.create_task(recv_all_data(sock, loop))
   77    await send_all_data(sock, loop)
   78 
   79    done, _ = await asyncio.wait(
   80        [recv_task], timeout=timeout / 2, return_when=asyncio.FIRST_COMPLETED,
   81    )
   82    assert not done
   83 
   84    gate.to_client_pass()
   85 
   86    await recv_task
   87    metrics = await monitor_client.metrics(prefix='tcp-echo.')
   88    assert metrics.value_at('tcp-echo.sockets.opened') == 1
   89    assert metrics.value_at('tcp-echo.sockets.closed') == 0
   90    assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
   91 
   92 
   93async def test_data_combine(service_client, loop, monitor_client, gate):
   94    await service_client.reset_metrics()
   95    gate.to_client_concat_packets(DATA_LENGTH)
   96 
   97    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   98    await loop.sock_connect(sock, gate.get_sockname_for_clients())
   99    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  100 
  101    send_task = asyncio.create_task(send_all_data(sock, loop))
  102    await recv_all_data(sock, loop)
  103    await send_task
  104 
  105    gate.to_client_pass()
  106 
  107    metrics = await monitor_client.metrics(prefix='tcp-echo.')
  108    assert metrics.value_at('tcp-echo.sockets.opened') == 1
  109    assert metrics.value_at('tcp-echo.sockets.closed') == 0
  110    assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
  111 
  112 
  113async def test_down_pending_recv(service_client, loop, gate):
  114    gate.to_client_noop()
  115 
  116    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  117    await loop.sock_connect(sock, gate.get_sockname_for_clients())
  118    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  119 
  120    async def _recv_no_data():
  121        answer = b''
  122        try:
  123            while True:
  124                answer += await loop.sock_recv(sock, 2)
  125                assert False
  126        except Exception:  
  127            pass
  128 
  129        assert answer == b''
  130 
  131    recv_task = asyncio.create_task(_recv_no_data())
  132 
  133    await send_all_data(sock, loop)
  134 
  135    await asyncio.wait(
  136        [recv_task], timeout=1, return_when=asyncio.FIRST_COMPLETED,
  137    )
  138    await gate.sockets_close()
  139    await recv_task
  140    assert gate.connections_count() == 0
  141 
  142    gate.to_client_pass()
  143 
  144    sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  145    sock2.connect(gate.get_sockname_for_clients())
  146    await loop.sock_sendall(sock2, b'hi')
  147    hello = await loop.sock_recv(sock2, 2)
  148    assert hello == b'hi'
  149    assert gate.connections_count() == 1
  150 
  151 
  152async def test_multiple_socks(
  153        service_client, loop, monitor_client, tcp_service_port,
  154):
  155    await service_client.reset_metrics()
  156    sockets_count = 250
  157 
  158    tasks = []
  159    for _ in range(sockets_count):
  160        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  161        await loop.sock_connect(sock, ('localhost', tcp_service_port))
  162        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  163        tasks.append(asyncio.create_task(send_all_data(sock, loop)))
  164        tasks.append(asyncio.create_task(recv_all_data(sock, loop)))
  165    await asyncio.gather(*tasks)
  166 
  167    metrics = await monitor_client.metrics(prefix='tcp-echo.')
  168    assert metrics.value_at('tcp-echo.sockets.opened') == sockets_count
  169    assert (
  170        metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH * sockets_count
  171    )
  172 
  173 
  174async def test_multiple_send_only(
  175        service_client, loop, monitor_client, tcp_service_port,
  176):
  177    await service_client.reset_metrics()
  178    sockets_count = 25
  179 
  180    tasks = []
  181    for _ in range(sockets_count):
  182        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  183        await loop.sock_connect(sock, ('localhost', tcp_service_port))
  184        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  185        tasks.append(asyncio.create_task(send_all_data(sock, loop)))
  186    await asyncio.gather(*tasks)
  187 
  188 
  189async def test_metrics_smoke(monitor_client):
  190    metrics = await monitor_client.metrics()
  191    assert len(metrics) > 1