93) -> Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[None]]:
95 Returns the function that will be called before each gRPC request,
98 @ingroup userver_testsuite_fixtures
102 _client_call_details: grpc.aio.ClientCallDetails,
103 _request_or_stream: MessageOrStream,
107 if hasattr(service_client,
'update_server_state'):
108 await service_client.update_server_state()
113@pytest.fixture(scope='session')
129 grpc_service_endpoint,
130 grpc_service_timeout,
131 grpc_session_channel,
132 _grpc_channel_interceptor,
134 _grpc_channel_interceptor_asyncexc,
138 Returns the gRPC channel configured by the parameters from the
139 @ref pytest_userver.plugins.grpc.client.grpc_service_endpoint "grpc_service_endpoint" fixture.
141 @ingroup userver_testsuite_fixtures
143 _grpc_channel_interceptor.prepare_func = grpc_client_prepare
144 _grpc_channel_interceptor_asyncexc.asyncexc_check = asyncexc_check
146 await asyncio.wait_for(
147 grpc_session_channel.channel_ready(),
148 timeout=grpc_service_timeout,
150 except asyncio.TimeoutError:
152 f
'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
154 return grpc_session_channel
157@pytest.fixture(scope='session')
175 grpc_service_port_fallback,
176 substitute_config_vars,
181 Returns a function that adjusts the static config for testsuite.
183 * if the original service config specifies `grpc-server.port`, and that port is taken,
184 then adjusts it to a free port;
185 * if the original service config specifies `grpc-server.unix-socket-path`,
186 then adjusts it to a tmp path
187 (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
188 * in service runner mode, uses the original grpc port from config or
189 @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".
191 @ingroup userver_testsuite_fixtures
194 def patch_config(config_yaml, config_vars):
195 components = config_yaml[
'components_manager'][
'components']
196 grpc_server = components.get(
'grpc-server',
None)
200 original_grpc_server = substitute_config_vars(grpc_server, config_vars)
202 if pytestconfig.option.service_runner_mode:
203 grpc_server.pop(
'unix-socket-path',
None)
204 if 'port' not in original_grpc_server:
205 grpc_server[
'port'] = grpc_service_port_fallback
206 config_vars[
'grpc_server_port'] = grpc_service_port_fallback
207 elif 'unix-socket-path' in original_grpc_server:
208 grpc_server.pop(
'port',
None)
209 grpc_socket_path = request.getfixturevalue(
'grpc_socket_path')
210 grpc_server[
'unix-socket-path'] = str(grpc_socket_path)
212 grpc_server.pop(
'unix-socket-path',
None)
213 grpc_server[
'port'] = choose_free_port(original_grpc_server.get(
'port',
None))
221 grpc.aio.UnaryUnaryClientInterceptor,
222 grpc.aio.UnaryStreamClientInterceptor,
223 grpc.aio.StreamUnaryClientInterceptor,
224 grpc.aio.StreamStreamClientInterceptor,
227 self.
prepare_func: Optional[Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[
None]]] =
None
229 async def intercept_unary_unary(self, continuation, client_call_details, request):
231 return await continuation(client_call_details, request)
233 async def intercept_unary_stream(self, continuation, client_call_details, request):
235 return continuation(client_call_details, next(request))
async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
    """
    Run the prepare hook, then proceed with a stream-unary call.

    The callable stored in `self.prepare_func` is awaited with the call
    details and the request iterator. After that `continuation` is awaited
    with the same arguments and its result is returned unchanged.
    """
    prepare = self.prepare_func
    await prepare(client_call_details, request_iterator)
    response = await continuation(client_call_details, request_iterator)
    return response
async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
    """
    Run the prepare hook, then proceed with a stream-stream call.

    The callable stored in `self.prepare_func` is awaited with the call
    details and the request iterator. `continuation` is then called with the
    same arguments; its return value is handed back without being awaited
    here.
    """
    prepare = self.prepare_func
    await prepare(client_call_details, request_iterator)
    pending_call = continuation(client_call_details, request_iterator)
    return pending_call
246@pytest.fixture(scope='session')
255 async def intercept_unary_unary(self, continuation, client_call_details, request):
257 return await continuation(client_call_details, request)
261 async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
263 return await continuation(client_call_details, request_iterator)
268@pytest.fixture(scope='session')