userver: postgresql/functional_tests/basic_chaos/tests/test_postgres.py
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
postgresql/functional_tests/basic_chaos/tests/test_postgres.py
1import logging
2import socket
3
4import pytest
5
6from pytest_userver import chaos
7
8import utils
9
10
# Arguments handed to the chaos gate by the tests in this module.
# Units are whatever the corresponding gate method expects
# (presumably seconds / bytes -- confirm against pytest_userver.chaos).
DATA_TRANSMISSION_DELAY = 1  # gate.to_*_delay()
BYTES_PER_SECOND_LIMIT = 10  # gate.to_*_limit_bps()
CONNECTION_TIME_LIMIT = 0.4  # gate.to_*_limit_time(), first argument
CONNECTION_LIMIT_JITTER = 0.004  # gate.to_*_limit_time(), second argument
DATA_PARTS_MAX_SIZE = 40  # gate.to_*_smaller_parts()
BYTES_TRANSMISSION_LIMIT = 1024  # gate.to_*_limit_bytes()

# Upper bound on requests issued while waiting for the first non-200 answer.
FAILURE_RETRIES = 250

# Service handlers exercised by the tests.
SELECT_URL = '/chaos/postgres?type=select'
SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'


logger = logging.getLogger(__name__)
24
25
# /// [restore]
async def _check_that_restores(service_client, gate):
    """Lift the gate restrictions and require a successful select.

    Calls ``to_server_pass``, ``to_client_pass`` and ``start_accepting`` on
    the gate, runs ``utils.consume_dead_db_connections`` against the
    service, and then asserts that ``SELECT_URL`` answers with 200.
    """
    gate.to_server_pass()
    gate.to_client_pass()
    gate.start_accepting()

    await utils.consume_dead_db_connections(service_client)

    logger.debug('Starting "_check_that_restores" wait for 200')
    restored = await service_client.get(SELECT_URL)
    assert restored.status == 200, 'Bad results after connection restore'
    logger.debug('End of "_check_that_restores" wait for 200')
    # /// [restore]
39
40
async def test_pg_fine(service_client, gate):
    """With no gate policy changed by the test, the select answers 200."""
    reply = await service_client.get(SELECT_URL)
    assert reply.status == 200
44
45
async def test_pg_overload_no_accepts(service_client, gate):
    """The select keeps answering 200 after ``gate.stop_accepting()``."""
    # Warm-up request: ensures a connection exists before accepts are stopped
    warm_up = await service_client.get(SELECT_URL)
    assert warm_up.status == 200

    await gate.stop_accepting()

    # This request is expected to reuse the connection opened above
    reused = await service_client.get(SELECT_URL)
    assert reused.status == 200
56
57
# /// [test cc]
async def test_pg_congestion_control(service_client, gate):
    """Every request fails while the gate closes on data, then recovers.

    One small-timeout select is issued per connection reported by
    ``gate.connections_count()``; each must answer 500.
    """
    gate.to_server_close_on_data()
    gate.to_client_close_on_data()

    expected_failures = gate.connections_count()
    for _ in range(expected_failures):
        failed = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert failed.status == 500

    await _check_that_restores(service_client, gate)
    # /// [test cc]
69
70
async def test_close_to_client_limit(service_client, gate):
    """Each to-client byte limit (100, 150, 200) must produce a 500.

    The limit in effect is attached to the assertion so a failure names it.
    """
    for byte_limit in (100, 150, 200):
        gate.to_client_limit_bytes(byte_limit)
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert reply.status == 500, byte_limit

    gate.to_client_pass()
    await _check_that_restores(service_client, gate)
79
80
async def test_close_to_server_limit(service_client, gate):
    """Each to-server byte limit (100, 150, 200) must produce a 500.

    After the loop the to-server direction is switched back with
    ``to_server_pass`` and the service is required to recover.
    """
    for i in range(100, 250, 50):
        gate.to_server_limit_bytes(i)
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        # Report the limit in effect, as test_close_to_client_limit does,
        # so a failure tells which iteration broke.
        assert response.status == 500, i

    gate.to_server_pass()
    await _check_that_restores(service_client, gate)
89
90
@pytest.mark.skip(
    reason='Rarely breaks the server, and corrupted data can still be valid',
)
async def test_pg_corrupted_response(service_client, gate):
    """Skipped: expects a 500 per connection under ``to_client_corrupt_data``.

    One select is issued per connection reported by
    ``gate.connections_count()``, followed by the recovery check.
    """
    gate.to_client_corrupt_data()

    request_count = gate.connections_count()
    for _ in range(request_count):
        reply = await service_client.get(SELECT_URL)
        assert reply.status == 500

    await _check_that_restores(service_client, gate)
102
103
@pytest.mark.skip(reason='response.status == 200')
async def test_network_delay_sends(service_client, gate):
    """Skipped: expects a 500 when only ``to_server_delay`` is applied."""
    gate.to_server_delay(DATA_TRANSMISSION_DELAY)

    logger.debug('Starting "test_network_delay_sends" check for 500')
    delayed = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert delayed.status == 500
    logger.debug('End of "test_network_delay_sends" check for 500')

    await _check_that_restores(service_client, gate)
114
115
@pytest.mark.skip(reason='response.status == 200')
async def test_network_delay_recv(service_client, gate):
    """Skipped: expects a 500 when only ``to_client_delay`` is applied."""
    gate.to_client_delay(DATA_TRANSMISSION_DELAY)

    delayed = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert delayed.status == 500

    await _check_that_restores(service_client, gate)
124
125
async def test_network_delay(service_client, gate):
    """With delays set in both directions the small-timeout select gives 500.

    The service must recover once the gate policies are lifted.
    """
    gate.to_server_delay(DATA_TRANSMISSION_DELAY)
    gate.to_client_delay(DATA_TRANSMISSION_DELAY)

    logger.debug('Starting "test_network_delay" check for 500')
    delayed = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert delayed.status == 500
    logger.debug('End of "test_network_delay" check for 500')

    await _check_that_restores(service_client, gate)
136
137
async def test_network_limit_bps_sends(service_client, gate):
    """``to_server_limit_bps`` alone must make the small-timeout select 500."""
    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps_sends" check for 500')
    throttled = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert throttled.status == 500
    logger.debug('End of "test_network_limit_bps_sends" check for 500')

    await _check_that_restores(service_client, gate)
147
148
async def test_network_limit_bps_recv(service_client, gate):
    """``to_client_limit_bps`` alone must make the small-timeout select 500."""
    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps_recv" check for 500')
    throttled = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert throttled.status == 500
    logger.debug('End of "test_network_limit_bps_recv" check for 500')

    await _check_that_restores(service_client, gate)
158
159
async def test_network_limit_bps(service_client, gate):
    """A bps limit in both directions must make the small-timeout select 500.

    The service must recover once the gate policies are lifted.
    """
    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps" check for 500')
    throttled = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert throttled.status == 500
    logger.debug('End of "test_network_limit_bps" check for 500')

    await _check_that_restores(service_client, gate)
170
171
async def test_network_limit_time_sends(service_client, gate):
    """A non-200 answer must show up under ``to_server_limit_time``.

    Requests are repeated up to ``FAILURE_RETRIES`` times and stop at the
    first non-200 status; the service must recover afterwards.
    """
    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)

    logger.debug('Starting "test_network_limit_time_sends" check for 500')
    saw_failure = False
    attempts_left = FAILURE_RETRIES
    while attempts_left > 0 and not saw_failure:
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        saw_failure = reply.status != 200
        attempts_left -= 1
    logger.debug('End of "test_network_limit_time_sends" check for 500')

    assert saw_failure, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
187
188
async def test_network_limit_time_recv(service_client, gate):
    """A non-200 answer must show up under ``to_client_limit_time``.

    Requests are repeated up to ``FAILURE_RETRIES`` times and stop at the
    first non-200 status; the service must recover afterwards.
    """
    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)

    logger.debug('Starting "test_network_limit_time_recv" check for 500')
    saw_failure = False
    attempts_left = FAILURE_RETRIES
    while attempts_left > 0 and not saw_failure:
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        saw_failure = reply.status != 200
        attempts_left -= 1
    logger.debug('End of "test_network_limit_time_recv" check for 500')

    assert saw_failure, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
204
205
async def test_network_limit_time(service_client, gate):
    """A non-200 answer must show up with time limits in both directions.

    Requests are repeated up to ``FAILURE_RETRIES`` times and stop at the
    first non-200 status; the service must recover afterwards.
    """
    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)

    logger.debug('Starting "test_network_limit_time" check for 500')
    saw_failure = False
    attempts_left = FAILURE_RETRIES
    while attempts_left > 0 and not saw_failure:
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        saw_failure = reply.status != 200
        attempts_left -= 1
    logger.debug('End of "test_network_limit_time" check for 500')

    assert saw_failure, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
222
223
async def test_network_smaller_parts_sends(service_client, gate):
    """The select still answers 200 under ``to_server_smaller_parts``."""
    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    assert reply.status == 200
229
230
async def test_network_smaller_parts_recv(service_client, gate):
    """The select still answers 200 under ``to_client_smaller_parts``."""
    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    assert reply.status == 200
236
237
async def test_network_smaller_parts(service_client, gate):
    """The select answers 200 with ``smaller_parts`` set in both directions."""
    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    assert reply.status == 200
244
245
# TODO: timeout does not work!
async def test_network_limit_bytes_sends(service_client, gate):
    """A non-200 answer must show up under ``to_server_limit_bytes``.

    Requests are repeated up to ``FAILURE_RETRIES`` times and stop at the
    first non-200 status; the service must recover afterwards.
    """
    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            got_error = True
            break
    else:
        # Every attempt answered 200 (or no attempt was made)
        got_error = False
    logger.debug('End of "test_network_limit_bytes_sends" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
262
263
async def test_network_limit_bytes_recv(service_client, gate):
    """A non-200 answer must show up under ``to_client_limit_bytes``.

    Requests are repeated up to ``FAILURE_RETRIES`` times and stop at the
    first non-200 status; the service must recover afterwards.
    """
    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            got_error = True
            break
    else:
        # Every attempt answered 200 (or no attempt was made)
        got_error = False
    logger.debug('End of "test_network_limit_bytes_recv" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
279
280
async def test_network_limit_bytes(service_client, gate):
    """A non-200 answer must show up with byte limits in both directions.

    Requests are repeated up to ``FAILURE_RETRIES`` times and stop at the
    first non-200 status; the service must recover afterwards.
    """
    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            got_error = True
            break
    else:
        # Every attempt answered 200 (or no attempt was made)
        got_error = False
    logger.debug('End of "test_network_limit_bytes" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
297
298
async def _intercept_server_terminated(
    loop, socket_from: socket.socket, socket_to: socket.socket,
) -> None:
    """Gate interceptor that replaces ``ready_for_query`` with a fatal error.

    Bytes read from ``socket_from`` are buffered until the
    ``ready_for_query`` byte pattern appears. Everything before the pattern
    is forwarded to ``socket_to``, ``error_msg`` is sent in its place, and
    the interceptor raises so that the socket gets closed right after.

    Args:
        loop: event loop object providing ``sock_recv`` and ``sock_sendall``.
        socket_from: socket the response is read from.
        socket_to: socket the (modified) response is written to.

    Raises:
        chaos.GateInterceptException: always -- either after the error
            message has been sent, or when ``socket_from`` reaches EOF
            before the pattern was seen.
    """
    error_msg = (
        b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
        b'Mterminating connection due to administrator command\x00'
        b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
    )
    ready_for_query = b'Z\x00\x00\x00\x05'

    # Wait until we get the entire server response,
    # then send an error message instead of 'Z' and
    # close the socket immediately after that.
    data = b''
    n = -1
    while n < 0:
        chunk = await loop.sock_recv(socket_from, 4096)
        if not chunk:
            # An empty read means the peer closed the connection. Without
            # this check ``n`` would stay negative and the loop would spin
            # forever on empty reads.
            if data:
                # Hand over what was already received before giving up.
                await loop.sock_sendall(socket_to, data)
            raise chaos.GateInterceptException(
                'Socket closed before the end of server response',
            )
        data += chunk
        n = data.find(ready_for_query)
    await loop.sock_sendall(socket_to, data[:n])
    await loop.sock_sendall(socket_to, error_msg)
    raise chaos.GateInterceptException('Closing socket after error')
320
321
async def test_close_with_error(service_client, gate, testpoint):
    """A server-terminated error injected by the gate yields 500, then heals.

    The first request runs with the interceptor disarmed and must answer
    200. The second runs with ``_intercept_server_terminated`` installed on
    the to-client direction and must answer 500. The flag is cleared before
    the recovery check so the testpoint hook stops re-installing the
    interceptor.
    """
    interceptor_armed = False

    @testpoint('after_trx_begin')
    async def _hook(_data):
        # Reads the enclosing flag at call time, so later reassignments
        # in the test body are visible here.
        if not interceptor_armed:
            return
        gate.set_to_client_interceptor(_intercept_server_terminated)

    healthy = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert healthy.status == 200

    interceptor_armed = True
    gate.set_to_client_interceptor(_intercept_server_terminated)

    broken = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert broken.status == 500

    interceptor_armed = False
    await _check_that_restores(service_client, gate)