1import asyncio
2import logging
3import socket
4
5import pytest
6from pytest_userver import chaos
7
8import utils
9
# Handler endpoints exercised by the tests.
SELECT_URL = '/chaos/postgres?type=select'
SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'

# Timing knobs passed to the gate's delay / time-limit fault injectors.
DATA_TRANSMISSION_DELAY = 1
CONNECTION_TIME_LIMIT = 0.4
CONNECTION_LIMIT_JITTER = 0.004

# Byte-oriented knobs: bandwidth cap, chunk size and total-bytes cap.
BYTES_PER_SECOND_LIMIT = 10
DATA_PARTS_MAX_SIZE = 40
BYTES_TRANSMISSION_LIMIT = 1024

# Upper bound on requests issued while waiting for an injected failure.
FAILURE_RETRIES = 250

logger = logging.getLogger(__name__)
23
24
25
async def _check_that_restores(service_client, gate):
    """Lift all gate faults and assert that a plain SELECT succeeds again.

    Both traffic directions are switched back to pass-through and the gate
    is told to accept connections. ``utils.consume_dead_db_connections`` is
    awaited before the final request (presumably to flush connections broken
    by the test -- see utils).
    """
    for reset in (
            gate.to_server_pass,
            gate.to_client_pass,
            gate.start_accepting,
    ):
        reset()

    await utils.consume_dead_db_connections(service_client)

    logger.debug('Starting "_check_that_restores" wait for 200')
    restored = await service_client.get(SELECT_URL)
    assert restored.status == 200, 'Bad results after connection restore'
    logger.debug('End of "_check_that_restores" wait for 200')
37
38
39
async def test_pg_fine(service_client, gate):
    """Sanity check: without configuring any gate fault a SELECT succeeds."""
    status = (await service_client.get(SELECT_URL)).status
    assert status == 200
43
44
async def test_pg_overload_no_accepts(service_client, gate):
    """Already-established connections keep working when accepts stop.

    The first request is made while the gate still accepts connections. The
    second request happens after ``stop_accepting`` and must still succeed.
    Accepting is re-enabled afterwards, even if the assertion fails.
    """
    # Make a request while new connections can still be accepted.
    response = await service_client.get(SELECT_URL)
    assert response.status == 200

    await gate.stop_accepting()
    try:
        # Only connections accepted before stop_accepting() are usable now.
        response = await service_client.get(SELECT_URL)
        assert response.status == 200
    finally:
        # Nothing else in this test undoes stop_accepting(); restore it so a
        # failed assert cannot leave the gate refusing connections (in case
        # the gate fixture does not reset this on its own).
        gate.start_accepting()
55
56
57
async def test_pg_congestion_control(service_client, gate):
    """Requests fail while the gate closes connections on any data.

    One request is issued per connection reported by
    ``gate.connections_count()``, and every one must answer 500. Afterwards
    the service must recover.
    """
    gate.to_server_close_on_data()
    gate.to_client_close_on_data()

    attempts = gate.connections_count()
    for _ in range(attempts):
        failed = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert failed.status == 500

    await _check_that_restores(service_client, gate)
67
68
69
async def test_close_to_client_limit(service_client, gate):
    """Capping traffic towards the client at 100/150/200 bytes breaks SELECT.

    The failing byte limit is used as the assertion message.
    """
    limits = range(100, 250, 50)
    for limit in limits:
        gate.to_client_limit_bytes(limit)
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert reply.status == 500, limit

    gate.to_client_pass()
    await _check_that_restores(service_client, gate)
78
79
async def test_close_to_server_limit(service_client, gate):
    """Capping traffic towards the server at 100/150/200 bytes breaks SELECT.

    This is the server-direction counterpart of
    ``test_close_to_client_limit``. A failing assertion reports the byte
    limit that did not produce a 500.
    """
    for limit in range(100, 250, 50):
        gate.to_server_limit_bytes(limit)
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        # Include the limit so a failure is diagnosable, matching the
        # client-direction test.
        assert response.status == 500, limit

    gate.to_server_pass()
    await _check_that_restores(service_client, gate)
88
89
@pytest.mark.skip(
    reason='Rarely breaks the server, and corrupted data can still be valid',
)
async def test_pg_corrupted_response(service_client, gate):
    """Corrupted data towards the client should turn each request into 500.

    One request is made per connection reported by the gate.
    """
    gate.to_client_corrupt_data()

    remaining = gate.connections_count()
    while remaining > 0:
        resp = await service_client.get(SELECT_URL)
        assert resp.status == 500
        remaining -= 1

    await _check_that_restores(service_client, gate)
101
102
@pytest.mark.skip(reason='response.status == 200')
async def test_network_delay_sends(service_client, gate):
    """Delaying data towards the server is expected to fail the SELECT.

    Currently skipped (see the skip reason).
    """
    gate.to_server_delay(DATA_TRANSMISSION_DELAY)

    logger.debug('Starting "test_network_delay_sends" check for 500')
    status = (await service_client.get(SELECT_SMALL_TIMEOUT_URL)).status
    assert status == 500
    logger.debug('End of "test_network_delay_sends" check for 500')

    await _check_that_restores(service_client, gate)
113
114
@pytest.mark.skip(reason='response.status == 200')
async def test_network_delay_recv(service_client, gate):
    """Delaying data towards the client is expected to fail the SELECT.

    Currently skipped (see the skip reason).
    """
    gate.to_client_delay(DATA_TRANSMISSION_DELAY)

    delayed = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert delayed.status == 500

    await _check_that_restores(service_client, gate)
123
124
async def test_network_delay(service_client, gate):
    """Delaying traffic in both directions fails the small-timeout SELECT."""
    for delay_direction in (gate.to_server_delay, gate.to_client_delay):
        delay_direction(DATA_TRANSMISSION_DELAY)

    logger.debug('Starting "test_network_delay" check for 500')
    outcome = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert outcome.status == 500
    logger.debug('End of "test_network_delay" check for 500')

    await _check_that_restores(service_client, gate)
135
136
async def test_network_limit_bps_sends(service_client, gate):
    """Throttling bandwidth towards the server makes the SELECT answer 500."""
    gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps_sends" check for 500')
    status = (await service_client.get(SELECT_SMALL_TIMEOUT_URL)).status
    assert status == 500
    logger.debug('End of "test_network_limit_bps_sends" check for 500')

    await _check_that_restores(service_client, gate)
146
147
async def test_network_limit_bps_recv(service_client, gate):
    """Throttling bandwidth towards the client makes the SELECT answer 500."""
    gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps_recv" check for 500')
    throttled = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert throttled.status == 500
    logger.debug('End of "test_network_limit_bps_recv" check for 500')

    await _check_that_restores(service_client, gate)
157
158
async def test_network_limit_bps(service_client, gate):
    """Throttling bandwidth in both directions makes the SELECT answer 500."""
    for throttle in (gate.to_server_limit_bps, gate.to_client_limit_bps):
        throttle(BYTES_PER_SECOND_LIMIT)

    logger.debug('Starting "test_network_limit_bps" check for 500')
    throttled = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert throttled.status == 500
    logger.debug('End of "test_network_limit_bps" check for 500')

    await _check_that_restores(service_client, gate)
169
170
@pytest.mark.skip(reason='flaky')
async def test_network_limit_time_sends(service_client, gate):
    """Time-limited server-bound connections eventually fail a request.

    Up to FAILURE_RETRIES requests are made. The check passes once any of
    them returns a status other than 200.
    """
    gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)

    logger.debug('Starting "test_network_limit_time_sends" check for 500')
    got_error = True
    for _ in range(FAILURE_RETRIES):
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if response.status != 200:
            break
    else:
        # The loop never hit `break`: every request returned 200.
        got_error = False
    logger.debug('End of "test_network_limit_time_sends" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
187
188
@pytest.mark.skip(reason='flaky')
async def test_network_limit_time_recv(service_client, gate):
    """Time-limited client-bound connections eventually fail a request.

    At most FAILURE_RETRIES requests are made, stopping at the first
    status other than 200.
    """
    gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)

    logger.debug('Starting "test_network_limit_time_recv" check for 500')
    got_error = False
    attempt = 0
    while not got_error and attempt < FAILURE_RETRIES:
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        got_error = response.status != 200
        attempt += 1
    logger.debug('End of "test_network_limit_time_recv" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
205
206
@pytest.mark.skip(reason='flaky')
async def test_network_limit_time(service_client, gate):
    """Time-limiting both directions eventually makes a request fail.

    Requests are retried up to FAILURE_RETRIES times until one returns a
    status other than 200.
    """
    for limit_direction in (
            gate.to_server_limit_time,
            gate.to_client_limit_time,
    ):
        limit_direction(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)

    logger.debug('Starting "test_network_limit_time" check for 500')
    got_error = False
    attempts_left = FAILURE_RETRIES
    while attempts_left > 0 and not got_error:
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        got_error = reply.status != 200
        attempts_left -= 1
    logger.debug('End of "test_network_limit_time" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
224
225
async def test_network_smaller_parts_sends(service_client, gate):
    """SELECT still works when server-bound data is split into small parts."""
    gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    assert reply.status == 200
231
232
async def test_network_smaller_parts_recv(service_client, gate):
    """SELECT still works when client-bound data is split into small parts."""
    gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)

    status = (await service_client.get(SELECT_URL)).status
    assert status == 200
238
239
async def test_network_smaller_parts(service_client, gate):
    """SELECT still works when traffic both ways arrives in small parts."""
    for split in (gate.to_server_smaller_parts, gate.to_client_smaller_parts):
        split(DATA_PARTS_MAX_SIZE)

    reply = await service_client.get(SELECT_URL)
    assert reply.status == 200
246
247
248
async def test_network_limit_bytes_sends(service_client, gate):
    """Capping total server-bound bytes eventually makes a request fail.

    Up to FAILURE_RETRIES requests are made. Any status other than 200
    counts as the expected failure.
    """
    gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
    got_error = True
    for _ in range(FAILURE_RETRIES):
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        if response.status != 200:
            break
    else:
        # No `break` happened: every request succeeded.
        got_error = False
    logger.debug('End of "test_network_limit_bytes_sends" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
264
265
async def test_network_limit_bytes_recv(service_client, gate):
    """Capping total client-bound bytes eventually makes a request fail.

    Requests are repeated (at most FAILURE_RETRIES times) until one returns a
    status other than 200.
    """
    gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
    got_error = False
    attempt = 0
    while not got_error and attempt < FAILURE_RETRIES:
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        got_error = reply.status != 200
        attempt += 1
    logger.debug('End of "test_network_limit_bytes_recv" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
281
282
async def test_network_limit_bytes(service_client, gate):
    """Capping total bytes in both directions eventually fails a request.

    At most FAILURE_RETRIES requests are made, stopping at the first
    status other than 200.
    """
    for cap in (gate.to_server_limit_bytes, gate.to_client_limit_bytes):
        cap(BYTES_TRANSMISSION_LIMIT)

    logger.debug('Starting "test_network_limit_bytes" check for 500')
    got_error = False
    attempts_left = FAILURE_RETRIES
    while attempts_left > 0 and not got_error:
        reply = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        got_error = reply.status != 200
        attempts_left -= 1
    logger.debug('End of "test_network_limit_bytes" check for 500')

    assert got_error, 'Previous steps unexpectedly finished with success'

    await _check_that_restores(service_client, gate)
299
300
async def _intercept_server_terminated(
        loop, socket_from: socket.socket, socket_to: socket.socket,
) -> None:
    """Forward server data up to ReadyForQuery, then inject a FATAL error.

    Data is read from ``socket_from`` until the ReadyForQuery header
    (``b'Z\\x00\\x00\\x00\\x05'``) appears. Everything before that header is
    forwarded to ``socket_to``. A pre-encoded error message with severity
    FATAL and SQLSTATE 57P01 ("terminating connection due to administrator
    command") is then sent in place of the header and whatever followed it.

    Raises:
        chaos.GateInterceptException: always. Either after the error has
            been injected, or when ``socket_from`` reaches EOF before the
            ReadyForQuery header was seen.
    """
    # Pre-encoded 'E' message; the int32 length field b'\x00\x00\x00t'
    # (116) covers itself plus all the fields below.
    error_msg = (
        b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
        b'Mterminating connection due to administrator command\x00'
        b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
    )
    # Message type 'Z' followed by its int32 length (5).
    ready_for_query = b'Z\x00\x00\x00\x05'

    data = b''
    n_bytes = -1
    while n_bytes < 0:
        chunk = await loop.sock_recv(socket_from, 4096)
        if not chunk:
            # EOF: sock_recv would keep returning b'' and this loop would
            # never terminate, so close the intercepted connection instead.
            raise chaos.GateInterceptException(
                'Socket closed before ReadyForQuery was received',
            )
        data += chunk
        n_bytes = data.find(ready_for_query)
    # Forward only what precedes ReadyForQuery; the error replaces the rest.
    await loop.sock_sendall(socket_to, data[:n_bytes])
    await loop.sock_sendall(socket_to, error_msg)
    raise chaos.GateInterceptException('Closing socket after error')
322
323
async def test_close_with_error(service_client, gate, testpoint):
    """A server-side FATAL error injected by the gate yields HTTP 500.

    After one healthy request, ``_intercept_server_terminated`` is installed
    as the client-direction interceptor. It is installed directly and again
    from the ``after_trx_begin`` testpoint while the flag is set. Once the
    flag is cleared, the service must recover.
    """
    inject_error = False

    @testpoint('after_trx_begin')
    async def _hook(_data):
        if not inject_error:
            return
        gate.set_to_client_interceptor(_intercept_server_terminated)

    healthy = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert healthy.status == 200

    inject_error = True
    gate.set_to_client_interceptor(_intercept_server_terminated)

    broken = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert broken.status == 500

    inject_error = False
    await _check_that_restores(service_client, gate)
343
344
@pytest.mark.config(
    POSTGRES_CONNECTION_POOL_SETTINGS={
        '__default__': {'max_pool_size': 1, 'min_pool_size': 1},
    },
)
async def test_prepared_statement_already_exists(
        service_client, gate, testpoint,
):
    """Pin the request outcomes after a slow first transaction on a 1-conn pool.

    The first ``after_trx_begin`` delays client-bound traffic by 0.9s. The
    ``pg_cleanup`` testpoint lifts the delay. The expected statuses are
    500 (slow request), then after a 2s pause 500, 200, 200. Presumably the
    second 500 comes from the single pooled connection's leftover state
    (TODO confirm).
    """
    delay_pending = True

    @testpoint('after_trx_begin')
    async def _hook(_data):
        nonlocal delay_pending
        if delay_pending:
            gate.to_client_delay(0.9)
            delay_pending = False

    @testpoint('pg_cleanup')
    async def pg_cleanup_hook(_data):
        gate.to_client_pass()

    slow = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
    assert slow.status == 500

    logger.debug('after slow select')

    gate.to_client_pass()
    await asyncio.sleep(2)

    logger.debug('after sleep')

    for expected in (500, 200, 200):
        response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
        assert response.status == expected