80) -> Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[None]]:
82 Returns the function that will be called in before each gRPC request,
85 @ingroup userver_testsuite_fixtures
89 _client_call_details: grpc.aio.ClientCallDetails,
90 _request_or_stream: MessageOrStream,
94 if hasattr(service_client,
'update_server_state'):
95 await service_client.update_server_state()
100@pytest.fixture(scope='session')
116 grpc_service_endpoint,
117 grpc_service_timeout,
118 grpc_session_channel,
119 _grpc_channel_interceptor,
121 _grpc_channel_interceptor_asyncexc,
125 Returns the gRPC channel configured by the parameters from the
126 @ref pytest_userver.plugins.grpc.client.grpc_service_endpoint "grpc_service_endpoint" fixture.
128 @ingroup userver_testsuite_fixtures
130 _grpc_channel_interceptor.prepare_func = grpc_client_prepare
131 _grpc_channel_interceptor_asyncexc.asyncexc_check = asyncexc_check
133 await asyncio.wait_for(
134 grpc_session_channel.channel_ready(),
135 timeout=grpc_service_timeout,
137 except asyncio.TimeoutError:
139 f
'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
141 return grpc_session_channel
144@pytest.fixture(scope='session')
162 grpc_service_port_fallback,
163 substitute_config_vars,
168 Returns a function that adjusts the static config for testsuite.
170 * if the original service config specifies `grpc-server.port`, and that port is taken,
171 then adjusts it to a free port;
172 * if the original service config specifies `grpc-server.unix-socket-path`,
173 then adjusts it to a tmp path
174 (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
175 * in service runner mode, uses the original grpc port from config or
176 @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".
178 Override this fixture to change the way `grpc-server` endpoint config is patched for tests.
180 @ingroup userver_testsuite_fixtures
183 def patch_config(config_yaml, config_vars):
184 components = config_yaml[
'components_manager'][
'components']
185 grpc_server = components.get(
'grpc-server',
None)
189 original_grpc_server = substitute_config_vars(grpc_server, config_vars)
191 if pytestconfig.option.service_runner_mode:
192 grpc_server.pop(
'unix-socket-path',
None)
193 if 'port' not in original_grpc_server:
194 grpc_server[
'port'] = grpc_service_port_fallback
195 config_vars[
'grpc_server_port'] = grpc_service_port_fallback
196 elif 'unix-socket-path' in original_grpc_server:
197 grpc_server.pop(
'port',
None)
198 grpc_socket_path = request.getfixturevalue(
'grpc_socket_path')
199 grpc_server[
'unix-socket-path'] = str(grpc_socket_path)
201 grpc_server.pop(
'unix-socket-path',
None)
202 grpc_server[
'port'] = choose_free_port(original_grpc_server.get(
'port',
None))
210 grpc.aio.UnaryUnaryClientInterceptor,
211 grpc.aio.UnaryStreamClientInterceptor,
212 grpc.aio.StreamUnaryClientInterceptor,
213 grpc.aio.StreamStreamClientInterceptor,
216 self.prepare_func: Optional[Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[
None]]] =
None
async def intercept_unary_unary(self, continuation, client_call_details, request):
    """
    Intercept a unary-unary call.

    Awaits `self.prepare_func` with the call details and the request, then
    awaits `continuation` with the same arguments and returns its result.
    """
    prepare = self.prepare_func
    await prepare(client_call_details, request)
    outcome = await continuation(client_call_details, request)
    return outcome
async def intercept_unary_stream(self, continuation, client_call_details, request):
    """
    Intercept a unary-stream call.

    Awaits `self.prepare_func` with the call details and the request, then
    awaits `continuation` with the same arguments and returns its result.
    """
    await self.prepare_func(client_call_details, request)
    # The request of a unary-stream call is a single message, not an iterator,
    # so it is forwarded unchanged (calling `next()` on it raises TypeError).
    # `continuation` is awaited so that its result, rather than a pending
    # coroutine object, is returned to the caller.
    return await continuation(client_call_details, request)
async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
    """
    Intercept a stream-unary call.

    Awaits `self.prepare_func` with the call details and the request iterator,
    then awaits `continuation` with the same arguments and returns its result.
    """
    prepare = self.prepare_func
    await prepare(client_call_details, request_iterator)
    outcome = await continuation(client_call_details, request_iterator)
    return outcome
async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
    """
    Intercept a stream-stream call.

    Awaits `self.prepare_func` with the call details and the request iterator,
    then awaits `continuation` with the same arguments and returns its result.
    """
    await self.prepare_func(client_call_details, request_iterator)
    # `continuation` is awaited so that its result, rather than a pending
    # coroutine object, is returned to the caller.
    return await continuation(client_call_details, request_iterator)
235@pytest.fixture(scope='session')
242 self.asyncexc_check: Optional[_AsyncExcCheck] =
None
244 async def intercept_unary_unary(self, continuation, client_call_details, request):
246 return await continuation(client_call_details, request)
248 self.asyncexc_check()
250 async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
252 return await continuation(client_call_details, request_iterator)
254 self.asyncexc_check()
257@pytest.fixture(scope='session')