userver: postgresql/functional_tests/basic_chaos/tests/test_postgres.py
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
postgresql/functional_tests/basic_chaos/tests/test_postgres.py
1import asyncio
2import logging
3import socket
4
5import pytest
6from pytest_userver import chaos
7
8import utils
9
10
11DATA_TRANSMISSION_DELAY = 1
12BYTES_PER_SECOND_LIMIT = 10
13CONNECTION_TIME_LIMIT = 0.4
14CONNECTION_LIMIT_JITTER = 0.004
15FAILURE_RETRIES = 250
16DATA_PARTS_MAX_SIZE = 40
17BYTES_TRANSMISSION_LIMIT = 1024
18
19SELECT_URL = '/chaos/postgres?type=select'
20SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'
21
22
23logger = logging.getLogger(__name__)
24
25
26# /// [restore]
27async def _check_that_restores(service_client, gate):
28 gate.to_server_pass()
29 gate.to_client_pass()
30 gate.start_accepting()
31
32 await utils.consume_dead_db_connections(service_client)
33
34 logger.debug('Starting "_check_that_restores" wait for 200')
35 response = await service_client.get(SELECT_URL)
36 assert response.status == 200, 'Bad results after connection restore'
37 logger.debug('End of "_check_that_restores" wait for 200')
38 # /// [restore]
39
40
41async def test_pg_fine(service_client, gate):
42 response = await service_client.get(SELECT_URL)
43 assert response.status == 200
44
45
46async def test_pg_overload_no_accepts(service_client, gate):
47 # Get a connection, if there is no one
48 response = await service_client.get(SELECT_URL)
49 assert response.status == 200
50
51 await gate.stop_accepting()
52
53 # Use already opened connection
54 response = await service_client.get(SELECT_URL)
55 assert response.status == 200
56
57
58# /// [test cc]
59async def test_pg_congestion_control(service_client, gate):
60 gate.to_server_close_on_data()
61 gate.to_client_close_on_data()
62
63 for _ in range(gate.connections_count()):
64 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
65 assert response.status == 500
66
67 await _check_that_restores(service_client, gate)
68 # /// [test cc]
69
70
71async def test_close_to_client_limit(service_client, gate):
72 for i in range(100, 250, 50):
73 gate.to_client_limit_bytes(i)
74 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
75 assert response.status == 500, i
76
77 gate.to_client_pass()
78 await _check_that_restores(service_client, gate)
79
80
81async def test_close_to_server_limit(service_client, gate):
82 for i in range(100, 250, 50):
83 gate.to_server_limit_bytes(i)
84 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
85 assert response.status == 500
86
87 gate.to_server_pass()
88 await _check_that_restores(service_client, gate)
89
90
91@pytest.mark.skip(
92 reason='Rarely breaks the server, and corrupted data can still be valid',
93)
94async def test_pg_corrupted_response(service_client, gate):
95 gate.to_client_corrupt_data()
96
97 for _ in range(gate.connections_count()):
98 response = await service_client.get(SELECT_URL)
99 assert response.status == 500
100
101 await _check_that_restores(service_client, gate)
102
103
104@pytest.mark.skip(reason='response.status == 200')
105async def test_network_delay_sends(service_client, gate):
106 gate.to_server_delay(DATA_TRANSMISSION_DELAY)
107
108 logger.debug('Starting "test_network_delay_sends" check for 500')
109 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
110 assert response.status == 500
111 logger.debug('End of "test_network_delay_sends" check for 500')
112
113 await _check_that_restores(service_client, gate)
114
115
116@pytest.mark.skip(reason='response.status == 200')
117async def test_network_delay_recv(service_client, gate):
118 gate.to_client_delay(DATA_TRANSMISSION_DELAY)
119
120 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
121 assert response.status == 500
122
123 await _check_that_restores(service_client, gate)
124
125
126async def test_network_delay(service_client, gate):
127 gate.to_server_delay(DATA_TRANSMISSION_DELAY)
128 gate.to_client_delay(DATA_TRANSMISSION_DELAY)
129
130 logger.debug('Starting "test_network_delay" check for 500')
131 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
132 assert response.status == 500
133 logger.debug('End of "test_network_delay" check for 500')
134
135 await _check_that_restores(service_client, gate)
136
137
138async def test_network_limit_bps_sends(service_client, gate):
139 gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
140
141 logger.debug('Starting "test_network_limit_bps_sends" check for 500')
142 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
143 assert response.status == 500
144 logger.debug('End of "test_network_limit_bps_sends" check for 500')
145
146 await _check_that_restores(service_client, gate)
147
148
149async def test_network_limit_bps_recv(service_client, gate):
150 gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
151
152 logger.debug('Starting "test_network_limit_bps_recv" check for 500')
153 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
154 assert response.status == 500
155 logger.debug('End of "test_network_limit_bps_recv" check for 500')
156
157 await _check_that_restores(service_client, gate)
158
159
160async def test_network_limit_bps(service_client, gate):
161 gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
162 gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
163
164 logger.debug('Starting "test_network_limit_bps" check for 500')
165 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
166 assert response.status == 500
167 logger.debug('End of "test_network_limit_bps" check for 500')
168
169 await _check_that_restores(service_client, gate)
170
171
172@pytest.mark.skip(reason='flacky')
173async def test_network_limit_time_sends(service_client, gate):
174 gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
175
176 logger.debug('Starting "test_network_limit_time_sends" check for 500')
177 got_error = False
178 for _ in range(FAILURE_RETRIES):
179 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
180 if response.status != 200:
181 got_error = True
182 break
183 logger.debug('End of "test_network_limit_time_sends" check for 500')
184
185 assert got_error, 'Previous steps unexpectedly finished with success'
186
187 await _check_that_restores(service_client, gate)
188
189
190@pytest.mark.skip(reason='flacky')
191async def test_network_limit_time_recv(service_client, gate):
192 gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
193
194 logger.debug('Starting "test_network_limit_time_recv" check for 500')
195 got_error = False
196 for _ in range(FAILURE_RETRIES):
197 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
198 if response.status != 200:
199 got_error = True
200 break
201 logger.debug('End of "test_network_limit_time_recv" check for 500')
202
203 assert got_error, 'Previous steps unexpectedly finished with success'
204
205 await _check_that_restores(service_client, gate)
206
207
208@pytest.mark.skip(reason='flacky')
209async def test_network_limit_time(service_client, gate):
210 gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
211 gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
212
213 logger.debug('Starting "test_network_limit_time" check for 500')
214 got_error = False
215 for _ in range(FAILURE_RETRIES):
216 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
217 if response.status != 200:
218 got_error = True
219 break
220 logger.debug('End of "test_network_limit_time" check for 500')
221
222 assert got_error, 'Previous steps unexpectedly finished with success'
223
224 await _check_that_restores(service_client, gate)
225
226
227async def test_network_smaller_parts_sends(service_client, gate):
228 gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
229
230 response = await service_client.get(SELECT_URL)
231 assert response.status == 200
232
233
234async def test_network_smaller_parts_recv(service_client, gate):
235 gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
236
237 response = await service_client.get(SELECT_URL)
238 assert response.status == 200
239
240
241async def test_network_smaller_parts(service_client, gate):
242 gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
243 gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
244
245 response = await service_client.get(SELECT_URL)
246 assert response.status == 200
247
248
249# TODO: timeout does not work!
250async def test_network_limit_bytes_sends(service_client, gate):
251 gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
252
253 logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
254 got_error = False
255 for _ in range(FAILURE_RETRIES):
256 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
257 if response.status != 200:
258 got_error = True
259 break
260 logger.debug('End of "test_network_limit_bytes_sends" check for 500')
261
262 assert got_error, 'Previous steps unexpectedly finished with success'
263
264 await _check_that_restores(service_client, gate)
265
266
267async def test_network_limit_bytes_recv(service_client, gate):
268 gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
269
270 logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
271 got_error = False
272 for _ in range(FAILURE_RETRIES):
273 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
274 if response.status != 200:
275 got_error = True
276 break
277 logger.debug('End of "test_network_limit_bytes_recv" check for 500')
278
279 assert got_error, 'Previous steps unexpectedly finished with success'
280
281 await _check_that_restores(service_client, gate)
282
283
284async def test_network_limit_bytes(service_client, gate):
285 gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
286 gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
287
288 logger.debug('Starting "test_network_limit_bytes" check for 500')
289 got_error = False
290 for _ in range(FAILURE_RETRIES):
291 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
292 if response.status != 200:
293 got_error = True
294 break
295 logger.debug('End of "test_network_limit_bytes" check for 500')
296
297 assert got_error, 'Previous steps unexpectedly finished with success'
298
299 await _check_that_restores(service_client, gate)
300
301
302async def _intercept_server_terminated(
303 loop, socket_from: socket.socket, socket_to: socket.socket,
304) -> None:
305 error_msg = (
306 b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
307 b'Mterminating connection due to administrator command\x00'
308 b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
309 )
310 ready_for_query = b'Z\x00\x00\x00\x05'
311
312 # Wait until we get the entire server response,
313 # then send an error message instead of 'Z' and
314 # close the socket immediately after that.
315 data = b''
316 n_bytes = -1
317 while n_bytes < 0:
318 data += await loop.sock_recv(socket_from, 4096)
319 n_bytes = data.find(ready_for_query)
320 await loop.sock_sendall(socket_to, data[:n_bytes])
321 await loop.sock_sendall(socket_to, error_msg)
322 raise chaos.GateInterceptException('Closing socket after error')
323
324
325async def test_close_with_error(service_client, gate, testpoint):
326 should_close = False
327
328 @testpoint('after_trx_begin')
329 async def _hook(_data):
330 if should_close:
331 gate.set_to_client_interceptor(_intercept_server_terminated)
332
333 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
334 assert response.status == 200
335
336 should_close = True
337 gate.set_to_client_interceptor(_intercept_server_terminated)
338
339 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
340 assert response.status == 500
341
342 should_close = False
343 await _check_that_restores(service_client, gate)
344
345
346@pytest.mark.config(
347 POSTGRES_CONNECTION_POOL_SETTINGS={
348 '__default__': {'max_pool_size': 1, 'min_pool_size': 1},
349 },
350)
351async def test_prepared_statement_already_exists(
352 service_client, gate, testpoint,
353):
354 first = {1: True}
355
356 @testpoint('after_trx_begin')
357 async def _hook(_data):
358 if first[1]:
359 gate.to_client_delay(0.9)
360 first[1] = False
361
362 @testpoint('pg_cleanup')
363 async def pg_cleanup_hook(_data):
364 gate.to_client_pass()
365
366 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
367 assert response.status == 500
368
369 logger.debug('after slow select')
370
371 gate.to_client_pass()
372 await asyncio.sleep(2)
373
374 logger.debug('after sleep')
375
376 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
377 assert response.status == 500
378
379 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
380 assert response.status == 200
381
382 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
383 assert response.status == 200