userver: samples/tcp_full_duplex_service/tests/test_echo.py
Loading...
Searching...
No Matches
samples/tcp_full_duplex_service/tests/test_echo.py
1# /// [Functional test]
2import asyncio
3import socket
4
5import pytest
6from pytest_userver import chaos
7
8DATA = (
9 b'Once upon a midnight dreary, while I pondered, weak and weary, ',
10 b'Over many a quaint and curious volume of forgotten lore - ',
11 b'While I nodded, nearly napping, suddenly there came a tapping, ',
12 b'As of some one gently rapping, rapping at my chamber door - ',
13 b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
14 b'Only this and nothing more."',
15)
16DATA_LENGTH = sum(len(x) for x in DATA)
17
18
19# Another way to say that monitor handlers listen for the main service port
20@pytest.fixture(scope='session')
21def monitor_port(service_port) -> int:
22 return service_port
23
24
25async def send_all_data(sock, loop):
26 for data in DATA:
27 await loop.sock_sendall(sock, data)
28
29
30async def recv_all_data(sock, loop):
31 answer = b''
32 while len(answer) < DATA_LENGTH:
33 answer += await loop.sock_recv(sock, DATA_LENGTH - len(answer))
34
35 assert answer == b''.join(DATA)
36
37
38async def test_basic(service_client, loop, monitor_client, tcp_service_port):
39 await service_client.reset_metrics()
40
41 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
42 await loop.sock_connect(sock, ('localhost', tcp_service_port))
43
44 send_task = asyncio.create_task(send_all_data(sock, loop))
45 await recv_all_data(sock, loop)
46 await send_task
47 metrics = await monitor_client.metrics(prefix='tcp-echo.')
48 assert metrics.value_at('tcp-echo.sockets.opened') == 1
49 assert metrics.value_at('tcp-echo.sockets.closed') == 0
50 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
51 # /// [Functional test]
52
53
54@pytest.fixture(name='gate', scope='function')
55async def _gate(loop, tcp_service_port):
56 gate_config = chaos.GateRoute(
57 name='tcp proxy',
58 host_to_server='localhost',
59 port_to_server=tcp_service_port,
60 )
61 async with chaos.TcpGate(gate_config, loop) as proxy:
62 yield proxy
63
64
65async def test_delay_recv(service_client, loop, monitor_client, gate):
66 await service_client.reset_metrics()
67 timeout = 10.0
68
69 # respond with delay in TIMEOUT seconds
70 gate.to_client_delay(timeout)
71
72 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
73 await loop.sock_connect(sock, gate.get_sockname_for_clients())
74 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
75
76 recv_task = asyncio.create_task(recv_all_data(sock, loop))
77 await send_all_data(sock, loop)
78
79 done, _ = await asyncio.wait(
80 [recv_task],
81 timeout=timeout / 2,
82 return_when=asyncio.FIRST_COMPLETED,
83 )
84 assert not done
85
86 gate.to_client_pass()
87
88 await recv_task
89 metrics = await monitor_client.metrics(prefix='tcp-echo.')
90 assert metrics.value_at('tcp-echo.sockets.opened') == 1
91 assert metrics.value_at('tcp-echo.sockets.closed') == 0
92 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
93
94
95async def test_data_combine(service_client, loop, monitor_client, gate):
96 await service_client.reset_metrics()
97 gate.to_client_concat_packets(DATA_LENGTH)
98
99 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
100 await loop.sock_connect(sock, gate.get_sockname_for_clients())
101 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
102
103 send_task = asyncio.create_task(send_all_data(sock, loop))
104 await recv_all_data(sock, loop)
105 await send_task
106
107 gate.to_client_pass()
108
109 metrics = await monitor_client.metrics(prefix='tcp-echo.')
110 assert metrics.value_at('tcp-echo.sockets.opened') == 1
111 assert metrics.value_at('tcp-echo.sockets.closed') == 0
112 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
113
114
115async def test_down_pending_recv(service_client, loop, gate):
116 gate.to_client_noop()
117
118 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
119 await loop.sock_connect(sock, gate.get_sockname_for_clients())
120 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
121
122 async def _recv_no_data():
123 answer = b''
124 try:
125 while True:
126 answer += await loop.sock_recv(sock, 2)
127 assert False
128 except Exception: # pylint: disable=broad-except
129 pass
130
131 assert answer == b''
132
133 recv_task = asyncio.create_task(_recv_no_data())
134
135 await send_all_data(sock, loop)
136
137 await asyncio.wait(
138 [recv_task],
139 timeout=1,
140 return_when=asyncio.FIRST_COMPLETED,
141 )
142 await gate.sockets_close()
143 await recv_task
144 assert gate.connections_count() == 0
145
146 gate.to_client_pass()
147
148 sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149 sock2.connect(gate.get_sockname_for_clients())
150 await loop.sock_sendall(sock2, b'hi')
151 hello = await loop.sock_recv(sock2, 2)
152 assert hello == b'hi'
153 assert gate.connections_count() == 1
154
155
156async def test_multiple_socks(
157 service_client,
158 loop,
159 monitor_client,
160 tcp_service_port,
161):
162 await service_client.reset_metrics()
163 sockets_count = 250
164
165 tasks = []
166 for _ in range(sockets_count):
167 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
168 await loop.sock_connect(sock, ('localhost', tcp_service_port))
169 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
170 tasks.append(asyncio.create_task(send_all_data(sock, loop)))
171 tasks.append(asyncio.create_task(recv_all_data(sock, loop)))
172 await asyncio.gather(*tasks)
173
174 metrics = await monitor_client.metrics(prefix='tcp-echo.')
175 assert metrics.value_at('tcp-echo.sockets.opened') == sockets_count
176 assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH * sockets_count
177
178
179async def test_multiple_send_only(
180 service_client,
181 loop,
182 monitor_client,
183 tcp_service_port,
184):
185 await service_client.reset_metrics()
186 sockets_count = 25
187
188 tasks = []
189 for _ in range(sockets_count):
190 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
191 await loop.sock_connect(sock, ('localhost', tcp_service_port))
192 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
193 tasks.append(asyncio.create_task(send_all_data(sock, loop)))
194 await asyncio.gather(*tasks)
195
196
197async def test_metrics_smoke(monitor_client):
198 metrics = await monitor_client.metrics()
199 assert len(metrics) > 1