userver: postgresql/functional_tests/basic_chaos/tests/test_postgres.py
Loading...
Searching...
No Matches
postgresql/functional_tests/basic_chaos/tests/test_postgres.py
1import logging
2import socket
3
4import pytest
5
6from pytest_userver import chaos
7
8import utils
9
10
11DATA_TRANSMISSION_DELAY = 1
12BYTES_PER_SECOND_LIMIT = 10
13CONNECTION_TIME_LIMIT = 0.4
14CONNECTION_LIMIT_JITTER = 0.004
15FAILURE_RETRIES = 250
16DATA_PARTS_MAX_SIZE = 40
17BYTES_TRANSMISSION_LIMIT = 1024
18
19SELECT_URL = '/chaos/postgres?type=select'
20SELECT_SMALL_TIMEOUT_URL = '/chaos/postgres?type=select-small-timeout'
21
22
23logger = logging.getLogger(__name__)
24
25
26# /// [restore]
27async def _check_that_restores(service_client, gate):
28 gate.to_server_pass()
29 gate.to_client_pass()
30 gate.start_accepting()
31
32 await utils.consume_dead_db_connections(service_client)
33
34 logger.debug('Starting "_check_that_restores" wait for 200')
35 response = await service_client.get(SELECT_URL)
36 assert response.status == 200, 'Bad results after connection restore'
37 logger.debug('End of "_check_that_restores" wait for 200')
38 # /// [restore]
39
40
41async def test_pg_fine(service_client, gate):
42 response = await service_client.get(SELECT_URL)
43 assert response.status == 200
44
45
46async def test_pg_overload_no_accepts(service_client, gate):
47 # Get a connection, if there is no one
48 response = await service_client.get(SELECT_URL)
49 assert response.status == 200
50
51 await gate.stop_accepting()
52
53 # Use already opened connection
54 response = await service_client.get(SELECT_URL)
55 assert response.status == 200
56
57
58# /// [test cc]
59async def test_pg_congestion_control(service_client, gate):
60 gate.to_server_close_on_data()
61 gate.to_client_close_on_data()
62
63 for _ in range(gate.connections_count()):
64 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
65 assert response.status == 500
66
67 await _check_that_restores(service_client, gate)
68 # /// [test cc]
69
70
71async def test_close_to_client_limit(service_client, gate):
72 for i in range(100, 250, 50):
73 gate.to_client_limit_bytes(i)
74 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
75 assert response.status == 500, i
76
77 gate.to_client_pass()
78 await _check_that_restores(service_client, gate)
79
80
81async def test_close_to_server_limit(service_client, gate):
82 for i in range(100, 250, 50):
83 gate.to_server_limit_bytes(i)
84 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
85 assert response.status == 500
86
87 gate.to_server_pass()
88 await _check_that_restores(service_client, gate)
89
90
91@pytest.mark.skip(
92 reason='Rarely breaks the server, and corrupted data can still be valid',
93)
94async def test_pg_corrupted_response(service_client, gate):
95 gate.to_client_corrupt_data()
96
97 for _ in range(gate.connections_count()):
98 response = await service_client.get(SELECT_URL)
99 assert response.status == 500
100
101 await _check_that_restores(service_client, gate)
102
103
104@pytest.mark.skip(reason='response.status == 200')
105async def test_network_delay_sends(service_client, gate):
106 gate.to_server_delay(DATA_TRANSMISSION_DELAY)
107
108 logger.debug('Starting "test_network_delay_sends" check for 500')
109 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
110 assert response.status == 500
111 logger.debug('End of "test_network_delay_sends" check for 500')
112
113 await _check_that_restores(service_client, gate)
114
115
116@pytest.mark.skip(reason='response.status == 200')
117async def test_network_delay_recv(service_client, gate):
118 gate.to_client_delay(DATA_TRANSMISSION_DELAY)
119
120 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
121 assert response.status == 500
122
123 await _check_that_restores(service_client, gate)
124
125
126async def test_network_delay(service_client, gate):
127 gate.to_server_delay(DATA_TRANSMISSION_DELAY)
128 gate.to_client_delay(DATA_TRANSMISSION_DELAY)
129
130 logger.debug('Starting "test_network_delay" check for 500')
131 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
132 assert response.status == 500
133 logger.debug('End of "test_network_delay" check for 500')
134
135 await _check_that_restores(service_client, gate)
136
137
138async def test_network_limit_bps_sends(service_client, gate):
139 gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
140
141 logger.debug('Starting "test_network_limit_bps_sends" check for 500')
142 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
143 assert response.status == 500
144 logger.debug('End of "test_network_limit_bps_sends" check for 500')
145
146 await _check_that_restores(service_client, gate)
147
148
149async def test_network_limit_bps_recv(service_client, gate):
150 gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
151
152 logger.debug('Starting "test_network_limit_bps_recv" check for 500')
153 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
154 assert response.status == 500
155 logger.debug('End of "test_network_limit_bps_recv" check for 500')
156
157 await _check_that_restores(service_client, gate)
158
159
160async def test_network_limit_bps(service_client, gate):
161 gate.to_server_limit_bps(BYTES_PER_SECOND_LIMIT)
162 gate.to_client_limit_bps(BYTES_PER_SECOND_LIMIT)
163
164 logger.debug('Starting "test_network_limit_bps" check for 500')
165 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
166 assert response.status == 500
167 logger.debug('End of "test_network_limit_bps" check for 500')
168
169 await _check_that_restores(service_client, gate)
170
171
172async def test_network_limit_time_sends(service_client, gate):
173 gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
174
175 logger.debug('Starting "test_network_limit_time_sends" check for 500')
176 got_error = False
177 for _ in range(FAILURE_RETRIES):
178 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
179 if response.status != 200:
180 got_error = True
181 break
182 logger.debug('End of "test_network_limit_time_sends" check for 500')
183
184 assert got_error, 'Previous steps unexpectedly finished with success'
185
186 await _check_that_restores(service_client, gate)
187
188
189async def test_network_limit_time_recv(service_client, gate):
190 gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
191
192 logger.debug('Starting "test_network_limit_time_recv" check for 500')
193 got_error = False
194 for _ in range(FAILURE_RETRIES):
195 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
196 if response.status != 200:
197 got_error = True
198 break
199 logger.debug('End of "test_network_limit_time_recv" check for 500')
200
201 assert got_error, 'Previous steps unexpectedly finished with success'
202
203 await _check_that_restores(service_client, gate)
204
205
206async def test_network_limit_time(service_client, gate):
207 gate.to_server_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
208 gate.to_client_limit_time(CONNECTION_TIME_LIMIT, CONNECTION_LIMIT_JITTER)
209
210 logger.debug('Starting "test_network_limit_time" check for 500')
211 got_error = False
212 for _ in range(FAILURE_RETRIES):
213 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
214 if response.status != 200:
215 got_error = True
216 break
217 logger.debug('End of "test_network_limit_time" check for 500')
218
219 assert got_error, 'Previous steps unexpectedly finished with success'
220
221 await _check_that_restores(service_client, gate)
222
223
224async def test_network_smaller_parts_sends(service_client, gate):
225 gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
226
227 response = await service_client.get(SELECT_URL)
228 assert response.status == 200
229
230
231async def test_network_smaller_parts_recv(service_client, gate):
232 gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
233
234 response = await service_client.get(SELECT_URL)
235 assert response.status == 200
236
237
238async def test_network_smaller_parts(service_client, gate):
239 gate.to_server_smaller_parts(DATA_PARTS_MAX_SIZE)
240 gate.to_client_smaller_parts(DATA_PARTS_MAX_SIZE)
241
242 response = await service_client.get(SELECT_URL)
243 assert response.status == 200
244
245
246# TODO: timeout does not work!
247async def test_network_limit_bytes_sends(service_client, gate):
248 gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
249
250 logger.debug('Starting "test_network_limit_bytes_sends" check for 500')
251 got_error = False
252 for _ in range(FAILURE_RETRIES):
253 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
254 if response.status != 200:
255 got_error = True
256 break
257 logger.debug('End of "test_network_limit_bytes_sends" check for 500')
258
259 assert got_error, 'Previous steps unexpectedly finished with success'
260
261 await _check_that_restores(service_client, gate)
262
263
264async def test_network_limit_bytes_recv(service_client, gate):
265 gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
266
267 logger.debug('Starting "test_network_limit_bytes_recv" check for 500')
268 got_error = False
269 for _ in range(FAILURE_RETRIES):
270 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
271 if response.status != 200:
272 got_error = True
273 break
274 logger.debug('End of "test_network_limit_bytes_recv" check for 500')
275
276 assert got_error, 'Previous steps unexpectedly finished with success'
277
278 await _check_that_restores(service_client, gate)
279
280
281async def test_network_limit_bytes(service_client, gate):
282 gate.to_server_limit_bytes(BYTES_TRANSMISSION_LIMIT)
283 gate.to_client_limit_bytes(BYTES_TRANSMISSION_LIMIT)
284
285 logger.debug('Starting "test_network_limit_bytes" check for 500')
286 got_error = False
287 for _ in range(FAILURE_RETRIES):
288 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
289 if response.status != 200:
290 got_error = True
291 break
292 logger.debug('End of "test_network_limit_bytes" check for 500')
293
294 assert got_error, 'Previous steps unexpectedly finished with success'
295
296 await _check_that_restores(service_client, gate)
297
298
299async def _intercept_server_terminated(
300 loop, socket_from: socket.socket, socket_to: socket.socket,
301) -> None:
302 error_msg = (
303 b'E\x00\x00\x00tSFATAL\x00VFATAL\x00C57P01\x00'
304 b'Mterminating connection due to administrator command\x00'
305 b'Fpostgres.c\x00L3218\x00RProcessInterrupts\x00\x00'
306 )
307 ready_for_query = b'Z\x00\x00\x00\x05'
308
309 # Wait until we get the entire server response,
310 # then send an error message instead of 'Z' and
311 # close the socket immediately after that.
312 data = b''
313 n = -1
314 while n < 0:
315 data += await loop.sock_recv(socket_from, 4096)
316 n = data.find(ready_for_query)
317 await loop.sock_sendall(socket_to, data[:n])
318 await loop.sock_sendall(socket_to, error_msg)
319 raise chaos.GateInterceptException('Closing socket after error')
320
321
322async def test_close_with_error(service_client, gate, testpoint):
323 should_close = False
324
325 @testpoint('after_trx_begin')
326 async def _hook(_data):
327 if should_close:
328 gate.set_to_client_interceptor(_intercept_server_terminated)
329
330 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
331 assert response.status == 200
332
333 should_close = True
334 gate.set_to_client_interceptor(_intercept_server_terminated)
335
336 response = await service_client.get(SELECT_SMALL_TIMEOUT_URL)
337 assert response.status == 500
338
339 should_close = False
340 await _check_that_restores(service_client, gate)