147 grpc_service_port_fallback,
148 substitute_config_vars,
153 Returns a function that adjusts the static config for testsuite.
155 * if the original service config specifies `grpc-server.port`, and that port is taken,
156 then adjusts it to a free port;
157 * if the original service config specifies `grpc-server.unix-socket-path`,
158 then adjusts it to a tmp path
159 (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
160 * in service runner mode, uses the original grpc port from config or
161 @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".
163 Override this fixture to change the way `grpc-server` endpoint config is patched for tests.
165 @ingroup userver_testsuite_fixtures
168 def patch_config(config_yaml, config_vars):
169 components = config_yaml[
'components_manager'][
'components']
170 grpc_server = components.get(
'grpc-server',
None)
174 original_grpc_server = substitute_config_vars(grpc_server, config_vars)
176 if pytestconfig.option.service_runner_mode:
177 grpc_server.pop(
'unix-socket-path',
None)
178 if 'port' not in original_grpc_server:
179 grpc_server[
'port'] = grpc_service_port_fallback
180 config_vars[
'grpc_server_port'] = grpc_service_port_fallback
181 elif 'unix-socket-path' in original_grpc_server:
182 grpc_server.pop(
'port',
None)
183 grpc_socket_path = request.getfixturevalue(
'grpc_socket_path')
184 grpc_server[
'unix-socket-path'] = str(grpc_socket_path)
186 grpc_server.pop(
'unix-socket-path',
None)
187 grpc_server[
'port'] = choose_free_port(original_grpc_server.get(
'port',
None))
def pytest_addhooks(pluginmanager: pytest.PytestPluginManager):
    """
    Pytest `pytest_addhooks` entry point of this plugin.

    Hands the `_hookspec` object over to the plugin manager through
    `pluginmanager.add_hookspecs`, so the hook specifications it declares
    become known to pytest.
    """
    pluginmanager.add_hookspecs(_hookspec)
199class _AsyncExcCheckInterceptor(
200 grpc.aio.UnaryUnaryClientInterceptor,
201 grpc.aio.UnaryStreamClientInterceptor,
202 grpc.aio.StreamUnaryClientInterceptor,
203 grpc.aio.StreamStreamClientInterceptor,
205 def __init__(self, asyncexc_check: _AsyncExcCheck):
206 self._asyncexc_check = asyncexc_check
209 async def intercept_unary_unary(
211 continuation: Callable[[grpc.aio.ClientCallDetails, Any], Awaitable[grpc.aio.UnaryUnaryCall]],
212 client_call_details: grpc.aio.ClientCallDetails,
214 ) -> grpc.aio.UnaryUnaryCall:
215 self._asyncexc_check()
217 return await continuation(client_call_details, request)
219 self._asyncexc_check()
223 async def intercept_unary_stream(
225 continuation: Callable[[grpc.aio.ClientCallDetails, Any], grpc.aio.UnaryStreamCall],
226 client_call_details: grpc.aio.ClientCallDetails,
228 ) -> AsyncIterator[Any]:
229 self._asyncexc_check()
230 call = await continuation(client_call_details, request)
232 async def response_stream() -> AsyncIterator[Any]:
234 async for response
in call:
237 self._asyncexc_check()
239 return response_stream()
242 async def intercept_stream_unary(
244 continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], Awaitable[grpc.aio.StreamUnaryCall]],
245 client_call_details: grpc.aio.ClientCallDetails,
246 request_iterator: AsyncIterator[Any],
247 ) -> grpc.aio.StreamUnaryCall:
248 self._asyncexc_check()
250 return await continuation(client_call_details, request_iterator)
252 self._asyncexc_check()
256 async def intercept_stream_stream(
258 continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], grpc.aio.StreamStreamCall],
259 client_call_details: grpc.aio.ClientCallDetails,
260 request_iterator: AsyncIterator[Any],
261 ) -> AsyncIterator[Any]:
262 self._asyncexc_check()
263 call = await continuation(client_call_details, request_iterator)
265 async def response_stream() -> AsyncIterator[Any]:
267 async for response
in call:
270 self._asyncexc_check()
272 return response_stream()
def __init__(self, service_client: pytest_userver.client.Client):
    """
    Remember the service client this interceptor works with.

    @param service_client stored as-is in `self._service_client`
    """
    self._service_client = service_client
async def pre_call_hook(self, client_call_details: grpc.aio.ClientCallDetails) -> None:
    """
    Await `update_server_state()` of the stored service client, if it has one.

    Does nothing when the service client has no `update_server_state`
    attribute. `client_call_details` is not used here.
    """
    if not hasattr(self._service_client, 'update_server_state'):
        return
    await self._service_client.update_server_state()
285def pytest_grpc_client_interceptors(request: pytest.FixtureRequest) -> Sequence[grpc.aio.ClientInterceptor]:
287 _AsyncExcCheckInterceptor(request.getfixturevalue(
'asyncexc_check')),
288 _UpdateServerStateInterceptor(request.getfixturevalue(
'service_client')),
def _filter_interceptors(
    interceptors: Sequence[grpc.aio.ClientInterceptor], desired_type: type[grpc.aio.ClientInterceptor]
) -> list[grpc.aio.ClientInterceptor]:
    """
    Select the interceptors that are instances of `desired_type`.

    @param interceptors candidates, scanned in order
    @param desired_type class checked with `isinstance`
    @returns a new list with the matching interceptors, original order kept
    """
    return list(filter(lambda candidate: isinstance(candidate, desired_type), interceptors))
def _set_client_interceptors(channel: grpc.aio.Channel, interceptors: Sequence[grpc.aio.ClientInterceptor]) -> None:
    """
    Overwrites the per-call-kind interceptor lists of an existing channel.

    This makes it possible to set interceptors dynamically while reusing the same
    underlying channel, which is something grpc-io currently doesn't support.

    It also works around a bug: multi-inheritance interceptors are only registered
    for the first matching type, see https://github.com/grpc/grpc/issues/31442
    """
    # Private channel attribute -> interceptor base class it must hold.
    # An interceptor deriving from several base classes lands in every matching list.
    slots = (
        ('_unary_unary_interceptors', grpc.aio.UnaryUnaryClientInterceptor),
        ('_unary_stream_interceptors', grpc.aio.UnaryStreamClientInterceptor),
        ('_stream_unary_interceptors', grpc.aio.StreamUnaryClientInterceptor),
        ('_stream_stream_interceptors', grpc.aio.StreamStreamClientInterceptor),
    )
    for attribute, interceptor_type in slots:
        setattr(channel, attribute, _filter_interceptors(interceptors, interceptor_type))