78 service_client, _testsuite_client_config: client.TestsuiteClientConfig,
79) -> Callable[[grpc.aio.ClientCallDetails], Awaitable[
None]]:
81 Returns the function that will be called before each gRPC request,
84 @ingroup userver_testsuite_fixtures
88 _client_call_details: grpc.aio.ClientCallDetails, /,
90 if isinstance(service_client, client.AiohttpClient):
91 await service_client.update_server_state()
96@pytest.fixture(scope='session')
108 grpc_service_endpoint,
110 grpc_service_timeout,
111 grpc_session_channel,
112 _grpc_channel_interceptor,
116 Returns the gRPC channel configured by the parameters from the
117 @ref plugins.grpc.grpc_service_endpoint "grpc_service_endpoint" fixture.
119 @ingroup userver_testsuite_fixtures
121 _grpc_channel_interceptor.prepare_func = grpc_client_prepare
123 await asyncio.wait_for(
124 grpc_session_channel.channel_ready(), timeout=grpc_service_timeout,
126 except asyncio.TimeoutError:
128 f
'Failed to connect to remote gRPC server by '
129 f
'address {grpc_service_endpoint}',
131 return grpc_session_channel
164 grpc.aio.UnaryUnaryClientInterceptor,
165 grpc.aio.UnaryStreamClientInterceptor,
166 grpc.aio.StreamUnaryClientInterceptor,
167 grpc.aio.StreamStreamClientInterceptor,
170 self.prepare_func: Optional[
171 Callable[[grpc.aio.ClientCallDetails], Awaitable[
None]]
async def intercept_unary_unary(
    self, continuation, client_call_details, request,
):
    """Await the prepare hook, then forward a unary-unary call.

    The hook stored in ``self.prepare_func`` is awaited with the call
    details first; afterwards the call details and the request are handed
    to ``continuation`` unchanged and its awaited result is returned.
    """
    prepare = self.prepare_func
    await prepare(client_call_details)
    outcome = await continuation(client_call_details, request)
    return outcome
async def intercept_unary_stream(
    self, continuation, client_call_details, request,
):
    """Await the prepare hook, then forward a unary-stream call.

    The hook stored in ``self.prepare_func`` is awaited with the call
    details first; afterwards the call details and the request are handed
    to ``continuation`` and its awaited result is returned.
    """
    await self.prepare_func(client_call_details)
    # The request of a unary-stream RPC is a single message, not an
    # iterator: calling next() on it raises TypeError, so it is passed
    # through as-is, exactly like in intercept_unary_unary.
    return await continuation(client_call_details, request)
async def intercept_stream_unary(
    self, continuation, client_call_details, request_iterator,
):
    """Await the prepare hook, then forward a stream-unary call.

    The hook stored in ``self.prepare_func`` is awaited with the call
    details first; afterwards the call details and the request iterator
    are handed to ``continuation`` unchanged and its awaited result is
    returned.
    """
    prepare = self.prepare_func
    await prepare(client_call_details)
    outcome = await continuation(client_call_details, request_iterator)
    return outcome
async def intercept_stream_stream(
    self, continuation, client_call_details, request_iterator,
):
    """Await the prepare hook, then forward a stream-stream call.

    The hook stored in ``self.prepare_func`` is awaited with the call
    details first; afterwards the call details and the request iterator
    are handed to ``continuation`` unchanged and its awaited result is
    returned.
    """
    prepare = self.prepare_func
    await prepare(client_call_details)
    outcome = await continuation(client_call_details, request_iterator)
    return outcome
199@pytest.fixture(scope='session')