1
2import asyncio
3import socket
4
5import pytest
6from pytest_userver import chaos
7
# Payload exchanged with the echo service, kept as separate chunks so that
# senders can write it piece by piece.
DATA = (
    b'Once upon a midnight dreary, while I pondered, weak and weary, ',
    b'Over many a quaint and curious volume of forgotten lore - ',
    b'While I nodded, nearly napping, suddenly there came a tapping, ',
    b'As of some one gently rapping, rapping at my chamber door - ',
    b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
    b'Only this and nothing more."',
)
# Total size of the payload in bytes (all chunks together).
DATA_LENGTH = len(b''.join(DATA))
17
18
19
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
    """Session-scoped fixture that returns ``service_port`` unchanged."""
    port = service_port
    return port
23
24
async def send_all_data(sock, loop):
    """Send every chunk of ``DATA`` over ``sock``, in order.

    Each chunk goes out through its own ``loop.sock_sendall`` call, and the
    next chunk is not started until that call has completed.
    """
    for chunk in DATA:
        await loop.sock_sendall(sock, chunk)
28
29
async def recv_all_data(sock, loop):
    """Read the echoed payload from ``sock`` and check it equals ``DATA``.

    Reads with ``loop.sock_recv`` until ``DATA_LENGTH`` bytes have been
    collected or the peer closes the connection, whichever comes first.

    Raises:
        AssertionError: if the received bytes differ from the concatenated
            ``DATA`` chunks (including the case of a premature EOF).
    """
    chunks = []
    remaining = DATA_LENGTH
    while remaining > 0:
        chunk = await loop.sock_recv(sock, remaining)
        if not chunk:
            # An empty read means EOF: the peer closed the connection.
            # Stop here so the assertion below reports the short read
            # instead of spinning forever on empty reads.
            break
        chunks.append(chunk)
        remaining -= len(chunk)

    assert b''.join(chunks) == b''.join(DATA)
36
37
async def test_basic(service_client, loop, monitor_client, tcp_service_port):
    """Send ``DATA`` straight to the service and expect it echoed back.

    After the exchange the ``tcp-echo.`` metrics must report one opened
    socket, no closed sockets and ``DATA_LENGTH`` bytes read.
    """
    await service_client.reset_metrics()

    # The socket is closed when the ``with`` block exits, i.e. only after the
    # metrics have been checked, so ``sockets.closed`` is still expected to
    # be 0 at assertion time.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The asyncio ``loop.sock_*`` methods require a non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, ('localhost', tcp_service_port))

        send_task = asyncio.create_task(send_all_data(sock, loop))
        await recv_all_data(sock, loop)
        await send_task

        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == 1
        assert metrics.value_at('tcp-echo.sockets.closed') == 0
        assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
51
52
53
@pytest.fixture(name='gate', scope='function')
async def _gate(loop, tcp_service_port):
    """Yield a ``chaos.TcpGate`` routed to the TCP service on localhost.

    The gate is entered as an async context manager and yielded to the test;
    it is exited when the fixture is finalized.
    """
    route = chaos.GateRoute(
        name='tcp proxy',
        host_to_server='localhost',
        port_to_server=tcp_service_port,
    )
    async with chaos.TcpGate(route, loop) as tcp_gate:
        yield tcp_gate
63
64
async def test_delay_recv(service_client, loop, monitor_client, gate):
    """Check that the echo is held back while the gate delays the client side.

    With ``gate.to_client_delay(timeout)`` in effect the receive task must
    still be pending after ``timeout / 2`` seconds. Once
    ``gate.to_client_pass()`` is called the full payload must arrive, and the
    ``tcp-echo.`` metrics must report one opened socket, no closed sockets
    and ``DATA_LENGTH`` bytes read.
    """
    await service_client.reset_metrics()
    timeout = 10.0

    gate.to_client_delay(timeout)

    # Closed on exit from the ``with`` block, after the metrics assertions.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The asyncio ``loop.sock_*`` methods require a non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, gate.get_sockname_for_clients())
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        recv_task = asyncio.create_task(recv_all_data(sock, loop))
        await send_all_data(sock, loop)

        # Nothing may reach the client within half of the configured delay.
        done, _ = await asyncio.wait(
            [recv_task],
            timeout=timeout / 2,
            return_when=asyncio.FIRST_COMPLETED,
        )
        assert not done

        gate.to_client_pass()

        await recv_task
        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == 1
        assert metrics.value_at('tcp-echo.sockets.closed') == 0
        assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
91
92
async def test_data_combine(service_client, loop, monitor_client, gate):
    """Echo ``DATA`` through the gate with ``to_client_concat_packets`` set.

    The gate is configured with ``to_client_concat_packets(DATA_LENGTH)``;
    the client must still receive the complete payload, and the
    ``tcp-echo.`` metrics must report one opened socket, no closed sockets
    and ``DATA_LENGTH`` bytes read.
    """
    await service_client.reset_metrics()
    gate.to_client_concat_packets(DATA_LENGTH)

    # Closed on exit from the ``with`` block, after the metrics assertions.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The asyncio ``loop.sock_*`` methods require a non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, gate.get_sockname_for_clients())
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        send_task = asyncio.create_task(send_all_data(sock, loop))
        await recv_all_data(sock, loop)
        await send_task

        gate.to_client_pass()

        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == 1
        assert metrics.value_at('tcp-echo.sockets.closed') == 0
        assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
111
112
async def test_down_pending_recv(service_client, loop, gate):
    """Close the gate's sockets while a client read is pending, then reconnect.

    Phase one: with ``gate.to_client_noop()`` set, the client sends ``DATA``
    and must not receive any payload bytes; after ``gate.sockets_close()``
    the gate must report zero connections.

    Phase two: after ``gate.to_client_pass()`` a fresh connection must echo
    ``b'hi'`` and the gate must report exactly one connection.
    """
    gate.to_client_noop()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The asyncio ``loop.sock_*`` methods require a non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, gate.get_sockname_for_clients())
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        async def _recv_no_data():
            """Wait for the read to finish and check no payload arrived."""
            try:
                answer = await loop.sock_recv(sock, 2)
            except OSError:
                # A socket error (e.g. connection reset) also means that no
                # data was delivered.
                return
            # An empty result is EOF; anything else is unexpected payload.
            assert answer == b''

        recv_task = asyncio.create_task(_recv_no_data())

        await send_all_data(sock, loop)

        # Give the echo up to a second to (wrongly) reach the client before
        # the gate's sockets are closed.
        await asyncio.wait(
            [recv_task], timeout=1, return_when=asyncio.FIRST_COMPLETED,
        )
        await gate.sockets_close()
        await recv_task
        assert gate.connections_count() == 0

    gate.to_client_pass()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2:
        sock2.setblocking(False)
        # Connect through the loop, like the other tests, instead of making
        # a blocking ``connect`` call inside a coroutine.
        await loop.sock_connect(sock2, gate.get_sockname_for_clients())
        await loop.sock_sendall(sock2, b'hi')
        hello = await loop.sock_recv(sock2, 2)
        assert hello == b'hi'
        assert gate.connections_count() == 1
150
151
async def test_multiple_socks(
    service_client, loop, monitor_client, tcp_service_port,
):
    """Echo ``DATA`` over 250 simultaneous connections.

    Every connection gets its own send task and receive task. Afterwards the
    ``tcp-echo.`` metrics must report ``sockets_count`` opened sockets and
    ``DATA_LENGTH * sockets_count`` bytes read.
    """
    await service_client.reset_metrics()
    sockets_count = 250

    socks = []
    try:
        tasks = []
        for _ in range(sockets_count):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Track the socket right away so it is closed even if the
            # connect below fails.
            socks.append(sock)
            # The asyncio ``loop.sock_*`` methods require a non-blocking
            # socket.
            sock.setblocking(False)
            await loop.sock_connect(sock, ('localhost', tcp_service_port))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            tasks.append(asyncio.create_task(send_all_data(sock, loop)))
            tasks.append(asyncio.create_task(recv_all_data(sock, loop)))
        await asyncio.gather(*tasks)

        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == sockets_count
        assert (
            metrics.value_at('tcp-echo.bytes.read')
            == DATA_LENGTH * sockets_count
        )
    finally:
        for opened in socks:
            opened.close()
172
173
async def test_multiple_send_only(
    service_client, loop, monitor_client, tcp_service_port,
):
    """Send ``DATA`` over 25 connections without reading anything back.

    The test has no assertions of its own; it passes when every send task
    completes without raising.
    """
    await service_client.reset_metrics()
    sockets_count = 25

    socks = []
    try:
        tasks = []
        for _ in range(sockets_count):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Track the socket right away so it is closed even if the
            # connect below fails.
            socks.append(sock)
            # The asyncio ``loop.sock_*`` methods require a non-blocking
            # socket.
            sock.setblocking(False)
            await loop.sock_connect(sock, ('localhost', tcp_service_port))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            tasks.append(asyncio.create_task(send_all_data(sock, loop)))
        await asyncio.gather(*tasks)
    finally:
        for opened in socks:
            opened.close()
187
188
async def test_metrics_smoke(monitor_client):
    """Fetch all metrics and check that more than one entry is returned."""
    all_metrics = await monitor_client.metrics()
    assert len(all_metrics) > 1