userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Make gRPC requests to the service.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=redefined-outer-name
8import asyncio
9import pathlib
10import tempfile
11from typing import AsyncIterable
12from typing import Awaitable
13from typing import Callable
14from typing import Optional
15from typing import Union
16
17import google.protobuf.message
18import grpc
19import pytest
20
# Timeout in seconds used by `grpc_service_timeout` when the command line option is falsy (e.g. 0).
DEFAULT_TIMEOUT = 15.0
# Names of the fixtures of this plugin that return static config patching functions.
USERVER_CONFIG_HOOKS = ['userver_config_grpc_endpoint']

# Either a single protobuf message or an asynchronous stream of protobuf messages.
MessageOrStream = Union[
    google.protobuf.message.Message,
    AsyncIterable[google.protobuf.message.Message],
]
# Signature of the `asyncexc_check` callback: takes no arguments, returns nothing.
_AsyncExcCheck = Callable[[], None]
26
27
@pytest.fixture(scope='session')
def grpc_service_port_fallback() -> int:
    """
    Returns the gRPC port that should be used in service runner mode in case
    no port is specified in the source config_yaml.

    Override this fixture to use a different fallback port.

    @ingroup userver_testsuite_fixtures
    """
    return 11080
37
38
@pytest.fixture(scope='session')
def grpc_service_endpoint(service_config) -> str:
    """
    Returns the address gRPC clients should connect to.
    Used by @ref pytest_userver.plugins.grpc.client.grpc_channel "grpc_channel" fixture.

    By default, the endpoint is read from the `grpc-server` component of `service_config`:
    `unix-socket-path` takes precedence over `port`.
    Override this fixture if you add gRPC server listening ports in a custom way.

    Raises RuntimeError if there is no `grpc-server` component or it has neither
    a `unix-socket-path` nor a `port`.

    @ingroup userver_testsuite_fixtures
    """
    all_components = service_config['components_manager']['components']
    if 'grpc-server' not in all_components:
        raise RuntimeError('No grpc-server component')
    server_section = all_components['grpc-server']

    # A UNIX socket, when configured, wins over a TCP port.
    socket_path = server_section.get('unix-socket-path')
    if socket_path:
        return f'unix:{socket_path}'

    tcp_port = server_section.get('port')
    if tcp_port:
        return f'localhost:{tcp_port}'

    raise RuntimeError("No 'port' or 'unix-socket-path' found in 'grpc-server' component config")
60
61
@pytest.fixture(scope='session')
def grpc_service_timeout(pytestconfig) -> float:
    """
    Returns the gRPC timeout, in seconds, taken from the command
    line option `--service-timeout`.

    A falsy value (such as 0) is replaced with `DEFAULT_TIMEOUT`.

    Override this fixture to change the way the gRPC timeout
    is set.

    @ingroup userver_testsuite_fixtures
    """
    configured = float(pytestconfig.option.service_timeout)
    if configured:
        return configured
    return DEFAULT_TIMEOUT
74
75
@pytest.fixture
def grpc_client_prepare(
    service_client,
    asyncexc_check,
) -> Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[None]]:
    """
    Returns the function that will be called before each gRPC request,
    client-side.

    The returned coroutine function takes the call details and the request
    (or request stream) as positional-only arguments and ignores both: it runs
    `asyncexc_check()` and then, if `service_client` provides
    `update_server_state`, awaits it.

    @ingroup userver_testsuite_fixtures
    """

    async def prepare(
        _client_call_details: grpc.aio.ClientCallDetails,
        _request_or_stream: MessageOrStream,
        /,
    ) -> None:
        # NOTE(review): presumably re-raises errors captured in background tasks — confirm.
        asyncexc_check()
        # Not every service client flavour has this method, hence the attribute check.
        if hasattr(service_client, 'update_server_state'):
            await service_client.update_server_state()

    return prepare
98
99
@pytest.fixture(scope='session')
async def grpc_session_channel(
    grpc_service_endpoint,
    _grpc_channel_interceptor,
    _grpc_channel_interceptor_asyncexc,
):
    """
    Yields a session-scoped insecure gRPC channel to `grpc_service_endpoint`
    with both client interceptors installed.

    The channel is closed when the fixture is torn down.
    """
    client_interceptors = [_grpc_channel_interceptor, _grpc_channel_interceptor_asyncexc]
    channel_cm = grpc.aio.insecure_channel(
        grpc_service_endpoint,
        interceptors=client_interceptors,
    )
    async with channel_cm as channel:
        yield channel
111
112
@pytest.fixture
async def grpc_channel(
    service_client,  # For daemon setup and userver_client_cleanup
    grpc_service_endpoint,
    grpc_service_timeout,
    grpc_session_channel,
    _grpc_channel_interceptor,
    grpc_client_prepare,
    _grpc_channel_interceptor_asyncexc,
    asyncexc_check,
):
    """
    Returns the gRPC channel configured by the parameters from the
    @ref pytest_userver.plugins.grpc.client.grpc_service_endpoint "grpc_service_endpoint" fixture.

    The channel itself is the session-scoped one; for every test this fixture
    re-binds the interceptor callbacks to the current `grpc_client_prepare` and
    `asyncexc_check` and waits until the channel is ready.

    Raises RuntimeError if the channel does not become ready within
    `grpc_service_timeout` seconds.

    @ingroup userver_testsuite_fixtures
    """
    # The interceptors live for the whole session, the callbacks are per-test.
    _grpc_channel_interceptor.prepare_func = grpc_client_prepare
    _grpc_channel_interceptor_asyncexc.asyncexc_check = asyncexc_check
    try:
        await asyncio.wait_for(
            grpc_session_channel.channel_ready(),
            timeout=grpc_service_timeout,
        )
    except asyncio.TimeoutError as exc:
        # Chain explicitly so the traceback shows the timeout as the direct cause.
        raise RuntimeError(
            f'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
        ) from exc
    return grpc_session_channel
142
143
@pytest.fixture(scope='session')
def grpc_socket_path() -> Optional[pathlib.Path]:
    """
    Path for the UNIX socket over which testsuite will talk to the gRPC service, if it chooses to use a UNIX socket.

    The socket file is placed in a fresh temporary directory that is removed on fixture teardown.

    @see pytest_userver.plugins.grpc.client.userver_config_grpc_endpoint "userver_config_grpc_endpoint"

    @ingroup userver_testsuite_fixtures
    """
    # Path must be as short as possible due to 108 character limitation.
    # 'service_tempdir', for example, is typically too long.
    socket_dir = tempfile.TemporaryDirectory(prefix='userver-grpc-socket-')
    with socket_dir as dir_name:
        yield pathlib.Path(dir_name).joinpath('grpc.sock')
157
158
@pytest.fixture(scope='session')
def userver_config_grpc_endpoint(
    pytestconfig,
    grpc_service_port_fallback,
    substitute_config_vars,
    request,
    choose_free_port,
):
    """
    Returns a function that adjusts the static config for testsuite.

    * if the original service config specifies `grpc-server.port`, and that port is taken,
      then adjusts it to a free port;
    * if the original service config specifies `grpc-server.unix-socket-path`,
      then adjusts it to a tmp path
      (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
    * in service runner mode, uses the original grpc port from config or
      @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".

    Override this fixture to change the way `grpc-server` endpoint config is patched for tests.

    @ingroup userver_testsuite_fixtures
    """

    def patch_config(config_yaml, config_vars):
        components = config_yaml['components_manager']['components']
        grpc_server = components.get('grpc-server', None)
        if not grpc_server:
            # No gRPC server (or an empty section): nothing to patch.
            return

        # Decisions are made on the config with variables substituted, while the
        # edits below are applied to the raw `grpc_server` section in place.
        original_grpc_server = substitute_config_vars(grpc_server, config_vars)

        if pytestconfig.option.service_runner_mode:
            # Service runner mode always listens on a TCP port.
            grpc_server.pop('unix-socket-path', None)
            if 'port' not in original_grpc_server:
                grpc_server['port'] = grpc_service_port_fallback
                config_vars['grpc_server_port'] = grpc_service_port_fallback
        elif 'unix-socket-path' in original_grpc_server:
            grpc_server.pop('port', None)
            # Requested lazily so the temporary directory is only created when a socket is used.
            grpc_socket_path = request.getfixturevalue('grpc_socket_path')
            grpc_server['unix-socket-path'] = str(grpc_socket_path)
        else:
            grpc_server.pop('unix-socket-path', None)
            grpc_server['port'] = choose_free_port(original_grpc_server.get('port', None))

    return patch_config
205
206
# Taken from
# https://github.com/grpc/grpc/blob/master/examples/python/interceptors/headers/generic_client_interceptor.py
class _GenericClientInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
    grpc.aio.StreamUnaryClientInterceptor,
    grpc.aio.StreamStreamClientInterceptor,
):
    """
    Client interceptor that awaits `prepare_func` before every outgoing RPC,
    for all four RPC kinds, and then proceeds with the call unchanged.

    `prepare_func` must be assigned before the first RPC is made.
    """

    def __init__(self):
        # Called with the call details and the request (or request iterator) before each RPC.
        self.prepare_func: Optional[Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[None]]] = None

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        await self.prepare_func(client_call_details, request)
        return await continuation(client_call_details, request)

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        await self.prepare_func(client_call_details, request)
        # `request` is a single message here, not an iterator, so it is passed through as is.
        # `continuation` is a coroutine function and must be awaited to obtain the call object.
        return await continuation(client_call_details, request)

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        await self.prepare_func(client_call_details, request_iterator)
        return await continuation(client_call_details, request_iterator)

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        await self.prepare_func(client_call_details, request_iterator)
        # Awaited for the same reason as in `intercept_unary_stream`.
        return await continuation(client_call_details, request_iterator)
233
234
@pytest.fixture(scope='session')
def _grpc_channel_interceptor(daemon_scoped_mark) -> _GenericClientInterceptor:
    # NOTE(review): the body line is absent from this listing; it is restored here from the
    # declared return type — confirm against the original file.
    # `daemon_scoped_mark` is not used directly; it is only requested as a fixture dependency.
    return _GenericClientInterceptor()
238
239
class _AsyncExcClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.StreamUnaryClientInterceptor):
    """
    Client interceptor that calls `asyncexc_check` after every unary-response RPC,
    whether the RPC succeeded or raised.

    `asyncexc_check` must be assigned before the first RPC is made.
    """

    def __init__(self):
        self.asyncexc_check: Optional[_AsyncExcCheck] = None

    async def _proceed_then_check(self, continuation, client_call_details, request_or_iterator):
        # Shared implementation: run the RPC, then always run the check.
        try:
            return await continuation(client_call_details, request_or_iterator)
        finally:
            self.asyncexc_check()

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        return await self._proceed_then_check(continuation, client_call_details, request)

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return await self._proceed_then_check(continuation, client_call_details, request_iterator)
255
256
257@pytest.fixture(scope='session')
258def _grpc_channel_interceptor_asyncexc() -> _AsyncExcClientInterceptor: