userver: grpc/functional_tests/middleware_server/tests/test_graceful_shutdown_headers.py
Loading...
Searching...
No Matches
grpc/functional_tests/middleware_server/tests/test_graceful_shutdown_headers.py

1import asyncio
2from signal import SIGTERM
3
4import pytest
5
6import samples.greeter_pb2 as greeter_protos
7
8MD_ONE_REQ = ' One'
9MD_TWO_REQ = ' Two'
10MD_ONE_RES = ' EndOne'
11MD_TWO_RES = ' EndTwo'
12
13
14@pytest.mark.uservice_oneshot
15async def test_graceful_shutdown_headers(service_daemon_instance, grpc_client, service_client, dynamic_config):
16 params = [
17 (True, {'x-envoy-immediate-health-check-fail': ['true']}),
18 (False, {'x-envoy-immediate-health-check-fail': ['true']}),
19 (True, {'x-hdr1': ['true', 'aaa'], 'x-hdr2': ['1', 'bbb', 'yyyy']}),
20 (False, {'x-hdr1': ['true', 'aaa'], 'x-hdr2': ['1', 'bbb', 'yyyy']}),
21 ]
22
23 for headers_enabled, headers in params:
24 await update_graceful_shutdown_headers(service_client, dynamic_config, headers_enabled, headers)
25
26 request = greeter_protos.GreetingRequest(name='Python')
27 call = grpc_client.SayHello(request)
28 response = await call
29 assert response.greeting == f'Hello, Python{MD_ONE_REQ}{MD_TWO_REQ}!{MD_TWO_RES}{MD_ONE_RES}'
30 check_not_present(await call.initial_metadata(), headers)
31 check_not_present(await call.trailing_metadata(), headers)
32
33 service_daemon_instance.process.send_signal(SIGTERM)
34 await asyncio.sleep(1) # Give the service time to process the signal.
35
36 for headers_enabled, headers in params:
37 await update_graceful_shutdown_headers(service_client, dynamic_config, headers_enabled, headers)
38
39 request = greeter_protos.GreetingRequest(name='Python')
40 call = grpc_client.SayHello(request)
41 response = await call
42 assert response.greeting == f'Hello, Python{MD_ONE_REQ}{MD_TWO_REQ}!{MD_TWO_RES}{MD_ONE_RES}'
43
44 if headers_enabled:
45 check_present(await call.initial_metadata(), headers)
46 else:
47 check_not_present(await call.initial_metadata(), headers)
48
49 check_not_present(await call.trailing_metadata(), headers)
50
51 # After a couple more seconds, the service will start shutting down.
52 service_daemon_instance.process.wait()
53
54
55@pytest.mark.uservice_oneshot
56async def test_graceful_shutdown_headers_streams(service_daemon_instance, grpc_client, service_client, dynamic_config):
57 headers = {'x-envoy-immediate-health-check-fail': ['true']}
58 await update_graceful_shutdown_headers(service_client, dynamic_config, True, headers)
59
60 start = f'Python{MD_ONE_REQ}{MD_TWO_REQ}'
61 end = f'{MD_TWO_RES}{MD_ONE_RES}'
62
63 service_daemon_instance.process.send_signal(SIGTERM)
64 await asyncio.sleep(1) # Give the service time to process the signal.
65
66 call = grpc_client.SayHelloStreams(
67 _prepare_requests(['Python', '!', '!', '!'], 0.1),
68 wait_for_ready=True,
69 )
70 async for response in call:
71 assert response.greeting == f'Hello, {start}{end}'
72 start += f'!{MD_ONE_REQ}{MD_TWO_REQ}'
73
74 check_present(await call.initial_metadata(), headers)
75 check_not_present(await call.trailing_metadata(), headers)
76
77 # After a couple more seconds, the service will start shutting down.
78 service_daemon_instance.process.wait()
79
80
81@pytest.mark.uservice_oneshot
82async def test_late_graceful_shutdown_headers_streams(
83 service_daemon_instance, grpc_client, service_client, dynamic_config
84):
85 headers = {'x-envoy-immediate-health-check-fail': ['true']}
86 await update_graceful_shutdown_headers(service_client, dynamic_config, True, headers)
87
88 start = f'Python{MD_ONE_REQ}{MD_TWO_REQ}'
89 end = f'{MD_TWO_RES}{MD_ONE_RES}'
90
91 call = grpc_client.SayHelloStreams(
92 _prepare_late_requests(service_daemon_instance, ['Python', '!', '!'], 0.2),
93 wait_for_ready=True,
94 )
95 async for response in call:
96 assert response.greeting == f'Hello, {start}{end}'
97 start += f'!{MD_ONE_REQ}{MD_TWO_REQ}'
98
99 check_not_present(await call.initial_metadata(), headers)
100 check_present(await call.trailing_metadata(), headers)
101
102 # After a couple more seconds, the service will start shutting down.
103 service_daemon_instance.process.wait()
104
105
106async def _prepare_requests(names, sleep=1):
107 reqs = []
108 for name in names:
109 reqs.append(greeter_protos.GreetingRequest(name=name))
110 for req in reqs:
111 await asyncio.sleep(sleep)
112 yield req
113
114
115async def _prepare_late_requests(service_daemon_instance, names, sleep=1):
116 reqs = []
117 for name in names:
118 reqs.append(greeter_protos.GreetingRequest(name=name))
119 i = 0
120 for req in reqs:
121 if i == 1:
122 service_daemon_instance.process.send_signal(SIGTERM)
123 i += 1
124 await asyncio.sleep(sleep)
125 yield req
126
127
128async def update_graceful_shutdown_headers(service_client, dynamic_config, headers_enabled, headers):
129 dynamic_config.set_values({'GRACEFUL_SHUTDOWN_HEADERS': {'enabled': headers_enabled, 'headers': headers}})
130 await service_client.update_server_state()
131
132
133def check_present(metadata, headers: dict[str, list[str]]):
134 metadata_dict = to_dict(metadata)
135 for k, v in headers.items():
136 assert k in metadata_dict
137 assert metadata_dict[k] == v
138
139
140def check_not_present(metadata, headers: dict[str, list[str]]):
141 metadata_dict = to_dict(metadata)
142 for k in headers.keys():
143 assert k not in metadata_dict
144
145
146def to_dict(metadata) -> dict[str, list[str]]:
147 result: dict[str, list[str]] = {}
148 for k, v in metadata:
149 result.setdefault(k, []).append(v)
150
151 return result