1
2import asyncio
3import socket
4
5import pytest
6from pytest_userver import chaos
7
# Payload sent through the echo service, split into separately sent chunks.
DATA = (
    b'Once upon a midnight dreary, while I pondered, weak and weary, ',
    b'Over many a quaint and curious volume of forgotten lore - ',
    b'While I nodded, nearly napping, suddenly there came a tapping, ',
    b'As of some one gently rapping, rapping at my chamber door - ',
    b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
    b'Only this and nothing more."',
)
# Total number of bytes across all chunks of DATA.
DATA_LENGTH = len(b''.join(DATA))
17
18
19
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
    """Session-scoped fixture that returns ``service_port`` as the monitor port.

    NOTE(review): presumably the monitor endpoint shares the service port in
    this sample - confirm against the service config.
    """
    port = service_port
    return port
23
24
async def send_all_data(sock):
    """Send every chunk of ``DATA`` through *sock*, in order.

    Each chunk is written with its own awaited ``sock.sendall`` call.
    """
    for chunk in DATA:
        await sock.sendall(chunk)
28
29
async def recv_all_data(sock):
    """Receive ``DATA_LENGTH`` bytes from *sock* and verify they echo ``DATA``.

    Keeps calling ``sock.recv`` for the bytes still missing until
    ``DATA_LENGTH`` bytes have been collected, then asserts that the collected
    bytes equal the concatenation of ``DATA``.

    Raises:
        AssertionError: if ``sock.recv`` returns no data before everything has
            arrived, or if the received bytes differ from ``DATA``.
    """
    chunks = []
    remaining = DATA_LENGTH
    while remaining > 0:
        chunk = await sock.recv(remaining)
        # An empty read means the connection was closed. Without this check
        # the loop would never make progress and the test would hang instead
        # of failing.
        assert chunk, (
            f'connection closed after {DATA_LENGTH - remaining} '
            f'of {DATA_LENGTH} bytes'
        )
        chunks.append(chunk)
        remaining -= len(chunk)

    assert b''.join(chunks) == b''.join(DATA)
36
37
async def test_basic(service_client, asyncio_socket, monitor_client, tcp_service_port):
    """Echo ``DATA`` over a direct connection and check the metrics differ.

    The connection and the whole exchange happen inside the
    ``metrics_diff(prefix='tcp-echo')`` context; the differ values are
    checked after the context has exited.
    """
    client = asyncio_socket.tcp()
    address = ('localhost', tcp_service_port)

    async with monitor_client.metrics_diff(prefix='tcp-echo') as differ:
        await client.connect(address)

        # Send in the background while this coroutine receives.
        sender = asyncio.create_task(send_all_data(client))
        await recv_all_data(client)
        await sender

    expected = {
        'sockets.opened': 1,
        'sockets.closed': 0,
        'bytes.read': DATA_LENGTH,
    }
    for path, value in expected.items():
        assert differ.value_at(path) == value
50
51
52
@pytest.fixture(name='gate')
async def _gate(tcp_service_port):
    """Yield a ``chaos.TcpGate`` routed to the service on ``localhost``.

    The gate is entered as an async context manager, so it is exited once the
    fixture is torn down.
    """
    async with chaos.TcpGate(
        chaos.GateRoute(
            name='tcp proxy',
            host_to_server='localhost',
            port_to_server=tcp_service_port,
        ),
    ) as proxy:
        yield proxy
62
63
async def test_delay_recv(service_client, asyncio_socket, monitor_client, gate):
    """Check that the echo is not received while the gate delays it.

    The gate is set to ``to_client_delay`` before the client connects. After
    all data is sent, the receiver must still be unfinished after half of the
    delay; once the gate is switched to ``to_client_pass`` the receiver is
    awaited and the metrics are verified.
    """
    await service_client.reset_metrics()
    delay = 2.0

    await gate.to_client_delay(delay)

    client = asyncio_socket.tcp()
    await client.connect(gate.get_sockname_for_clients())
    client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    receiver = asyncio.create_task(recv_all_data(client))
    await send_all_data(client)

    # Wait for half of the configured delay: nothing should have arrived yet.
    finished, _pending = await asyncio.wait(
        {receiver},
        return_when=asyncio.FIRST_COMPLETED,
        timeout=delay / 2,
    )
    assert not finished

    await gate.to_client_pass()
    await receiver

    snapshot = await monitor_client.metrics(prefix='tcp-echo.')
    expected = {
        'tcp-echo.sockets.opened': 1,
        'tcp-echo.sockets.closed': 0,
        'tcp-echo.bytes.read': DATA_LENGTH,
    }
    for path, value in expected.items():
        assert snapshot.value_at(path) == value
92
93
async def test_data_combine(asyncio_socket, service_client, monitor_client, gate):
    """Echo ``DATA`` through the gate in ``to_client_concat_packets`` mode.

    NOTE(review): presumably the gate merges server-to-client packets until
    ``DATA_LENGTH`` bytes are buffered - confirm against the chaos gate docs.
    """
    await service_client.reset_metrics()
    await gate.to_client_concat_packets(DATA_LENGTH)

    client = asyncio_socket.socket()
    await client.connect(gate.get_sockname_for_clients())
    client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    # Send in the background while this coroutine receives.
    sender = asyncio.create_task(send_all_data(client))
    await recv_all_data(client)
    await sender

    await gate.to_client_pass()

    snapshot = await monitor_client.metrics(prefix='tcp-echo.')
    expected = {
        'tcp-echo.sockets.opened': 1,
        'tcp-echo.sockets.closed': 0,
        'tcp-echo.bytes.read': DATA_LENGTH,
    }
    for path, value in expected.items():
        assert snapshot.value_at(path) == value
112
113
async def test_down_pending_recv(service_client, asyncio_socket, gate):
    """Close the gate sockets while the echo is dropped, then reconnect.

    With ``to_client_drop`` active the client sends all data, waits for the
    drop queue to report a call and closes the gate sockets. The client must
    then read ``b''``. After switching back to ``to_client_pass`` a second
    connection must echo ``b'hi'``.
    """
    drop_queue = await gate.to_client_drop()

    first = asyncio_socket.tcp()
    await first.connect(gate.get_sockname_for_clients())
    first.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    await send_all_data(first)
    await drop_queue.wait_call()
    await gate.sockets_close()

    assert gate.connections_count() == 0
    # Bounded wait so a missing close fails the test instead of hanging it.
    leftover = await asyncio.wait_for(first.recv(2), timeout=10.0)
    assert leftover == b''

    await gate.to_client_pass()

    second = asyncio_socket.tcp()
    await second.connect(gate.get_sockname_for_clients())
    await second.sendall(b'hi')
    reply = await second.recv(2)
    assert reply == b'hi'
    assert gate.connections_count() == 1
140
141
async def test_multiple_socks(
    asyncio_socket,
    service_client,
    monitor_client,
    tcp_service_port,
):
    """Run the echo exchange concurrently over many direct connections.

    Every socket gets its own sender and receiver task; once all tasks have
    finished, the opened-sockets and bytes-read metrics must scale with the
    number of sockets.
    """
    await service_client.reset_metrics()
    sockets_count = 250
    address = ('localhost', tcp_service_port)

    pending = []
    for _ in range(sockets_count):
        client = asyncio_socket.tcp()
        await client.connect(address)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        pending.extend((
            asyncio.create_task(send_all_data(client)),
            asyncio.create_task(recv_all_data(client)),
        ))
    await asyncio.gather(*pending)

    snapshot = await monitor_client.metrics(prefix='tcp-echo.')
    assert snapshot.value_at('tcp-echo.sockets.opened') == sockets_count
    assert snapshot.value_at('tcp-echo.bytes.read') == DATA_LENGTH * sockets_count
163
164
async def test_multiple_send_only(
    asyncio_socket,
    service_client,
    monitor_client,
    tcp_service_port,
):
    """Send ``DATA`` over several direct connections without reading replies.

    The test only awaits completion of all sender tasks; no metrics are
    checked.
    """
    await service_client.reset_metrics()
    sockets_count = 25
    address = ('localhost', tcp_service_port)

    senders = []
    for _ in range(sockets_count):
        client = asyncio_socket.tcp()
        await client.connect(address)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        senders.append(asyncio.create_task(send_all_data(client)))
    await asyncio.gather(*senders)
181
182
async def test_metrics_smoke(monitor_client):
    """Smoke check: ``len()`` of the full metrics snapshot is greater than one."""
    snapshot = await monitor_client.metrics()
    entries = len(snapshot)
    assert entries > 1