userver: /home/user/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Make gRPC requests to the service.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=redefined-outer-name
8import asyncio
9from collections.abc import AsyncIterator
10from collections.abc import Awaitable
11from collections.abc import Callable
12from collections.abc import Generator
13from collections.abc import Sequence
14import itertools
15import pathlib
16import tempfile
17from typing import Any
18from typing import TypeAlias
19
20import grpc
21import grpc.aio
22import pytest
23from typing_extensions import override
24
27from . import _hookspec
28
29DEFAULT_TIMEOUT = 15.0
30USERVER_CONFIG_HOOKS = ['userver_config_grpc_endpoint']
31
32_AsyncExcCheck: TypeAlias = Callable[[], None]
33
34
35@pytest.fixture(scope='session')
37 """
38 Returns the gRPC port that should be used in service runner mode in case
39 no port is specified in the source config_yaml.
40
41 @ingroup userver_testsuite_fixtures
42 """
43 return 11080
44
45
46@pytest.fixture(scope='session')
47def grpc_service_endpoint(service_config) -> str:
48 """
49 Returns the gRPC endpoint of the service.
50 Used by @ref pytest_userver.plugins.grpc.client.grpc_channel "grpc_channel" fixture.
51
52 By default, gets the actual gRPC endpoint from `service_config`.
53 Override this fixture if you add gRPC server listening ports in a custom way.
54
55 @ingroup userver_testsuite_fixtures
56 """
57 components = service_config['components_manager']['components']
58 if 'grpc-server' not in components:
59 raise RuntimeError('No grpc-server component')
60 grpc_server = components['grpc-server']
61
62 if unix_socket_path := grpc_server.get('unix-socket-path'):
63 return f'unix:{unix_socket_path}'
64 if port := grpc_server.get('port'):
65 return f'localhost:{port}'
66 raise RuntimeError("No 'port' or 'unix-socket-path' found in 'grpc-server' component config")
67
68
69@pytest.fixture(scope='session')
70def grpc_service_timeout(pytestconfig) -> float:
71 """
72 Returns the gRPC timeout for the service that is set by the command
73 line option `--service-timeout`.
74
75 Override this fixture to change the way the gRPC timeout
76 is set.
77
78 @ingroup userver_testsuite_fixtures
79 """
80 return float(pytestconfig.option.service_timeout) or DEFAULT_TIMEOUT
81
82
83@pytest.fixture(scope='session')
84async def grpc_session_channel(grpc_service_endpoint):
85 async with grpc.aio.insecure_channel(grpc_service_endpoint) as channel:
86 yield channel
87
88
89@pytest.fixture
90async def grpc_channel(
91 service_client, # For daemon setup and userver_client_cleanup
92 grpc_service_endpoint,
93 grpc_service_timeout,
94 grpc_session_channel,
95 request,
96):
97 """
98 Returns the gRPC channel configured by the parameters from the
99 @ref pytest_userver.plugins.grpc.client.grpc_service_endpoint "grpc_service_endpoint" fixture.
100
101 You can add interceptors to the channel by implementing the
102 @ref pytest_userver.plugins.grpc._hookspec.pytest_grpc_client_interceptors "pytest_grpc_client_interceptors"
103 hook in your pytest plugin or initial (root) conftest.
104
105 @ingroup userver_testsuite_fixtures
106 """
107 interceptors = request.config.hook.pytest_grpc_client_interceptors(request=request)
108 interceptors_list = list(itertools.chain.from_iterable(interceptors))
109 # Sanity check: we have at least one "builtin" interceptor.
110 assert len(interceptors_list) != 0
111
112 try:
113 await asyncio.wait_for(
114 grpc_session_channel.channel_ready(),
115 timeout=grpc_service_timeout,
116 )
117 except asyncio.TimeoutError:
118 raise RuntimeError(
119 f'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
120 )
121
122 _set_client_interceptors(grpc_session_channel, interceptors_list)
123 try:
124 yield grpc_session_channel
125 finally:
126 _set_client_interceptors(grpc_session_channel, [])
127
128
129@pytest.fixture(scope='session')
130def grpc_socket_path() -> Generator[pathlib.Path, None, None]:
131 """
132 Path for the UNIX socket over which testsuite will talk to the gRPC service, if it chooses to use a UNIX socket.
133
134 @see pytest_userver.plugins.grpc.client.userver_config_grpc_endpoint "userver_config_grpc_endpoint"
135
136 @ingroup userver_testsuite_fixtures
137 """
138 # Path must be as short as possible due to 108 character limitation.
139 # 'service_tempdir', for example, is typically too long.
140 with tempfile.TemporaryDirectory(prefix='userver-grpc-socket-') as name:
141 yield pathlib.Path(name) / 'grpc.sock'
142
143
144@pytest.fixture(scope='session')
146 pytestconfig,
147 grpc_service_port_fallback,
148 substitute_config_vars,
149 request,
150 choose_free_port,
151):
152 """
153 Returns a function that adjusts the static config for testsuite.
154
155 * if the original service config specifies `grpc-server.port`, and that port is taken,
156 then adjusts it to a free port;
157 * if the original service config specifies `grpc-server.unix-socket-path`,
158 then adjusts it to a tmp path
159 (see @ref pytest_userver.plugins.grpc.client.grpc_socket_path "grpc_socket_path");
160 * in service runner mode, uses the original grpc port from config or
161 @ref pytest_userver.plugins.grpc.client.grpc_service_port_fallback "grpc_service_port_fallback".
162
163 Override this fixture to change the way `grpc-server` endpoint config is patched for tests.
164
165 @ingroup userver_testsuite_fixtures
166 """
167
168 def patch_config(config_yaml, config_vars):
169 components = config_yaml['components_manager']['components']
170 grpc_server = components.get('grpc-server', None)
171 if not grpc_server:
172 return
173
174 original_grpc_server = substitute_config_vars(grpc_server, config_vars)
175
176 if pytestconfig.option.service_runner_mode:
177 grpc_server.pop('unix-socket-path', None)
178 if 'port' not in original_grpc_server:
179 grpc_server['port'] = grpc_service_port_fallback
180 config_vars['grpc_server_port'] = grpc_service_port_fallback
181 elif 'unix-socket-path' in original_grpc_server:
182 grpc_server.pop('port', None)
183 grpc_socket_path = request.getfixturevalue('grpc_socket_path')
184 grpc_server['unix-socket-path'] = str(grpc_socket_path)
185 else:
186 grpc_server.pop('unix-socket-path', None)
187 grpc_server['port'] = choose_free_port(original_grpc_server.get('port', None))
188
189 return patch_config
190
191
192# @cond
193
194
195def pytest_addhooks(pluginmanager: pytest.PytestPluginManager):
196 pluginmanager.add_hookspecs(_hookspec)
197
198
199class _AsyncExcCheckInterceptor(
200 grpc.aio.UnaryUnaryClientInterceptor,
201 grpc.aio.UnaryStreamClientInterceptor,
202 grpc.aio.StreamUnaryClientInterceptor,
203 grpc.aio.StreamStreamClientInterceptor,
204):
205 def __init__(self, asyncexc_check: _AsyncExcCheck):
206 self._asyncexc_check = asyncexc_check
207
208 @override
209 async def intercept_unary_unary(
210 self,
211 continuation: Callable[[grpc.aio.ClientCallDetails, Any], Awaitable[grpc.aio.UnaryUnaryCall]],
212 client_call_details: grpc.aio.ClientCallDetails,
213 request: Any,
214 ) -> grpc.aio.UnaryUnaryCall:
215 self._asyncexc_check()
216 try:
217 return await continuation(client_call_details, request)
218 finally:
219 self._asyncexc_check()
220
221 # Note: full type of this function is Callable[[...], Awaitable[AsyncIterator[Any]]]
222 @override
223 async def intercept_unary_stream(
224 self,
225 continuation: Callable[[grpc.aio.ClientCallDetails, Any], grpc.aio.UnaryStreamCall],
226 client_call_details: grpc.aio.ClientCallDetails,
227 request: Any,
228 ) -> AsyncIterator[Any]:
229 self._asyncexc_check()
230 call = await continuation(client_call_details, request)
231
232 async def response_stream() -> AsyncIterator[Any]:
233 try:
234 async for response in call:
235 yield response
236 finally:
237 self._asyncexc_check()
238
239 return response_stream()
240
241 @override
242 async def intercept_stream_unary(
243 self,
244 continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], Awaitable[grpc.aio.StreamUnaryCall]],
245 client_call_details: grpc.aio.ClientCallDetails,
246 request_iterator: AsyncIterator[Any],
247 ) -> grpc.aio.StreamUnaryCall:
248 self._asyncexc_check()
249 try:
250 return await continuation(client_call_details, request_iterator)
251 finally:
252 self._asyncexc_check()
253
254 # Note: full type of this function is Callable[[...], Awaitable[AsyncIterator[Any]]]
255 @override
256 async def intercept_stream_stream(
257 self,
258 continuation: Callable[[grpc.aio.ClientCallDetails, AsyncIterator[Any]], grpc.aio.StreamStreamCall],
259 client_call_details: grpc.aio.ClientCallDetails,
260 request_iterator: AsyncIterator[Any],
261 ) -> AsyncIterator[Any]:
262 self._asyncexc_check()
263 call = await continuation(client_call_details, request_iterator)
264
265 async def response_stream() -> AsyncIterator[Any]:
266 try:
267 async for response in call:
268 yield response
269 finally:
270 self._asyncexc_check()
271
272 return response_stream()
273
274
275class _UpdateServerStateInterceptor(pytest_userver.grpc.PreCallClientInterceptor):
276 def __init__(self, service_client: pytest_userver.client.Client):
277 self._service_client = service_client
278
279 @override
280 async def pre_call_hook(self, client_call_details: grpc.aio.ClientCallDetails) -> None:
281 if hasattr(self._service_client, 'update_server_state'):
282 await self._service_client.update_server_state()
283
284
285def pytest_grpc_client_interceptors(request: pytest.FixtureRequest) -> Sequence[grpc.aio.ClientInterceptor]:
286 return [
287 _AsyncExcCheckInterceptor(request.getfixturevalue('asyncexc_check')),
288 _UpdateServerStateInterceptor(request.getfixturevalue('service_client')),
289 ]
290
291
292def _filter_interceptors(
293 interceptors: Sequence[grpc.aio.ClientInterceptor], desired_type: type[grpc.aio.ClientInterceptor]
294) -> list[grpc.aio.ClientInterceptor]:
295 return [interceptor for interceptor in interceptors if isinstance(interceptor, desired_type)]
296
297
298def _set_client_interceptors(channel: grpc.aio.Channel, interceptors: Sequence[grpc.aio.ClientInterceptor]) -> None:
299 """
300 Allows to set interceptors dynamically while reusing the same underlying channel,
301 which is something grpc-io currently doesn't support.
302
303 Also fixes the bug: multi-inheritance interceptors are only registered for first matching type
304 https://github.com/grpc/grpc/issues/31442
305 """
306 channel._unary_unary_interceptors = _filter_interceptors(interceptors, grpc.aio.UnaryUnaryClientInterceptor)
307 channel._unary_stream_interceptors = _filter_interceptors(interceptors, grpc.aio.UnaryStreamClientInterceptor)
308 channel._stream_unary_interceptors = _filter_interceptors(interceptors, grpc.aio.StreamUnaryClientInterceptor)
309 channel._stream_stream_interceptors = _filter_interceptors(interceptors, grpc.aio.StreamStreamClientInterceptor)
310
311
312# @endcond