userver: postgresql/functional_tests/basic_chaos/tests/test_postgres.py
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
postgresql/functional_tests/basic_chaos/tests/test_postgres.py
1import asyncio
2import logging
3import socket
4
5import pytest
6from pytest_userver import chaos
7
8import utils
9
# Delay handed to gate.to_server_delay() / gate.to_client_delay()
# (presumably seconds -- TODO confirm against the chaos gate API).
DATA_TRANSMISSION_DELAY = 1
# Limit handed to gate.to_server_limit_bps() / gate.to_client_limit_bps().
BYTES_PER_SECOND_LIMIT = 10
# Time limit and jitter handed together to gate.to_server_limit_time() /
# gate.to_client_limit_time() (presumably seconds -- TODO confirm).
CONNECTION_TIME_LIMIT = 0.4
CONNECTION_LIMIT_JITTER = 0.004
# Upper bound on requests a test sends while waiting for a non-200 reply.
FAILURE_RETRIES = 250
# Size handed to gate.to_server_smaller_parts() / to_client_smaller_parts().
DATA_PARTS_MAX_SIZE = 40
# Limit handed to gate.to_server_limit_bytes() / to_client_limit_bytes().
BYTES_TRANSMISSION_LIMIT = 1024

# Service URLs requested by the tests; they differ only in the `type`
# query parameter ('select' vs 'select-small-timeout').
SELECT_URL = '/chaos/postgres?type=select'
SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'


# Module-level logger used for the debug traces around the checks below.
logger = logging.getLogger(__name__)
23
24
# /// [restore]
async def _check_that_restores(service_client, gate):
    """Put the gate back into pass-through mode and check the service works.

    Both gate directions are switched to ``pass``, accepting is resumed,
    ``utils.consume_dead_db_connections`` is run for the service, and a
    plain select request is then required to answer with HTTP 200.
    """
    await gate.to_server_pass()
    await gate.to_client_pass()
    gate.start_accepting()

    await utils.consume_dead_db_connections(service_client)

    logger.debug('Starting "_check_that_restores" wait for 200')
    restored = await service_client.get(SELECT_URL)
    is_ok = restored.status == 200
    assert is_ok, 'Bad results after connection restore'
    logger.debug('End of "_check_that_restores" wait for 200')
    # /// [restore]
38
39
async def test_pg_fine(service_client, gate):
    """A select request through an unmodified gate must answer with 200."""
    reply = await service_client.get(SELECT_URL)
    status = reply.status
    assert status == 200
43
44
async def test_pg_overload_no_accepts(service_client, gate):
    """Select must keep answering 200 after the gate stops accepting.

    The first request makes sure a connection exists; the second one,
    issued after ``gate.stop_accepting()``, is expected to reuse it.
    """
    # Make sure there is at least one opened connection.
    warmup = await service_client.get(SELECT_URL)
    assert warmup.status == 200

    await gate.stop_accepting()

    # This request is expected to go through the already opened connection.
    reused = await service_client.get(SELECT_URL)
    assert reused.status == 200
55
56
# /// [test cc]
async def test_pg_congestion_control(service_client, gate):
    """Expect 500 while the gate closes on data, then expect recovery.

    One small-timeout select is issued per connection reported by
    ``gate.connections_count()``; each must answer with 500.
    """
    await gate.to_server_close_on_data()
    await gate.to_client_close_on_data()

    connections = gate.connections_count()
    for _ in range(connections):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert reply.status == 500

    await _check_that_restores(service_client, gate)
    # /// [test cc]
68
69
async def test_close_to_client_limit(service_client, gate):
    """Expect 500 for each to-client byte limit of 100, 150 and 200.

    The limit value is used as the assertion message so that a failure
    shows which limit was active. Afterwards the service must recover.
    """
    for limit in (100, 150, 200):
        await gate.to_client_limit_bytes(limit)
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert reply.status == 500, limit

    await gate.to_client_pass()
    await _check_that_restores(service_client, gate)
78
79
async def test_close_to_server_limit(service_client, gate):
    """Expect 500 for each to-server byte limit of 100, 150 and 200.

    For every limit the gate is reconfigured with
    ``gate.to_server_limit_bytes`` and a small-timeout select is issued.
    Afterwards the to-server direction is reset and the service must
    recover.
    """
    for i in range(100, 250, 50):
        await gate.to_server_limit_bytes(i)
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        # Attach the active limit so a failure shows which iteration broke.
        assert response.status == 500, i

    await gate.to_server_pass()
    await _check_that_restores(service_client, gate)
88
89
@pytest.mark.skip(
    reason='Rarely breaks the server, and corrupted data can still be valid',
)
async def test_pg_corrupted_response(service_client, gate):
    """Expect 500 while the gate corrupts to-client data (currently skipped).

    One select is issued per connection reported by
    ``gate.connections_count()``; afterwards the service must recover.
    """
    await gate.to_client_corrupt_data()

    connections = gate.connections_count()
    for _ in range(connections):
        reply = await service_client.get(SELECT_URL)
        assert reply.status == 500

    await _check_that_restores(service_client, gate)
101
102
@pytest.mark.skip(reason='response.status == 200')
async def test_network_delay_sends(service_client, gate):
    """Expect 500 with a to-server delay, then recovery (currently skipped)."""
    await gate.to_server_delay(DATA_TRANSMISSION_DELAY)

    logger.debug('Starting "test_network_delay_sends" check for 500')
    reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    status = reply.status
    assert status == 500
    logger.debug('End of "test_network_delay_sends" check for 500')

    await _check_that_restores(service_client, gate)
113
114
@pytest.mark.skip(reason='response.status == 200')
async def test_network_delay_recv(service_client, gate):
    """Expect 500 with a to-client delay, then recovery (currently skipped)."""
    await gate.to_client_delay(DATA_TRANSMISSION_DELAY)

    reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    status = reply.status
    assert status == 500

    await _check_that_restores(service_client, gate)
123
124
async def test_network_delay(service_client, gate):
    """Expect 500 with delays set in both gate directions, then recovery."""
    await gate.to_server_delay(DATA_TRANSMISSION_DELAY)
    await gate.to_client_delay(DATA_TRANSMISSION_DELAY)

    logger.debug('Starting "test_network_delay" check for 500')
    reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    status = reply.status
    assert status == 500
    logger.debug('End of "test_network_delay" check for 500')

    await _check_that_restores(service_client, gate)
135
136
async def test_network_limit_bps_sends(service_client, gate):
    """Expect 500 with a to-server bytes-per-second limit, then recovery."""
    await gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps_sends" check for 500')
    reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    status = reply.status
    assert status == 500
    logger.debug('End of "test_network_limit_bps_sends" check for 500')

    await _check_that_restores(service_client, gate)
146
147
async def test_network_limit_bps_recv(service_client, gate):
    """Expect 500 with a to-client bytes-per-second limit, then recovery."""
    await gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps_recv" check for 500')
    reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    status = reply.status
    assert status == 500
    logger.debug('End of "test_network_limit_bps_recv" check for 500')

    await _check_that_restores(service_client, gate)
157
158
async def test_network_limit_bps(service_client, gate):
    """Expect 500 with bytes-per-second limits both ways, then recovery."""
    await gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
    await gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps" check for 500')
    reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    status = reply.status
    assert status == 500
    logger.debug('End of "test_network_limit_bps" check for 500')

    await _check_that_restores(service_client, gate)
169
170
@pytest.mark.skip(reason='flaky')
async def test_network_limit_time_sends(service_client, gate):
    """Expect a non-200 reply under a to-server time limit (skipped: flaky).

    Up to ``FAILURE_RETRIES`` small-timeout selects are issued; the test
    passes once any of them answers with a status other than 200, after
    which the service must recover.
    """
    await gate.to_server_limit_time(
        CONNECTION_TIME_LIMIT,
        CONNECTION_LIMIT_JITTER,
    )

    logger.debug('Starting "test_network_limit_time_sends" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            failed = True
            break
    else:
        # Every attempt answered with 200.
        failed = False
    logger.debug('End of "test_network_limit_time_sends" check for 500')

    assert failed, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
187
188
@pytest.mark.skip(reason='flaky')
async def test_network_limit_time_recv(service_client, gate):
    """Expect a non-200 reply under a to-client time limit (skipped: flaky).

    Up to ``FAILURE_RETRIES`` small-timeout selects are issued; the test
    passes once any of them answers with a status other than 200, after
    which the service must recover.
    """
    await gate.to_client_limit_time(
        CONNECTION_TIME_LIMIT,
        CONNECTION_LIMIT_JITTER,
    )

    logger.debug('Starting "test_network_limit_time_recv" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            failed = True
            break
    else:
        # Every attempt answered with 200.
        failed = False
    logger.debug('End of "test_network_limit_time_recv" check for 500')

    assert failed, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
205
206
@pytest.mark.skip(reason='flaky')
async def test_network_limit_time(service_client, gate):
    """Expect a non-200 reply under time limits both ways (skipped: flaky).

    Up to ``FAILURE_RETRIES`` small-timeout selects are issued; the test
    passes once any of them answers with a status other than 200, after
    which the service must recover.
    """
    await gate.to_server_limit_time(
        CONNECTION_TIME_LIMIT,
        CONNECTION_LIMIT_JITTER,
    )
    await gate.to_client_limit_time(
        CONNECTION_TIME_LIMIT,
        CONNECTION_LIMIT_JITTER,
    )

    logger.debug('Starting "test_network_limit_time" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            failed = True
            break
    else:
        # Every attempt answered with 200.
        failed = False
    logger.debug('End of "test_network_limit_time" check for 500')

    assert failed, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
224
225
async def test_network_smaller_parts_sends(service_client, gate):
    """Select must answer 200 with to-server ``smaller_parts`` enabled."""
    await gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    status = reply.status
    assert status == 200
231
232
async def test_network_smaller_parts_recv(service_client, gate):
    """Select must answer 200 with to-client ``smaller_parts`` enabled."""
    await gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    status = reply.status
    assert status == 200
238
239
async def test_network_smaller_parts(service_client, gate):
    """Select must answer 200 with ``smaller_parts`` enabled both ways."""
    await gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
    await gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    status = reply.status
    assert status == 200
246
247
248# TODO: timeout does not work!
async def test_network_limit_bytes_sends(service_client, gate):
    """Expect a non-200 reply under a to-server byte limit, then recovery.

    Up to ``FAILURE_RETRIES`` small-timeout selects are issued; the test
    passes once any of them answers with a status other than 200.
    """
    await gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            failed = True
            break
    else:
        # Every attempt answered with 200.
        failed = False
    logger.debug('End of "test_network_limit_bytes_sends" check for 500')

    assert failed, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
264
265
async def test_network_limit_bytes_recv(service_client, gate):
    """Expect a non-200 reply under a to-client byte limit, then recovery.

    Up to ``FAILURE_RETRIES`` small-timeout selects are issued; the test
    passes once any of them answers with a status other than 200.
    """
    await gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            failed = True
            break
    else:
        # Every attempt answered with 200.
        failed = False
    logger.debug('End of "test_network_limit_bytes_recv" check for 500')

    assert failed, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
281
282
async def test_network_limit_bytes(service_client, gate):
    """Expect a non-200 reply under byte limits both ways, then recovery.

    Up to ``FAILURE_RETRIES`` small-timeout selects are issued; the test
    passes once any of them answers with a status other than 200.
    """
    await gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
    await gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes" check for 500')
    for _ in range(FAILURE_RETRIES):
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if reply.status != 200:
            failed = True
            break
    else:
        # Every attempt answered with 200.
        failed = False
    logger.debug('End of "test_network_limit_bytes" check for 500')

    assert failed, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
299
300
async def _intercept_server_terminated(
    loop,
    socket_from: socket.socket,
    socket_to: socket.socket,
) -> None:
    """Replace the trailing 'Z' message of a reply with a FATAL error.

    Reads from ``socket_from`` until the buffered data contains the
    ``Z\\x00\\x00\\x00\\x05`` marker, forwards everything before the marker to
    ``socket_to``, sends a FATAL 57P01 "terminating connection due to
    administrator command" error message instead, and then raises
    ``chaos.GateInterceptException`` to have the socket closed.

    Raises:
        RuntimeError: if ``socket_from`` is closed before the marker is seen.
        chaos.GateInterceptException: always, after the error was sent.
    """
    ready_for_query = b'Z\x00\x00\x00\x05'
    error_msg = (
        b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
        b'Mterminating connection due to administrator command\x00'
        b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
    )

    # Accumulate the server reply until the marker shows up in it.
    buffered = b''
    while True:
        chunk = await loop.sock_recv(socket_from, 4096)
        if not chunk:
            raise RuntimeError('Socket connection was closed by the other side')
        buffered += chunk
        marker_pos = buffered.find(ready_for_query)
        if marker_pos >= 0:
            break

    # Forward the reply without the marker, then the error in its place.
    await loop.sock_sendall(socket_to, buffered[:marker_pos])
    await loop.sock_sendall(socket_to, error_msg)
    raise chaos.GateInterceptException('Closing socket after error')
327
328
async def test_close_with_error(service_client, gate, testpoint):
    """Expect 500 when the to-client interceptor injects a FATAL error.

    A first request with the interceptor disabled must answer with 200.
    Then the interceptor is installed (directly, and again from the
    'after_trx_begin' testpoint while the flag is set) and the request
    must answer with 500. Finally the flag is cleared and the service
    must recover.
    """
    inject_error = False

    @testpoint('after_trx_begin')
    async def _hook(_data):
        if not inject_error:
            return
        await gate.set_to_client_interceptor(_intercept_server_terminated)

    healthy = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert healthy.status == 200

    inject_error = True
    await gate.set_to_client_interceptor(_intercept_server_terminated)

    broken = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert broken.status == 500

    inject_error = False
    await _check_that_restores(service_client, gate)
348
349
@pytest.mark.config(
    POSTGRES_CONNECTION_POOL_SETTINGS={
        '__default__': {'max_pool_size': 1, 'min_pool_size': 1},
    },
)
async def test_prepared_statement_already_exists(
    service_client,
    gate,
    testpoint,
):
    """Check the status sequence 500, 500, 200, 200 after a delayed reply.

    Runs with the connection pool configured to exactly one connection.
    The first 'after_trx_begin' testpoint hit sets a 0.9 to-client delay;
    the 'pg_cleanup' testpoint switches the to-client direction back to
    pass. The first request and the one after a 2 second pause must
    answer with 500, the following two with 200.
    NOTE(review): the test name suggests this reproduces a "prepared
    statement already exists" situation -- confirm against the service.
    """
    delay_pending = True

    @testpoint('after_trx_begin')
    async def _hook(_data):
        nonlocal delay_pending
        if not delay_pending:
            return
        # Only the very first transaction gets the delay.
        await gate.to_client_delay(0.9)
        delay_pending = False

    @testpoint('pg_cleanup')
    async def pg_cleanup_hook(_data):
        await gate.to_client_pass()

    slow = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert slow.status == 500

    logger.debug('after slow select')

    await gate.to_client_pass()
    await asyncio.sleep(2)

    logger.debug('after sleep')

    after_pause = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert after_pause.status == 500

    # The next two requests are both expected to succeed.
    first_ok = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert first_ok.status == 200

    second_ok = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert second_ok.status == 200
387
388 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
389 assert response.status == 200