userver: /home/antonyzhilin/arcadia/taxi/uservices/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/client.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
client.py
1"""
2Make gRPC requests to the service.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=redefined-outer-name
8import asyncio
9import pathlib
10import tempfile
11from typing import AsyncIterable
12from typing import Awaitable
13from typing import Callable
14from typing import Optional
15from typing import Union
16
17import google.protobuf.message
18import grpc
19import pytest
20
# Timeout in seconds used by `grpc_service_timeout` when the value of the
# `--service-timeout` option is falsy (e.g. 0).
DEFAULT_TIMEOUT = 15.0
# Name(s) of the config-patching fixtures defined in this module.
# NOTE(review): presumably collected by the userver testsuite config
# machinery - confirm against the plugin that reads USERVER_CONFIG_HOOKS.
USERVER_CONFIG_HOOKS = ['userver_config_grpc_endpoint']

# What a client-side interceptor receives as the request: either a single
# protobuf message or an async stream of messages.
MessageOrStream = Union[
    google.protobuf.message.Message,
    AsyncIterable[google.protobuf.message.Message],
]
# Zero-argument callback stored on `_AsyncExcClientInterceptor` and invoked
# after each intercepted call.
_AsyncExcCheck = Callable[[], None]
26
27
@pytest.fixture(scope='session')
def grpc_service_port(service_config) -> Optional[int]:
    """
    Returns the `port` value of the `grpc-server` component taken from the
    static configuration file, or `None` if the component has no `port` key.

    Override this fixture to change the way the gRPC listener port number
    is retrieved by the testsuite for tests.

    Raises `RuntimeError` if the static config has no `grpc-server` component.

    @ingroup userver_testsuite_fixtures
    """
    all_components = service_config['components_manager']['components']
    if 'grpc-server' in all_components:
        return all_components['grpc-server'].get('port')
    raise RuntimeError('No grpc-server component')
43
44
@pytest.fixture(scope='session')
def grpc_service_port_fallback() -> int:
    """
    Returns the gRPC port that should be used in service runner mode in case
    no port is specified in the source config_yaml.

    Override this fixture to use a different fallback port.

    @ingroup userver_testsuite_fixtures
    """
    return 11080
54
55
@pytest.fixture(scope='session')
def grpc_service_endpoint(service_config, grpc_service_port) -> str:
    """
    Returns the gRPC endpoint of the service.

    If the `grpc-server` component has a `unix-socket-path` set, the result
    is `unix:<path>`; otherwise it is `localhost:<grpc_service_port>`.

    Override this fixture to change the way the gRPC endpoint
    is retrieved by the testsuite for tests.

    Raises `RuntimeError` if the static config has no `grpc-server` component.

    @ingroup userver_testsuite_fixtures
    """
    all_components = service_config['components_manager']['components']
    if 'grpc-server' not in all_components:
        raise RuntimeError('No grpc-server component')

    socket_path = all_components['grpc-server'].get('unix-socket-path')
    if socket_path is None:
        return f'localhost:{grpc_service_port}'
    return f'unix:{socket_path}'
73
74
@pytest.fixture(scope='session')
def grpc_service_timeout(pytestconfig) -> float:
    """
    Returns the gRPC timeout for the service, taken from the command
    line option `--service-timeout`.

    A falsy value (such as 0) is replaced with `DEFAULT_TIMEOUT`.

    Override this fixture to change the way the gRPC timeout
    is set.

    @ingroup userver_testsuite_fixtures
    """
    configured = float(pytestconfig.option.service_timeout)
    return configured if configured else DEFAULT_TIMEOUT
87
88
@pytest.fixture
def grpc_client_prepare(
    service_client,
    asyncexc_check,
) -> Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[None]]:
    """
    Returns the function that will be called before each gRPC request,
    client-side.

    The returned coroutine function takes the call details and the request
    (or request stream) as positional-only arguments and ignores both.
    It runs `asyncexc_check` and then, if `service_client` provides
    `update_server_state`, awaits it.

    @ingroup userver_testsuite_fixtures
    """

    async def prepare(
        _client_call_details: grpc.aio.ClientCallDetails,
        _request_or_stream: MessageOrStream,
        /,
    ) -> None:
        asyncexc_check()
        # Not every service client exposes `update_server_state`,
        # so only call it when present.
        if hasattr(service_client, 'update_server_state'):
            await service_client.update_server_state()

    return prepare
111
112
@pytest.fixture(scope='session')
async def grpc_session_channel(
    grpc_service_endpoint,
    _grpc_channel_interceptor,
    _grpc_channel_interceptor_asyncexc,
):
    """
    Yields a session-scoped insecure `grpc.aio` channel to
    `grpc_service_endpoint` with both testsuite interceptors installed.

    The channel's `async with` block is left when the fixture is torn down.
    """
    channel_interceptors = [_grpc_channel_interceptor, _grpc_channel_interceptor_asyncexc]
    channel_cm = grpc.aio.insecure_channel(
        grpc_service_endpoint,
        interceptors=channel_interceptors,
    )
    async with channel_cm as channel:
        yield channel
124
125
@pytest.fixture
async def grpc_channel(
    service_client,  # For daemon setup and userver_client_cleanup
    grpc_service_endpoint,
    grpc_service_timeout,
    grpc_session_channel,
    _grpc_channel_interceptor,
    grpc_client_prepare,
    _grpc_channel_interceptor_asyncexc,
    asyncexc_check,
):
    """
    Returns the gRPC channel configured by the parameters from the
    @ref pytest_userver.plugins.grpc.client.grpc_service_endpoint "grpc_service_endpoint" fixture.

    The session-scoped channel is reused. Before it is returned, the current
    test's `grpc_client_prepare` and `asyncexc_check` are installed into the
    channel interceptors, and the channel is awaited until ready for at most
    `grpc_service_timeout` seconds.

    Raises `RuntimeError` (chained from the timeout error) if the channel
    does not become ready in time.

    @ingroup userver_testsuite_fixtures
    """
    # The interceptor objects are session-scoped while this fixture is
    # function-scoped, so the per-test callbacks are re-bound on every use.
    _grpc_channel_interceptor.prepare_func = grpc_client_prepare
    _grpc_channel_interceptor_asyncexc.asyncexc_check = asyncexc_check
    try:
        await asyncio.wait_for(
            grpc_session_channel.channel_ready(),
            timeout=grpc_service_timeout,
        )
    except asyncio.TimeoutError as exc:
        # Chain explicitly so the traceback shows the timeout as the cause.
        raise RuntimeError(
            f'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
        ) from exc
    return grpc_session_channel
155
156
@pytest.fixture(scope='session')
def grpc_socket_path() -> Optional[pathlib.Path]:
    """
    Path for the UNIX socket over which testsuite will talk to the gRPC service, if it chooses to use a UNIX socket.

    The path is `grpc.sock` inside a fresh temporary directory, which is
    removed when the fixture is torn down.

    @see pytest_userver.plugins.grpc.client.userver_config_grpc_endpoint "userver_config_grpc_endpoint"

    @ingroup userver_testsuite_fixtures
    """
    # Path must be as short as possible due to 108 character limitation.
    # 'service_tempdir', for example, is typically too long.
    socket_dir = tempfile.TemporaryDirectory(prefix='userver-grpc-socket-')
    with socket_dir as dir_name:
        yield pathlib.Path(dir_name, 'grpc.sock')
170
171
@pytest.fixture(scope='session')
def userver_config_grpc_endpoint(
    pytestconfig,
    grpc_service_port_fallback,
    substitute_config_vars,
    request,
    choose_free_port,
):
    """
    Returns a function that adjusts the static config for testsuite.

    * if the original service config specifies `grpc-server.port`, and that port is taken,
      then adjusts it to a free port;
    * if the original service config specifies `grpc-server.unix-socket-path`,
      then adjusts it to a tmp path
      (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
    * in service runner mode, uses the original grpc port from config or
      @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".

    If the config has no (or an empty) `grpc-server` component, the returned
    function leaves the config untouched.

    @ingroup userver_testsuite_fixtures
    """

    def patch_config(config_yaml, config_vars):
        components = config_yaml['components_manager']['components']
        grpc_server = components.get('grpc-server', None)
        if not grpc_server:
            return

        # Decisions below are made on the config with variables substituted,
        # while the edits are applied to the raw `grpc_server` mapping.
        original_grpc_server = substitute_config_vars(grpc_server, config_vars)

        if pytestconfig.option.service_runner_mode:
            # Service runner mode always listens on a TCP port.
            grpc_server.pop('unix-socket-path', None)
            if 'port' not in original_grpc_server:
                grpc_server['port'] = grpc_service_port_fallback
                config_vars['grpc_server_port'] = grpc_service_port_fallback
        elif 'unix-socket-path' in original_grpc_server:
            grpc_server.pop('port', None)
            # Requested lazily so the temporary socket directory is only
            # created when a UNIX socket is actually used.
            grpc_socket_path = request.getfixturevalue('grpc_socket_path')
            grpc_server['unix-socket-path'] = str(grpc_socket_path)
        else:
            grpc_server.pop('unix-socket-path', None)
            grpc_server['port'] = choose_free_port(original_grpc_server.get('port', None))

    return patch_config
216
217
218# Taken from
219# https://github.com/grpc/grpc/blob/master/examples/python/interceptors/headers/generic_client_interceptor.py
class _GenericClientInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
    grpc.aio.StreamUnaryClientInterceptor,
    grpc.aio.StreamStreamClientInterceptor,
):
    """
    Client interceptor that awaits `prepare_func` before every RPC of any
    arity and then forwards the call unchanged.

    `prepare_func` starts as `None` and must be assigned before the first
    intercepted call (the `grpc_channel` fixture does this).
    """

    def __init__(self):
        # Async hook awaited with (client_call_details, request_or_stream)
        # before each call is continued.
        self.prepare_func: Optional[Callable[[grpc.aio.ClientCallDetails, MessageOrStream], Awaitable[None]]] = None

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        await self.prepare_func(client_call_details, request)
        return await continuation(client_call_details, request)

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        await self.prepare_func(client_call_details, request)
        # `request` is a single message here, not an iterator, and in
        # grpc.aio `continuation` is a coroutine that must be awaited
        # to obtain the call object.
        return await continuation(client_call_details, request)

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        await self.prepare_func(client_call_details, request_iterator)
        return await continuation(client_call_details, request_iterator)

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        await self.prepare_func(client_call_details, request_iterator)
        # Await the continuation so the call object, not a bare coroutine,
        # is returned to grpc.aio.
        return await continuation(client_call_details, request_iterator)
244
245
@pytest.fixture(scope='session')
def _grpc_channel_interceptor(daemon_scoped_mark) -> _GenericClientInterceptor:
    # Session-scoped interceptor instance; `grpc_channel` assigns its
    # `prepare_func` for each test.
    # NOTE(review): `daemon_scoped_mark` is not used in the body - presumably
    # requested only for its effect on fixture setup/caching; confirm.
    return _GenericClientInterceptor()
249
250
class _AsyncExcClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.StreamUnaryClientInterceptor):
    """
    Client interceptor that invokes `asyncexc_check` after every unary-unary
    and stream-unary call, whether the call succeeded or raised.

    `asyncexc_check` starts as `None` and must be assigned before the first
    intercepted call (the `grpc_channel` fixture does this).
    """

    def __init__(self):
        # Zero-argument callback run once each intercepted call has finished.
        self.asyncexc_check: Optional[_AsyncExcCheck] = None

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        try:
            call = await continuation(client_call_details, request)
        finally:
            # Runs on both success and failure of the continued call.
            self.asyncexc_check()
        return call

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        try:
            call = await continuation(client_call_details, request_iterator)
        finally:
            # Runs on both success and failure of the continued call.
            self.asyncexc_check()
        return call
266
267
268@pytest.fixture(scope='session')
269def _grpc_channel_interceptor_asyncexc() -> _AsyncExcClientInterceptor: