userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Make gRPC requests to the service.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=redefined-outer-name
8import asyncio
9from collections.abc import AsyncIterator
10from collections.abc import Awaitable
11from collections.abc import Callable
12from collections.abc import Generator
13from collections.abc import Sequence
14import itertools
15import pathlib
16import tempfile
17from typing import Any
18from typing import TypeAlias
19
20import grpc
21import grpc.aio
22import pytest
23
25from . import _hookspec
26
27DEFAULT_TIMEOUT = 15.0
28USERVER_CONFIG_HOOKS = ['userver_config_grpc_endpoint']
29
30_AsyncExcCheck: TypeAlias = Callable[[], None]
31
32
33@pytest.fixture(scope='session')
35 """
36 Returns the gRPC port that should be used in service runner mode in case
37 no port is specified in the source config_yaml.
38
39 @ingroup userver_testsuite_fixtures
40 """
41 return 11080
42
43
44@pytest.fixture(scope='session')
45def grpc_service_endpoint(service_config) -> str:
46 """
47 Returns the gRPC endpoint of the service.
48 Used by @ref pytest_userver.plugins.grpc.client.grpc_channel "grpc_channel" fixture.
49
50 By default, gets the actual gRPC endpoint from `service_config`.
51 Override this fixture if you add gRPC server listening ports in a custom way.
52
53 @ingroup userver_testsuite_fixtures
54 """
55 components = service_config['components_manager']['components']
56 if 'grpc-server' not in components:
57 raise RuntimeError('No grpc-server component')
58 grpc_server = components['grpc-server']
59
60 if unix_socket_path := grpc_server.get('unix-socket-path'):
61 return f'unix:{unix_socket_path}'
62 if port := grpc_server.get('port'):
63 return f'localhost:{port}'
64 raise RuntimeError("No 'port' or 'unix-socket-path' found in 'grpc-server' component config")
65
66
67@pytest.fixture(scope='session')
68def grpc_service_timeout(pytestconfig) -> float:
69 """
70 Returns the gRPC timeout for the service that is set by the command
71 line option `--service-timeout`.
72
73 Override this fixture to change the way the gRPC timeout
74 is set.
75
76 @ingroup userver_testsuite_fixtures
77 """
78 return float(pytestconfig.option.service_timeout) or DEFAULT_TIMEOUT
79
80
81@pytest.fixture(scope='session')
82async def grpc_session_channel(grpc_service_endpoint):
83 async with grpc.aio.insecure_channel(grpc_service_endpoint) as channel:
84 yield channel
85
86
87@pytest.fixture
88async def grpc_channel(
89 service_client, # For daemon setup and userver_client_cleanup
90 grpc_service_endpoint,
91 grpc_service_timeout,
92 grpc_session_channel,
93 request,
94):
95 """
96 Returns the gRPC channel configured by the parameters from the
97 @ref pytest_userver.plugins.grpc.client.grpc_service_endpoint "grpc_service_endpoint" fixture.
98
99 You can add interceptors to the channel by implementing the
100 @ref pytest_userver.plugins.grpc._hookspec.pytest_grpc_client_interceptors "pytest_grpc_client_interceptors"
101 hook in your pytest plugin or initial (root) conftest.
102
103 @ingroup userver_testsuite_fixtures
104 """
105 interceptors = request.config.hook.pytest_grpc_client_interceptors(request=request)
106 interceptors_list = list(itertools.chain.from_iterable(interceptors))
107 # Sanity check: we have at least one "builtin" interceptor.
108 assert len(interceptors_list) != 0
109
110 try:
111 await asyncio.wait_for(
112 grpc_session_channel.channel_ready(),
113 timeout=grpc_service_timeout,
114 )
115 except asyncio.TimeoutError:
116 raise RuntimeError(
117 f'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
118 )
119
120 _set_client_interceptors(grpc_session_channel, interceptors_list)
121 try:
122 yield grpc_session_channel
123 finally:
124 _set_client_interceptors(grpc_session_channel, [])
125
126
127@pytest.fixture(scope='session')
128def grpc_socket_path() -> Generator[pathlib.Path, None, None]:
129 """
130 Path for the UNIX socket over which testsuite will talk to the gRPC service, if it chooses to use a UNIX socket.
131
132 @see pytest_userver.plugins.grpc.client.userver_config_grpc_endpoint "userver_config_grpc_endpoint"
133
134 @ingroup userver_testsuite_fixtures
135 """
136 # Path must be as short as possible due to 108 character limitation.
137 # 'service_tempdir', for example, is typically too long.
138 with tempfile.TemporaryDirectory(prefix='userver-grpc-socket-') as name:
139 yield pathlib.Path(name) / 'grpc.sock'
140
141
142@pytest.fixture(scope='session')
144 pytestconfig,
145 grpc_service_port_fallback,
146 substitute_config_vars,
147 request,
148 choose_free_port,
149):
150 """
151 Returns a function that adjusts the static config for testsuite.
152
153 * if the original service config specifies `grpc-server.port`, and that port is taken,
154 then adjusts it to a free port;
155 * if the original service config specifies `grpc-server.unix-socket-path`,
156 then adjusts it to a tmp path
157 (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
158 * in service runner mode, uses the original grpc port from config or
159 @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".
160
161 Override this fixture to change the way `grpc-server` endpoint config is patched for tests.
162
163 @ingroup userver_testsuite_fixtures
164 """
165
166 def patch_config(config_yaml, config_vars):
167 components = config_yaml['components_manager']['components']
168 grpc_server = components.get('grpc-server', None)
169 if not grpc_server:
170 return
171
172 original_grpc_server = substitute_config_vars(grpc_server, config_vars)
173
174 if pytestconfig.option.service_runner_mode:
175 grpc_server.pop('unix-socket-path', None)
176 if 'port' not in original_grpc_server:
177 grpc_server['port'] = grpc_service_port_fallback
178 config_vars['grpc_server_port'] = grpc_service_port_fallback
179 elif 'unix-socket-path' in original_grpc_server:
180 grpc_server.pop('port', None)
181 grpc_socket_path = request.getfixturevalue('grpc_socket_path')
182 grpc_server['unix-socket-path'] = str(grpc_socket_path)
183 else:
184 grpc_server.pop('unix-socket-path', None)
185 grpc_server['port'] = choose_free_port(original_grpc_server.get('port', None))
186
187 return patch_config
188
189
190# @cond
191
192
193def pytest_addhooks(pluginmanager: pytest.PytestPluginManager):
194 pluginmanager.add_hookspecs(_hookspec)
195
196
197class _UpdateServerStateInterceptor(
198 grpc.aio.UnaryUnaryClientInterceptor,
199 grpc.aio.UnaryStreamClientInterceptor,
200 grpc.aio.StreamUnaryClientInterceptor,
201 grpc.aio.StreamStreamClientInterceptor,
202):
203 def __init__(self, service_client: pytest_userver.client.Client, asyncexc_check: _AsyncExcCheck):
204 self._service_client = service_client
205 self._asyncexc_check = asyncexc_check
206
207 async def _before_call_hook(self) -> None:
208 self._asyncexc_check()
209 if hasattr(self._service_client, 'update_server_state'):
210 await self._service_client.update_server_state()
211
212 async def intercept_unary_unary(
213 self,
214 continuation: Callable[[grpc.aio.ClientCallDetails, Any], Awaitable[grpc.aio.UnaryUnaryCall]],
215 client_call_details: grpc.aio.ClientCallDetails,
216 request: Any,
217 ) -> grpc.aio.UnaryUnaryCall:
218 await self._before_call_hook()
219 try:
220 return await continuation(client_call_details, request)
221 finally:
222 self._asyncexc_check()
223
224 # Note: full type of this function is Callable[[...], Awaitable[AsyncIterator[Any]]]
225 async def intercept_unary_stream(
226 self,
227 continuation: Callable[[grpc.aio.ClientCallDetails, Any], grpc.aio.UnaryStreamCall],
228 client_call_details: grpc.aio.ClientCallDetails,
229 request: Any,
230 ) -> AsyncIterator[Any]:
231 await self._before_call_hook()
232 call = await continuation(client_call_details, request)
233
234 async def response_stream() -> AsyncIterator[Any]:
235 try:
236 async for response in call:
237 yield response
238 finally:
239 self._asyncexc_check()
240
241 return response_stream()
242
243 async def intercept_stream_unary(
244 self,
245 continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], Awaitable[grpc.aio.StreamUnaryCall]],
246 client_call_details: grpc.aio.ClientCallDetails,
247 request_iterator: AsyncIterator[Any],
248 ) -> grpc.aio.StreamUnaryCall:
249 await self._before_call_hook()
250 try:
251 return await continuation(client_call_details, request_iterator)
252 finally:
253 self._asyncexc_check()
254
255 # Note: full type of this function is Callable[[...], Awaitable[AsyncIterator[Any]]]
256 async def intercept_stream_stream(
257 self,
258 continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], grpc.aio.StreamStreamCall],
259 client_call_details: grpc.aio.ClientCallDetails,
260 request_iterator: AsyncIterator[Any],
261 ) -> AsyncIterator[Any]:
262 self._asyncexc_check()
263 await self._before_call_hook()
264 call = await continuation(client_call_details, request_iterator)
265
266 async def response_stream() -> AsyncIterator[Any]:
267 try:
268 async for response in call:
269 yield response
270 finally:
271 self._asyncexc_check()
272
273 return response_stream()
274
275
276def pytest_grpc_client_interceptors(request: pytest.FixtureRequest) -> Sequence[grpc.aio.ClientInterceptor]:
277 return [
278 _UpdateServerStateInterceptor(
279 request.getfixturevalue('service_client'),
280 request.getfixturevalue('asyncexc_check'),
281 ),
282 ]
283
284
285def _filter_interceptors(
286 interceptors: Sequence[grpc.aio.ClientInterceptor], desired_type: type[grpc.aio.ClientInterceptor]
287) -> list[grpc.aio.ClientInterceptor]:
288 return [interceptor for interceptor in interceptors if isinstance(interceptor, desired_type)]
289
290
291def _set_client_interceptors(channel: grpc.aio.Channel, interceptors: Sequence[grpc.aio.ClientInterceptor]) -> None:
292 """
293 Allows to set interceptors dynamically while reusing the same underlying channel,
294 which is something grpc-io currently doesn't support.
295
296 Also fixes the bug: multi-inheritance interceptors are only registered for first matching type
297 https://github.com/grpc/grpc/issues/31442
298 """
299 channel._unary_unary_interceptors = _filter_interceptors(interceptors, grpc.aio.UnaryUnaryClientInterceptor)
300 channel._unary_stream_interceptors = _filter_interceptors(interceptors, grpc.aio.UnaryStreamClientInterceptor)
301 channel._stream_unary_interceptors = _filter_interceptors(interceptors, grpc.aio.StreamUnaryClientInterceptor)
302 channel._stream_stream_interceptors = _filter_interceptors(interceptors, grpc.aio.StreamStreamClientInterceptor)
303
304
305# @endcond