userver: samples/tcp_full_duplex_service/tests/test_echo.py
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
samples/tcp_full_duplex_service/tests/test_echo.py
1# /// [Functional test]
2import asyncio
3import socket
4
5import pytest
6from pytest_userver import chaos
7
# Payload chunks that the tests send to the echo service one after another.
DATA = (
    b'Once upon a midnight dreary, while I pondered, weak and weary, ',
    b'Over many a quaint and curious volume of forgotten lore - ',
    b'While I nodded, nearly napping, suddenly there came a tapping, ',
    b'As of some one gently rapping, rapping at my chamber door - ',
    b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
    b'Only this and nothing more."',
)
# Total payload size in bytes across all chunks.
DATA_LENGTH = len(b''.join(DATA))
17
18
# Another way to say that monitor handlers listen for the main service port
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
    """Return the ``service_port`` fixture value as the monitor port."""
    port = service_port
    return port
23
24
async def send_all_data(sock, loop):
    """Send every chunk of ``DATA`` through ``sock``, in order.

    Each chunk is written with ``loop.sock_sendall`` and awaited before the
    next one is started.
    """
    for chunk in DATA:
        await loop.sock_sendall(sock, chunk)
28
29
async def recv_all_data(sock, loop):
    """Read ``DATA_LENGTH`` bytes from ``sock`` and compare them to ``DATA``.

    Reads with ``loop.sock_recv`` until the whole payload has arrived or the
    peer closes the connection.

    Raises:
        AssertionError: if the received bytes differ from the concatenated
            ``DATA`` chunks, including when the connection was closed early.
    """
    answer = b''
    while len(answer) < DATA_LENGTH:
        chunk = await loop.sock_recv(sock, DATA_LENGTH - len(answer))
        if not chunk:
            # An empty read means the peer closed the connection. Without
            # this check the loop would spin forever instead of failing.
            break
        answer += chunk

    assert answer == b''.join(DATA)
36
37
async def test_basic(service_client, loop, monitor_client, tcp_service_port):
    """Echo ``DATA`` over a direct connection and check the metrics."""
    await service_client.reset_metrics()

    # The context manager closes the socket even if an assertion fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The loop.sock_* coroutines are documented to require a
        # non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, ('localhost', tcp_service_port))

        send_task = asyncio.create_task(send_all_data(sock, loop))
        await recv_all_data(sock, loop)
        await send_task
        # Metrics are read while the socket is still open, which is why no
        # closed socket is expected yet.
        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == 1
        assert metrics.value_at('tcp-echo.sockets.closed') == 0
        assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
    # /// [Functional test]
52
53
@pytest.fixture(name='gate', scope='function')
async def _gate(loop, tcp_service_port):
    """Yield a ``chaos.TcpGate`` routed to the TCP service on localhost.

    The gate is entered as an async context manager, so it is exited once
    the test using the fixture has finished.
    """
    route = chaos.GateRoute(
        name='tcp proxy',
        host_to_server='localhost',
        port_to_server=tcp_service_port,
    )
    async with chaos.TcpGate(route, loop) as tcp_gate:
        yield tcp_gate
63
64
async def test_delay_recv(service_client, loop, monitor_client, gate):
    """Check that the echo is held back while the gate delays responses.

    The payload is sent through the gate with a response delay configured.
    Nothing may be fully received within half of that delay; after
    ``gate.to_client_pass()`` the complete echo must arrive.
    """
    await service_client.reset_metrics()
    timeout = 10.0

    # Respond to the client with a delay of ``timeout`` seconds
    gate.to_client_delay(timeout)

    # The context manager closes the socket even if an assertion fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The loop.sock_* coroutines are documented to require a
        # non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, gate.get_sockname_for_clients())
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        recv_task = asyncio.create_task(recv_all_data(sock, loop))
        await send_all_data(sock, loop)

        # The receive must still be pending after half of the delay.
        done, _ = await asyncio.wait(
            [recv_task],
            timeout=timeout / 2,
            return_when=asyncio.FIRST_COMPLETED,
        )
        assert not done

        gate.to_client_pass()

        await recv_task
        # Metrics are read while the socket is still open.
        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == 1
        assert metrics.value_at('tcp-echo.sockets.closed') == 0
        assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
91
92
async def test_data_combine(service_client, loop, monitor_client, gate):
    """Echo ``DATA`` through a gate set to concatenate packets to the client.

    ``gate.to_client_concat_packets(DATA_LENGTH)`` is enabled before the
    exchange (presumably the gate merges responses up to that size; confirm
    against the chaos gate documentation) and switched back to pass-through
    before the metrics are checked.
    """
    await service_client.reset_metrics()
    gate.to_client_concat_packets(DATA_LENGTH)

    # The context manager closes the socket even if an assertion fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The loop.sock_* coroutines are documented to require a
        # non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, gate.get_sockname_for_clients())
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        send_task = asyncio.create_task(send_all_data(sock, loop))
        await recv_all_data(sock, loop)
        await send_task

        gate.to_client_pass()

        # Metrics are read while the socket is still open.
        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == 1
        assert metrics.value_at('tcp-echo.sockets.closed') == 0
        assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
111
112
async def test_down_pending_recv(service_client, loop, gate):
    """Close the gate's sockets under a pending read, then reconnect.

    With ``gate.to_client_noop()`` active the client must not receive any
    echoed data (presumably the gate withholds responses; confirm against
    the chaos gate documentation). After the gate closes its sockets, a
    fresh connection through the gate must echo data again.
    """
    gate.to_client_noop()

    # The context manager closes the socket even if an assertion fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # The loop.sock_* coroutines are documented to require a
        # non-blocking socket.
        sock.setblocking(False)
        await loop.sock_connect(sock, gate.get_sockname_for_clients())
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        async def _recv_no_data():
            # One read is enough: it has to end with an error or with an
            # empty result, never with echoed bytes.
            try:
                answer = await loop.sock_recv(sock, 2)
            except Exception:  # pylint: disable=broad-except
                # A failed read is an accepted outcome once the gate has
                # dropped the connection.
                answer = b''

            assert answer == b''

        recv_task = asyncio.create_task(_recv_no_data())

        await send_all_data(sock, loop)

        # Leave the read pending for up to a second before the gate drops
        # its connections; the outcome of this wait is not checked.
        await asyncio.wait(
            [recv_task], timeout=1, return_when=asyncio.FIRST_COMPLETED,
        )
        await gate.sockets_close()
        await recv_task
        assert gate.connections_count() == 0

    gate.to_client_pass()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2:
        sock2.setblocking(False)
        # Connect through the loop, as the other tests do, rather than with
        # a synchronous connect inside the coroutine.
        await loop.sock_connect(sock2, gate.get_sockname_for_clients())
        await loop.sock_sendall(sock2, b'hi')
        hello = await loop.sock_recv(sock2, 2)
        assert hello == b'hi'
        assert gate.connections_count() == 1
150
151
async def test_multiple_socks(
        service_client, loop, monitor_client, tcp_service_port,
):
    """Echo ``DATA`` over many simultaneous connections.

    Opens ``sockets_count`` direct connections, sends and receives on all of
    them concurrently, and checks the opened-sockets and bytes-read metrics.
    """
    await service_client.reset_metrics()
    sockets_count = 250

    # Every socket is tracked so that all of them are closed afterwards,
    # even if connecting or an assertion fails midway.
    socks = []
    try:
        tasks = []
        for _ in range(sockets_count):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            socks.append(sock)
            # The loop.sock_* coroutines are documented to require a
            # non-blocking socket.
            sock.setblocking(False)
            await loop.sock_connect(sock, ('localhost', tcp_service_port))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            tasks.append(asyncio.create_task(send_all_data(sock, loop)))
            tasks.append(asyncio.create_task(recv_all_data(sock, loop)))
        await asyncio.gather(*tasks)

        metrics = await monitor_client.metrics(prefix='tcp-echo.')
        assert metrics.value_at('tcp-echo.sockets.opened') == sockets_count
        assert (
            metrics.value_at('tcp-echo.bytes.read')
            == DATA_LENGTH * sockets_count
        )
    finally:
        for opened in socks:
            opened.close()
172
173
async def test_multiple_send_only(
        service_client, loop, monitor_client, tcp_service_port,
):
    """Send ``DATA`` over several connections without reading the echo.

    The test only requires that every send completes; no metrics are
    checked.
    """
    await service_client.reset_metrics()
    sockets_count = 25

    # Every socket is tracked so that all of them are closed afterwards,
    # even if connecting or sending fails midway.
    socks = []
    try:
        tasks = []
        for _ in range(sockets_count):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            socks.append(sock)
            # The loop.sock_* coroutines are documented to require a
            # non-blocking socket.
            sock.setblocking(False)
            await loop.sock_connect(sock, ('localhost', tcp_service_port))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            tasks.append(asyncio.create_task(send_all_data(sock, loop)))
        await asyncio.gather(*tasks)
    finally:
        for opened in socks:
            opened.close()
187
188
async def test_metrics_smoke(monitor_client):
    """Check that the monitor reports more than one metric."""
    all_metrics = await monitor_client.metrics()
    metrics_count = len(all_metrics)
    assert metrics_count > 1