145 grpc_service_port_fallback,
146 substitute_config_vars,
151 Returns a function that adjusts the static config for testsuite.
153 * if the original service config specifies `grpc-server.port`, and that port is taken,
154 then adjusts it to a free port;
155 * if the original service config specifies `grpc-server.unix-socket-path`,
156 then adjusts it to a tmp path
157 (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
158 * in service runner mode, uses the original grpc port from config or
159 @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".
161 Override this fixture to change the way `grpc-server` endpoint config is patched for tests.
163 @ingroup userver_testsuite_fixtures
166 def patch_config(config_yaml, config_vars):
167 components = config_yaml[
'components_manager'][
'components']
168 grpc_server = components.get(
'grpc-server',
None)
172 original_grpc_server = substitute_config_vars(grpc_server, config_vars)
174 if pytestconfig.option.service_runner_mode:
175 grpc_server.pop(
'unix-socket-path',
None)
176 if 'port' not in original_grpc_server:
177 grpc_server[
'port'] = grpc_service_port_fallback
178 config_vars[
'grpc_server_port'] = grpc_service_port_fallback
179 elif 'unix-socket-path' in original_grpc_server:
180 grpc_server.pop(
'port',
None)
181 grpc_socket_path = request.getfixturevalue(
'grpc_socket_path')
182 grpc_server[
'unix-socket-path'] = str(grpc_socket_path)
184 grpc_server.pop(
'unix-socket-path',
None)
185 grpc_server[
'port'] = choose_free_port(original_grpc_server.get(
'port',
None))
def pytest_addhooks(pluginmanager: pytest.PytestPluginManager):
    """Register this plugin's hook specifications (`_hookspec`) with pytest."""
    hookspecs = _hookspec
    pluginmanager.add_hookspecs(hookspecs)
class _UpdateServerStateInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
    grpc.aio.StreamUnaryClientInterceptor,
    grpc.aio.StreamStreamClientInterceptor,
):
    """
    Client interceptor that wraps every gRPC call (all four RPC kinds) with testsuite checks.

    Before each call it runs `asyncexc_check` and, if the service client provides
    `update_server_state`, awaits it. After the call completes (or, for response-streaming
    RPCs, after the response stream is exhausted or closed) it runs `asyncexc_check` again.
    """

    def __init__(self, service_client: pytest_userver.client.Client, asyncexc_check: _AsyncExcCheck):
        self._service_client = service_client
        self._asyncexc_check = asyncexc_check

    async def _before_call_hook(self) -> None:
        """Run the pre-call checks shared by all `intercept_*` methods."""
        # NOTE(review): `asyncexc_check` presumably raises if a background error was recorded;
        # its implementation is not visible in this part of the file.
        self._asyncexc_check()
        # Not every client object has `update_server_state`, hence the duck-typed guard.
        if hasattr(self._service_client, 'update_server_state'):
            await self._service_client.update_server_state()

    async def intercept_unary_unary(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, Any], Awaitable[grpc.aio.UnaryUnaryCall]],
        client_call_details: grpc.aio.ClientCallDetails,
        request: Any,
    ) -> grpc.aio.UnaryUnaryCall:
        """Run the pre-call hook, perform the unary-unary call, then re-check async exceptions."""
        await self._before_call_hook()
        try:
            return await continuation(client_call_details, request)
        finally:
            # `finally` guarantees the check runs even though the `try` body returns or raises.
            self._asyncexc_check()

    async def intercept_unary_stream(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, Any], grpc.aio.UnaryStreamCall],
        client_call_details: grpc.aio.ClientCallDetails,
        request: Any,
    ) -> AsyncIterator[Any]:
        """Run the pre-call hook and return a response stream that re-checks async exceptions when it ends."""
        await self._before_call_hook()
        call = await continuation(client_call_details, request)

        async def response_stream() -> AsyncIterator[Any]:
            try:
                async for response in call:
                    yield response
            finally:
                # Runs once the consumer exhausts or closes the stream.
                self._asyncexc_check()

        return response_stream()

    async def intercept_stream_unary(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], Awaitable[grpc.aio.StreamUnaryCall]],
        client_call_details: grpc.aio.ClientCallDetails,
        request_iterator: AsyncIterator[Any],
    ) -> grpc.aio.StreamUnaryCall:
        """Run the pre-call hook, perform the stream-unary call, then re-check async exceptions."""
        await self._before_call_hook()
        try:
            return await continuation(client_call_details, request_iterator)
        finally:
            self._asyncexc_check()

    async def intercept_stream_stream(
        self,
        continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], grpc.aio.StreamStreamCall],
        client_call_details: grpc.aio.ClientCallDetails,
        request_iterator: AsyncIterator[Any],
    ) -> AsyncIterator[Any]:
        """Run the pre-call hook and return a response stream that re-checks async exceptions when it ends."""
        # `_before_call_hook` already starts with `asyncexc_check`, so no separate check is needed here.
        await self._before_call_hook()
        call = await continuation(client_call_details, request_iterator)

        async def response_stream() -> AsyncIterator[Any]:
            try:
                async for response in call:
                    yield response
            finally:
                self._asyncexc_check()

        return response_stream()
def pytest_grpc_client_interceptors(request: pytest.FixtureRequest) -> Sequence[grpc.aio.ClientInterceptor]:
    """
    Provide the client interceptors contributed by this plugin.

    Returns a list with a single `_UpdateServerStateInterceptor`, built from the
    `service_client` and `asyncexc_check` fixtures resolved through `request`.
    """
    # Fixtures are looked up dynamically because this is a hook, not a fixture function.
    service_client = request.getfixturevalue('service_client')
    asyncexc_check = request.getfixturevalue('asyncexc_check')
    return [
        _UpdateServerStateInterceptor(service_client, asyncexc_check),
    ]
def _filter_interceptors(
    interceptors: Sequence[grpc.aio.ClientInterceptor], desired_type: type[grpc.aio.ClientInterceptor]
) -> list[grpc.aio.ClientInterceptor]:
    """Return, in their original order, the interceptors that are instances of `desired_type`."""
    matching: list[grpc.aio.ClientInterceptor] = []
    for candidate in interceptors:
        if isinstance(candidate, desired_type):
            matching.append(candidate)
    return matching
def _set_client_interceptors(channel: grpc.aio.Channel, interceptors: Sequence[grpc.aio.ClientInterceptor]) -> None:
    """
    Replace the interceptors of an existing channel in place.

    This allows setting interceptors dynamically while reusing the same underlying channel,
    which is something grpc-io currently doesn't support.

    It also works around the bug where multi-inheritance interceptors are only registered
    for the first matching type: https://github.com/grpc/grpc/issues/31442
    """
    # Private channel attribute -> interceptor base class whose instances belong there.
    slots = (
        ('_unary_unary_interceptors', grpc.aio.UnaryUnaryClientInterceptor),
        ('_unary_stream_interceptors', grpc.aio.UnaryStreamClientInterceptor),
        ('_stream_unary_interceptors', grpc.aio.StreamUnaryClientInterceptor),
        ('_stream_stream_interceptors', grpc.aio.StreamStreamClientInterceptor),
    )
    for attr_name, interceptor_type in slots:
        # An interceptor inheriting from several base classes lands in every matching slot.
        setattr(channel, attr_name, _filter_interceptors(interceptors, interceptor_type))