1
2import asyncio
3import socket
4
5import pytest
6from pytest_userver import chaos
7
8DATA = (
9 b'Once upon a midnight dreary, while I pondered, weak and weary, ',
10 b'Over many a quaint and curious volume of forgotten lore - ',
11 b'While I nodded, nearly napping, suddenly there came a tapping, ',
12 b'As of some one gently rapping, rapping at my chamber door - ',
13 b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
14 b'Only this and nothing more."',
15)
16DATA_LENGTH = sum(len(x) for x in DATA)
17
18
19
20@pytest.fixture(scope='session')
21def monitor_port(service_port) -> int:
22 return service_port
23
24
25async def send_all_data(sock):
26 for data in DATA:
27 await sock.sendall(data)
28
29
30async def recv_all_data(sock):
31 answer = b''
32 while len(answer) < DATA_LENGTH:
33 answer += await sock.recv(DATA_LENGTH - len(answer))
34
35 assert answer == b''.join(DATA)
36
37
38async def test_basic(service_client, asyncio_socket, monitor_client, tcp_service_port):
39 await service_client.reset_metrics()
40
41 sock = asyncio_socket.tcp()
42 await sock.connect(('localhost', tcp_service_port))
43
44 send_task = asyncio.create_task(send_all_data(sock))
45 await recv_all_data(sock)
46 await send_task
47 metrics = await monitor_client.metrics(prefix='tcp-echo.')
48 assert metrics.value_at('tcp-echo.sockets.opened') == 1
49 assert metrics.value_at('tcp-echo.sockets.closed') == 0
50 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
51
52
53
54@pytest.fixture(name='gate', scope='function')
55async def _gate(tcp_service_port):
56 gate_config = chaos.GateRoute(
57 name='tcp proxy',
58 host_to_server='localhost',
59 port_to_server=tcp_service_port,
60 )
61 async with chaos.TcpGate(gate_config) as proxy:
62 yield proxy
63
64
65async def test_delay_recv(service_client, asyncio_socket, monitor_client, gate):
66 await service_client.reset_metrics()
67 timeout = 2.0
68
69
70 await gate.to_client_delay(timeout)
71
72 sock = asyncio_socket.tcp()
73 await sock.connect(gate.get_sockname_for_clients())
74 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
75
76 recv_task = asyncio.create_task(recv_all_data(sock))
77 await send_all_data(sock)
78
79 done, _ = await asyncio.wait(
80 [recv_task],
81 timeout=timeout / 2,
82 return_when=asyncio.FIRST_COMPLETED,
83 )
84 assert not done
85
86 await gate.to_client_pass()
87
88 await recv_task
89 metrics = await monitor_client.metrics(prefix='tcp-echo.')
90 assert metrics.value_at('tcp-echo.sockets.opened') == 1
91 assert metrics.value_at('tcp-echo.sockets.closed') == 0
92 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
93
94
95async def test_data_combine(asyncio_socket, service_client, monitor_client, gate):
96 await service_client.reset_metrics()
97 await gate.to_client_concat_packets(DATA_LENGTH)
98
99 sock = asyncio_socket.socket()
100 await sock.connect(gate.get_sockname_for_clients())
101 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
102
103 send_task = asyncio.create_task(send_all_data(sock))
104 await recv_all_data(sock)
105 await send_task
106
107 await gate.to_client_pass()
108
109 metrics = await monitor_client.metrics(prefix='tcp-echo.')
110 assert metrics.value_at('tcp-echo.sockets.opened') == 1
111 assert metrics.value_at('tcp-echo.sockets.closed') == 0
112 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
113
114
115async def test_down_pending_recv(service_client, asyncio_socket, gate):
116 drop_queue = await gate.to_client_drop()
117
118 sock = asyncio_socket.tcp()
119 await sock.connect(gate.get_sockname_for_clients())
120 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
121
122 async def _recv_no_data():
123 data = await sock.recv(2)
124 assert data == b''
125
126 await send_all_data(sock)
127 await drop_queue.wait_call()
128 await gate.sockets_close()
129
130 assert gate.connections_count() == 0
131 await asyncio.wait_for(_recv_no_data(), timeout=10.0)
132
133 await gate.to_client_pass()
134
135 sock2 = asyncio_socket.tcp()
136 await sock2.connect(gate.get_sockname_for_clients())
137 await sock2.sendall(b'hi')
138 hello = await sock2.recv(2)
139 assert hello == b'hi'
140 assert gate.connections_count() == 1
141
142
143async def test_multiple_socks(
144 asyncio_socket,
145 service_client,
146 monitor_client,
147 tcp_service_port,
148):
149 await service_client.reset_metrics()
150 sockets_count = 250
151
152 tasks = []
153 for _ in range(sockets_count):
154 sock = asyncio_socket.tcp()
155 await sock.connect(('localhost', tcp_service_port))
156 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
157 tasks.append(asyncio.create_task(send_all_data(sock)))
158 tasks.append(asyncio.create_task(recv_all_data(sock)))
159 await asyncio.gather(*tasks)
160
161 metrics = await monitor_client.metrics(prefix='tcp-echo.')
162 assert metrics.value_at('tcp-echo.sockets.opened') == sockets_count
163 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH * sockets_count
164
165
166async def test_multiple_send_only(
167 asyncio_socket,
168 service_client,
169 monitor_client,
170 tcp_service_port,
171):
172 await service_client.reset_metrics()
173 sockets_count = 25
174
175 tasks = []
176 for _ in range(sockets_count):
177 sock = asyncio_socket.tcp()
178 await sock.connect(('localhost', tcp_service_port))
179 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
180 tasks.append(asyncio.create_task(send_all_data(sock)))
181 await asyncio.gather(*tasks)
182
183
184async def test_metrics_smoke(monitor_client):
185 metrics = await monitor_client.metrics()
186 assert len(metrics) > 1