userver: postgresql/functional_tests/basic_chaos/tests/test_postgres.py
Loading...
Searching...
No Matches
postgresql/functional_tests/basic_chaos/tests/test_postgres.py
1import asyncio
2import logging
3import socket
4
5import pytest
6from pytest_userver import chaos
7
8import utils
9
10
11DATA_TRANSMISSION_DELAY = 1
12BYTES_PER_SECOND_LIMIT = 10
13CONNECTION_TIME_LIMIT = 0.4
14CONNECTION_LIMIT_JITTER = 0.004
15FAILURE_RETRIES = 250
16DATA_PARTS_MAX_SIZE = 40
17BYTES_TRANSMISSION_LIMIT = 1024
18
19SELECT_URL = '/chaos/postgres?type=select'
20SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'
21
22
23logger = logging.getLogger(__name__)
24
25
26# /// [restore]
27async def _check_that_restores(service_client, gate):
28 gate.to_server_pass()
29 gate.to_client_pass()
30 gate.start_accepting()
31
32 await utils.consume_dead_db_connections(service_client)
33
34 logger.debug('Starting "_check_that_restores" wait for 200')
35 response = await service_client.get(SELECT_URL)
36 assert response.status == 200, 'Bad results after connection restore'
37 logger.debug('End of "_check_that_restores" wait for 200')
38 # /// [restore]
39
40
41async def test_pg_fine(service_client, gate):
42 response = await service_client.get(SELECT_URL)
43 assert response.status == 200
44
45
46async def test_pg_overload_no_accepts(service_client, gate):
47 # Get a connection, if there is no one
48 response = await service_client.get(SELECT_URL)
49 assert response.status == 200
50
51 await gate.stop_accepting()
52
53 # Use already opened connection
54 response = await service_client.get(SELECT_URL)
55 assert response.status == 200
56
57
58# /// [test cc]
59async def test_pg_congestion_control(service_client, gate):
60 gate.to_server_close_on_data()
61 gate.to_client_close_on_data()
62
63 for _ in range(gate.connections_count()):
64 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
65 assert response.status == 500
66
67 await _check_that_restores(service_client, gate)
68 # /// [test cc]
69
70
71async def test_close_to_client_limit(service_client, gate):
72 for i in range(100, 250, 50):
73 gate.to_client_limit_bytes(i)
74 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
75 assert response.status == 500, i
76
77 gate.to_client_pass()
78 await _check_that_restores(service_client, gate)
79
80
81async def test_close_to_server_limit(service_client, gate):
82 for i in range(100, 250, 50):
83 gate.to_server_limit_bytes(i)
84 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
85 assert response.status == 500
86
87 gate.to_server_pass()
88 await _check_that_restores(service_client, gate)
89
90
91@pytest.mark.skip(
92 reason='Rarely breaks the server, and corrupted data can still be valid',
93)
94async def test_pg_corrupted_response(service_client, gate):
95 gate.to_client_corrupt_data()
96
97 for _ in range(gate.connections_count()):
98 response = await service_client.get(SELECT_URL)
99 assert response.status == 500
100
101 await _check_that_restores(service_client, gate)
102
103
104@pytest.mark.skip(reason='response.status == 200')
105async def test_network_delay_sends(service_client, gate):
106 gate.to_server_delay(DATA_TRANSMISSION_DELAY)
107
108 logger.debug('Starting "test_network_delay_sends" check for 500')
109 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
110 assert response.status == 500
111 logger.debug('End of "test_network_delay_sends" check for 500')
112
113 await _check_that_restores(service_client, gate)
114
115
116@pytest.mark.skip(reason='response.status == 200')
117async def test_network_delay_recv(service_client, gate):
118 gate.to_client_delay(DATA_TRANSMISSION_DELAY)
119
120 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
121 assert response.status == 500
122
123 await _check_that_restores(service_client, gate)
124
125
126async def test_network_delay(service_client, gate):
127 gate.to_server_delay(DATA_TRANSMISSION_DELAY)
128 gate.to_client_delay(DATA_TRANSMISSION_DELAY)
129
130 logger.debug('Starting "test_network_delay" check for 500')
131 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
132 assert response.status == 500
133 logger.debug('End of "test_network_delay" check for 500')
134
135 await _check_that_restores(service_client, gate)
136
137
138async def test_network_limit_bps_sends(service_client, gate):
139 gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
140
141 logger.debug('Starting "test_network_limit_bps_sends" check for 500')
142 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
143 assert response.status == 500
144 logger.debug('End of "test_network_limit_bps_sends" check for 500')
145
146 await _check_that_restores(service_client, gate)
147
148
149async def test_network_limit_bps_recv(service_client, gate):
150 gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
151
152 logger.debug('Starting "test_network_limit_bps_recv" check for 500')
153 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
154 assert response.status == 500
155 logger.debug('End of "test_network_limit_bps_recv" check for 500')
156
157 await _check_that_restores(service_client, gate)
158
159
160async def test_network_limit_bps(service_client, gate):
161 gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
162 gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
163
164 logger.debug('Starting "test_network_limit_bps" check for 500')
165 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
166 assert response.status == 500
167 logger.debug('End of "test_network_limit_bps" check for 500')
168
169 await _check_that_restores(service_client, gate)
170
171
172@pytest.mark.skip(reason='flacky')
173async def test_network_limit_time_sends(service_client, gate):
174 gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
175
176 logger.debug('Starting "test_network_limit_time_sends" check for 500')
177 got_error = False
178 for _ in range(FAILURE_RETRIES):
179 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
180 if response.status != 200:
181 got_error = True
182 break
183 logger.debug('End of "test_network_limit_time_sends" check for 500')
184
185 assert got_error, 'Previous steps unexpectedly finished with success'
186
187 await _check_that_restores(service_client, gate)
188
189
190@pytest.mark.skip(reason='flacky')
191async def test_network_limit_time_recv(service_client, gate):
192 gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
193
194 logger.debug('Starting "test_network_limit_time_recv" check for 500')
195 got_error = False
196 for _ in range(FAILURE_RETRIES):
197 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
198 if response.status != 200:
199 got_error = True
200 break
201 logger.debug('End of "test_network_limit_time_recv" check for 500')
202
203 assert got_error, 'Previous steps unexpectedly finished with success'
204
205 await _check_that_restores(service_client, gate)
206
207
208@pytest.mark.skip(reason='flacky')
209async def test_network_limit_time(service_client, gate):
210 gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
211 gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
212
213 logger.debug('Starting "test_network_limit_time" check for 500')
214 got_error = False
215 for _ in range(FAILURE_RETRIES):
216 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
217 if response.status != 200:
218 got_error = True
219 break
220 logger.debug('End of "test_network_limit_time" check for 500')
221
222 assert got_error, 'Previous steps unexpectedly finished with success'
223
224 await _check_that_restores(service_client, gate)
225
226
227async def test_network_smaller_parts_sends(service_client, gate):
228 gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
229
230 response = await service_client.get(SELECT_URL)
231 assert response.status == 200
232
233
234async def test_network_smaller_parts_recv(service_client, gate):
235 gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
236
237 response = await service_client.get(SELECT_URL)
238 assert response.status == 200
239
240
241async def test_network_smaller_parts(service_client, gate):
242 gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
243 gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
244
245 response = await service_client.get(SELECT_URL)
246 assert response.status == 200
247
248
249# TODO: timeout does not work!
250async def test_network_limit_bytes_sends(service_client, gate):
251 gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
252
253 logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
254 got_error = False
255 for _ in range(FAILURE_RETRIES):
256 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
257 if response.status != 200:
258 got_error = True
259 break
260 logger.debug('End of "test_network_limit_bytes_sends" check for 500')
261
262 assert got_error, 'Previous steps unexpectedly finished with success'
263
264 await _check_that_restores(service_client, gate)
265
266
267async def test_network_limit_bytes_recv(service_client, gate):
268 gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
269
270 logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
271 got_error = False
272 for _ in range(FAILURE_RETRIES):
273 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
274 if response.status != 200:
275 got_error = True
276 break
277 logger.debug('End of "test_network_limit_bytes_recv" check for 500')
278
279 assert got_error, 'Previous steps unexpectedly finished with success'
280
281 await _check_that_restores(service_client, gate)
282
283
284async def test_network_limit_bytes(service_client, gate):
285 gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
286 gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
287
288 logger.debug('Starting "test_network_limit_bytes" check for 500')
289 got_error = False
290 for _ in range(FAILURE_RETRIES):
291 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
292 if response.status != 200:
293 got_error = True
294 break
295 logger.debug('End of "test_network_limit_bytes" check for 500')
296
297 assert got_error, 'Previous steps unexpectedly finished with success'
298
299 await _check_that_restores(service_client, gate)
300
301
302async def _intercept_server_terminated(
303 loop, socket_from: socket.socket, socket_to: socket.socket,
304) -> None:
305 error_msg = (
306 b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
307 b'Mterminating connection due to administrator command\x00'
308 b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
309 )
310 ready_for_query = b'Z\x00\x00\x00\x05'
311
312 # Wait until we get the entire server response,
313 # then send an error message instead of 'Z' and
314 # close the socket immediately after that.
315 data = b''
316 n_bytes = -1
317 while n_bytes < 0:
318 data += await loop.sock_recv(socket_from, 4096)
319 n_bytes = data.find(ready_for_query)
320 await loop.sock_sendall(socket_to, data[:n_bytes])
321 await loop.sock_sendall(socket_to, error_msg)
322 raise chaos.GateInterceptException('Closing socket after error')
323
324
325async def test_close_with_error(service_client, gate, testpoint):
326 should_close = False
327
328 @testpoint('after_trx_begin')
329 async def _hook(_data):
330 if should_close:
331 gate.set_to_client_interceptor(_intercept_server_terminated)
332
333 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
334 assert response.status == 200
335
336 should_close = True
337 gate.set_to_client_interceptor(_intercept_server_terminated)
338
339 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
340 assert response.status == 500
341
342 should_close = False
343 await _check_that_restores(service_client, gate)
344
345
346@pytest.mark.config(
347 POSTGRES_CONNECTION_POOL_SETTINGS={
348 '__default__': {'max_pool_size': 1, 'min_pool_size': 1},
349 },
350)
351async def test_prepared_statement_already_exists(
352 service_client, gate, testpoint,
353):
354 first = {1: True}
355
356 @testpoint('after_trx_begin')
357 async def _hook(_data):
358 if first[1]:
359 gate.to_client_delay(0.9)
360 first[1] = False
361
362 @testpoint('pg_cleanup')
363 async def pg_cleanup_hook(_data):
364 gate.to_client_pass()
365
366 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
367 assert response.status == 500
368
369 logger.debug('after slow select')
370
371 gate.to_client_pass()
372 await asyncio.sleep(2)
373
374 logger.debug('after sleep')
375
376 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
377 assert response.status == 500
378
379 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
380 assert response.status == 200
381
382 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
383 assert response.status == 200