userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/mockserver.py Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
mockserver.py
1"""
2Mocks for the gRPC servers.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=no-member
8import asyncio
9import contextlib
10import dataclasses
11import functools
12import inspect
13from typing import Callable
14from typing import Collection
15from typing import Dict
16from typing import List
17from typing import Mapping
18from typing import NoReturn
19from typing import Optional
20from typing import Tuple
21
22import grpc
23import pytest
24from typing_extensions import Final # TODO use typing in Python 3.8
25
26from testsuite.utils import callinfo
27
# Type of the decorators handed out by the mock installers in this module:
# they take the user's handler function and return it wrapped
# into a `callinfo.AsyncCallQueue`.
MockDecorator = Callable[[Callable], callinfo.AsyncCallQueue]


# @cond


# Port that `grpc_mockserver_endpoint` falls back to when the `service_wait` or
# `service_disable` option is set and no explicit (non-zero) port was requested.
DEFAULT_PORT = 8091

# NOTE(review): presumably the list of config-hook fixture names picked up by
# the userver testsuite plugin loader; the consumer is not visible in this file.
USERVER_CONFIG_HOOKS = ['userver_config_grpc_mockserver']
37
38
class GrpcServiceMock:
    """
    Mock of a single gRPC service.

    Bundles the servicer instance, the function that registers it on a server,
    and the table of currently installed per-method handlers.
    """

    def __init__(
        self,
        *,
        servicer: object,
        adder: Callable,
        service_name: str,
        known_methods: Collection[str],
        mocked_methods: Dict[str, Callable],
    ) -> None:
        # Note: methods of `servicer` look into `mocked_methods` for implementations.
        # (Specifically into the instance our constructor is given.)
        # So the dict must be stored as-is and only ever mutated in place.
        self._methods: Final[Dict[str, Callable]] = mocked_methods
        self._known_methods = known_methods
        self._service_name = service_name
        self._adder = adder
        self._servicer = servicer

    @property
    def servicer(self) -> object:
        """The servicer instance backing this mock."""
        return self._servicer

    @property
    def service_name(self) -> str:
        """The service name this mock was created with."""
        return self._service_name

    def get(self, method_name, default, /) -> Callable:
        """Return the handler installed for `method_name`, or `default` if there is none."""
        return self._methods.get(method_name, default)

    def add_to_server(self, server: grpc.aio.Server) -> None:
        """Register the servicer on `server` by calling the stored adder function."""
        self._adder(self._servicer, server)

    def reset_handlers(self) -> None:
        """Drop every installed handler (the shared dict is cleared in place)."""
        self._methods.clear()

    @contextlib.contextmanager
    def mock(self):
        """
        Context manager that yields `install_handler` and removes all installed
        handlers on exit, whether or not the body raised.
        """
        try:
            yield self.install_handler
        finally:
            self._methods.clear()

    def install_handler(self, method_name: str, /) -> MockDecorator:
        """
        Return a decorator that installs the decorated function as the handler of `method_name`.

        The decorated function is wrapped with `callinfo.acallqueue`; the wrapper is
        both stored as the handler and returned from the decorator.

        The decorator raises RuntimeError if `method_name` is not one of the known
        methods. Note that the check happens when the decorator is applied,
        not when `install_handler` is called.
        """

        def decorator(func):
            if method_name in self._known_methods:
                handler = callinfo.acallqueue(func)
                self._methods[method_name] = handler
                return handler

            raise RuntimeError(
                f'Trying to mock unknown grpc method {method_name} in service {self._service_name}',
            )

        return decorator
93
94
95# @endcond
96
97
@pytest.fixture(scope='session')
def grpc_mockserver_endpoint(pytestconfig, get_free_port) -> str:
    """
    Returns the gRPC endpoint to start the mocking server that is set by
    command line `--grpc-mockserver-host` and `--grpc-mockserver-port` options.

    For port 0, picks some free port. The exception is when the `service_wait` or
    `service_disable` option is set: then port 0 is replaced with the fixed
    `DEFAULT_PORT`.

    Override this fixture to customize the endpoint used by gRPC mockserver.

    @snippet samples/grpc_service/tests/conftest.py Prepare configs
    @ingroup userver_testsuite_fixtures
    """
    options = pytestconfig.option
    port = options.grpc_mockserver_port

    uses_fixed_port = options.service_wait or options.service_disable
    if uses_fixed_port:
        # Falsy port (0 / unset) is replaced with the well-known default.
        port = port or DEFAULT_PORT
    if port == 0:
        port = get_free_port()

    return f'{options.grpc_mockserver_host}:{port}'
117
118
class GrpcMockserverSession:
    """
    Allows to install mocks that are kept active between tests, see
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_session "grpc_mockserver_session".

    @warning This is a sharp knife, use with caution! For most use-cases, prefer
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_new "grpc_mockserver_new" instead.
    """

    # @cond
    def __init__(self, *, _server: grpc.aio.Server) -> None:
        self._server = _server
        # One lazily created mock per servicer class, see `get_auto_service_mock`.
        self._auto_service_mocks: Dict[type, GrpcServiceMock] = {}

    def get_auto_service_mock(self, servicer_class: type, /) -> GrpcServiceMock:
        """
        Return the mock for `servicer_class`, creating it and registering it
        on the server on first use. Later calls return the same mock.
        """
        existing_mock = self._auto_service_mocks.get(servicer_class)
        if existing_mock is not None:
            return existing_mock

        new_mock = _create_servicer_mock(servicer_class)
        # Cache before registering, same as the order below implies:
        # the mock is remembered even if registration on the server raises.
        self._auto_service_mocks[servicer_class] = new_mock
        new_mock.add_to_server(self._server)
        return new_mock

    def reset_auto_service_mocks(self) -> None:
        """Remove the installed handlers from every automatically created mock."""
        for mock in self._auto_service_mocks.values():
            mock.reset_handlers()

    # @endcond

    @property
    def server(self) -> grpc.aio.Server:
        """
        The underlying [grpc.aio.Server](https://grpc.github.io/grpc/python/grpc_asyncio.html#grpc.aio.Server).
        """
        return self._server
154
155
@pytest.fixture(scope='session')
async def grpc_mockserver_session(grpc_mockserver_endpoint) -> GrpcMockserverSession:
    """
    Returns the gRPC mocking server.

    Starts a `grpc.aio` server on @ref grpc_mockserver_endpoint (insecure port),
    yields a session object wrapping it, and stops the server on teardown.

    @warning This is a sharp knife, use with caution! For most use-cases, prefer
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_new "grpc_mockserver_new" instead.

    @ingroup userver_testsuite_fixtures
    """
    server = grpc.aio.server()
    server.add_insecure_port(grpc_mockserver_endpoint)
    await server.start()

    try:
        yield GrpcMockserverSession(_server=server)
    finally:

        async def stop_server():
            await server.stop(grace=None)
            await server.wait_for_termination()

        # Keep a reference to the real task, not to the shield wrapper.
        # asyncio.shield does not protect our await from cancellations: when we
        # are cancelled, the future returned by shield() becomes cancelled too,
        # and awaiting that same future again would raise CancelledError
        # immediately instead of waiting for the server to stop.
        stop_server_task = asyncio.create_task(stop_server())
        try:
            await asyncio.shield(stop_server_task)
        except asyncio.CancelledError:
            # We really need to wait for the server stopping before continuing,
            # so wait on the task itself (it was shielded and is still running).
            await stop_server_task
            # Propagate cancellation when we are done
            raise
187
188
class GrpcMockserver:
    """
    Allows to install mocks that are reset between tests, see
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_new "grpc_mockserver_new".
    """

    # @cond
    def __init__(self, *, _mockserver_session: GrpcMockserverSession) -> None:
        self._mockserver_session = _mockserver_session

    # @endcond

    def __call__(self, servicer_method, /) -> MockDecorator:
        """
        Returns a decorator to mock the specified gRPC service method implementation.

        `servicer_method` is the unbound method taken from the servicer class,
        e.g. `mock(FooServicer.Bar)`. The owning class is derived from the method,
        and its mock is created on first use.

        Example:

        @snippet samples/grpc_service/testsuite/test_grpc.py Prepare modules
        @snippet samples/grpc_service/testsuite/test_grpc.py grpc client test
        """
        servicer_class = _get_class_from_method(servicer_method)
        service_mock = self._mockserver_session.get_auto_service_mock(servicer_class)
        return service_mock.install_handler(servicer_method.__name__)

    def mock_factory(self, servicer_class, /) -> Callable[[str], MockDecorator]:
        """
        Allows to create a fixture as a shorthand for mocking methods of the specified gRPC service.

        The returned callable takes a method name and returns the same decorator
        that calling this object with the corresponding method would return.
        It raises ValueError if `servicer_class` has no such method.
        `servicer_class` itself is validated right away, with ValueError on failure.

        Example:

        @snippet grpc/functional_tests/metrics/tests/conftest.py Prepare modules
        @snippet grpc/functional_tests/metrics/tests/conftest.py Prepare server mock
        @snippet grpc/functional_tests/metrics/tests/test_metrics.py grpc client test
        """
        # Fail early on a wrong class instead of on the first factory call.
        _check_is_servicer_class(servicer_class)

        def factory(method_name):
            method = getattr(servicer_class, method_name, None)
            if method is None:
                raise ValueError(f'No method "{method_name}" in servicer class "{servicer_class.__name__}"')
            return self(method)

        return factory
233
234
@pytest.fixture
def grpc_mockserver_new(grpc_mockserver_session) -> GrpcMockserver:
    """
    Returns the gRPC mocking server.
    In order for gRPC clients in your service to work, mock handlers need to be installed for them using this fixture.

    Example:

    @snippet samples/grpc_service/testsuite/test_grpc.py Prepare modules
    @snippet samples/grpc_service/testsuite/test_grpc.py grpc client test

    Alternatively, you can create a shorthand for mocking frequently-used services:

    @snippet grpc/functional_tests/metrics/tests/conftest.py Prepare modules
    @snippet grpc/functional_tests/metrics/tests/conftest.py Prepare server mock
    @snippet grpc/functional_tests/metrics/tests/test_metrics.py grpc client test

    Mocks are only active within tests after their respective handler functions are created, not between tests.
    If the service needs the mock during startup, add the fixture that defines your mock to
    @ref pytest_userver.plugins.service.extra_client_deps "extra_client_deps".

    @ingroup userver_testsuite_fixtures
    """
    session = grpc_mockserver_session
    try:
        yield GrpcMockserver(_mockserver_session=session)
    finally:
        # Handlers installed during the test must not leak into the next one.
        session.reset_auto_service_mocks()
258
259
@pytest.fixture(scope='session')
def userver_config_grpc_mockserver(grpc_mockserver_endpoint):
    """
    Returns a function that adjusts the static config for testsuite.
    Walks through config_vars *values* equal to `$grpc_mockserver`,
    and replaces them with @ref grpc_mockserver_endpoint.

    Only top-level values are inspected; `config_vars` is modified in place.

    @ingroup userver_testsuite_fixtures
    """

    def patch_config(_config_yaml, config_vars):
        placeholder_names = [name for name, value in config_vars.items() if value == '$grpc_mockserver']
        for name in placeholder_names:
            config_vars[name] = grpc_mockserver_endpoint

    return patch_config
276
277
@pytest.fixture(scope='session')
def grpc_mockserver(grpc_mockserver_session) -> grpc.aio.Server:
    """
    Returns the gRPC mocking server, i.e. the raw `grpc.aio.Server`
    owned by the `grpc_mockserver_session` fixture.

    @deprecated Please use @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_new "grpc_mockserver_new"
    instead. Some time soon, `grpc_mockserver` will be removed.

    @ingroup userver_testsuite_fixtures
    """
    session = grpc_mockserver_session
    return session.server
289
290
@pytest.fixture(scope='session')
# NOTE(review): the `def` line was missing from this listing. The fixture name is
# taken from the docstring below; the parameterless signature is reconstructed
# from the body, which uses no arguments -- confirm against the upstream file.
def create_grpc_mock():
    """
    Creates the gRPC mock server for the provided type.

    The fixture value is the factory function itself: call it with a `*Servicer`
    class to get a service mock.

    @deprecated Please use @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_new "grpc_mockserver_new"
    instead. Some time soon, `create_grpc_mock` will be removed.

    @ingroup userver_testsuite_fixtures
    """
    return _create_servicer_mock
302
303
304# @cond
305
306
def pytest_addoption(parser):
    """
    Registers the `--grpc-mockserver-host` and `--grpc-mockserver-port`
    command line options in the `grpc-mockserver` option group.
    """
    option_specs = (
        (
            '--grpc-mockserver-host',
            {
                'default': '[::]',
                'help': 'gRPC mockserver hostname, default is [::]',
            },
        ),
        (
            '--grpc-mockserver-port',
            {
                'type': int,
                'default': 0,
                'help': 'gRPC mockserver port, by default random port is used',
            },
        ),
    )

    group = parser.getgroup('grpc-mockserver')
    for flag, settings in option_specs:
        group.addoption(flag, **settings)
320
321
def _raise_unimplemented_error(context: grpc.aio.ServicerContext, full_rpc_name: str) -> NoReturn:
    """
    Report a call to an RPC that has no mock installed.

    Sets the UNIMPLEMENTED status code and an explanatory details string
    on `context`, then raises NotImplementedError with the same text.
    """
    details = f'Trying to call grpc_mockserver method "{full_rpc_name}", which is not mocked'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(details)
    raise NotImplementedError(details)
327
328
def _wrap_grpc_method(
    python_method_name: str,
    full_rpc_name: str,
    default_unimplemented_method: Callable,
    response_streaming: bool,
    mocked_methods: Dict[str, Callable],
):
    """
    Build the servicer method that dispatches an RPC to its currently installed mock.

    The handler is looked up in `mocked_methods` by `python_method_name` on every
    call, so handlers installed or removed later are picked up. If none is
    installed, the call is reported via `_raise_unimplemented_error`.

    For response-streaming RPCs the result is an async generator function: the
    handler is called and awaited, and the items of the resulting async iterable
    are re-yielded. Otherwise the result is a coroutine function that returns the
    handler's result, awaiting it first if it is awaitable.

    The produced function carries the metadata of `default_unimplemented_method`
    (via `functools.wraps`).
    """

    def lookup_handler(context):
        handler = mocked_methods.get(python_method_name)
        if handler is None:
            _raise_unimplemented_error(context, full_rpc_name)
        return handler

    if not response_streaming:

        @functools.wraps(default_unimplemented_method)
        async def run_unary_response_method(self, request_or_stream, context: grpc.aio.ServicerContext):
            result = lookup_handler(context)(request_or_stream, context)
            # Handlers may be plain functions as well as coroutine functions.
            return (await result) if inspect.isawaitable(result) else result

        return run_unary_response_method

    @functools.wraps(default_unimplemented_method)
    async def run_stream_response_method(self, request_or_stream, context: grpc.aio.ServicerContext):
        handler = lookup_handler(context)
        responses = await handler(request_or_stream, context)
        async for response in responses:
            yield response

    return run_stream_response_method
363
364
def _check_is_servicer_class(servicer_class: type) -> None:
    """
    Validate that `servicer_class` is a class whose name ends with `Servicer`.

    Raises ValueError otherwise; returns None on success.
    """
    is_class = isinstance(servicer_class, type)
    if not is_class:
        raise ValueError(f'Expected *Servicer class (type), got: {servicer_class} ({type(servicer_class)})')

    if servicer_class.__name__.endswith('Servicer'):
        return
    raise ValueError(f'Expected *Servicer class, got: {servicer_class}')
370
371
def _create_servicer_mock(
    servicer_class: type,
    *,
    # TODO remove, these no longer do anything
    stream_method_names: Optional[List[str]] = None,
) -> GrpcServiceMock:
    """
    Create a `GrpcServiceMock` for the given generated `*Servicer` class.

    A subclass named `Mock<ServicerName>` is built in which every callable defined
    directly on `servicer_class` is replaced with a dispatcher to the mock's
    handler table. The `add_<ServicerName>_to_server` function is taken from the
    module of `servicer_class`. The service name is taken from the first reflected
    RPC, or is `'UNKNOWN'` if the reflection is empty.

    `stream_method_names` is accepted and ignored.
    """
    _check_is_servicer_class(servicer_class)
    reflection = _reflect_servicer(servicer_class)

    service_name = 'UNKNOWN'
    if reflection:
        first_rpc = next(iter(reflection.values()))
        service_name, _ = _split_rpc_name(first_rpc.name)

    adder_name = f'add_{servicer_class.__name__}_to_server'
    adder = getattr(inspect.getmodule(servicer_class), adder_name)

    # Shared between the generated methods and the returned mock:
    # the methods read handlers from it, the mock installs them into it.
    mocked_methods: Dict[str, Callable] = {}

    methods = {
        attname: _wrap_grpc_method(
            python_method_name=attname,
            full_rpc_name=reflection[attname].name,
            default_unimplemented_method=value,
            response_streaming=reflection[attname].response_streaming,
            mocked_methods=mocked_methods,
        )
        for attname, value in servicer_class.__dict__.items()
        if callable(value)
    }

    mocked_servicer_class = type(f'Mock{servicer_class.__name__}', (servicer_class,), methods)

    return GrpcServiceMock(
        servicer=mocked_servicer_class(),
        adder=adder,
        service_name=service_name,
        known_methods=frozenset(methods),
        mocked_methods=mocked_methods,
    )
413
414
@dataclasses.dataclass(frozen=True)
class _MethodInfo:
    """Immutable description of a single RPC: its full name and streaming flags."""

    # In the format "/with.package.ServiceName/MethodName".
    name: str
    # Whether the request side is a stream.
    request_streaming: bool
    # Whether the response side is a stream.
    response_streaming: bool
420
421
class _FakeChannel:
    """
    Stand-in for a gRPC channel that records RPC metadata instead of creating callables.

    Each factory method returns a `_MethodInfo` holding the RPC name and the
    streaming flags implied by the method that was called. Any extra arguments
    are ignored.
    """

    def unary_unary(self, name, *_args, **_kwargs):
        return _MethodInfo(name, False, False)

    def unary_stream(self, name, *_args, **_kwargs):
        return _MethodInfo(name, False, True)

    def stream_unary(self, name, *_args, **_kwargs):
        return _MethodInfo(name, True, False)

    def stream_stream(self, name, *_args, **_kwargs):
        return _MethodInfo(name, True, True)
434
435
def _reflect_servicer(servicer_class: type) -> Mapping[str, _MethodInfo]:
    """
    Collect RPC metadata for a `*Servicer` class by reflecting the matching `*Stub` class.

    The stub is looked up by name (`<Service>Stub`) in the module that defines
    `servicer_class`; AttributeError propagates if it is not there.
    """
    stub_name = servicer_class.__name__.removesuffix('Servicer') + 'Stub'
    servicer_module = inspect.getmodule(servicer_class)
    return _reflect_stub(getattr(servicer_module, stub_name))
440
441
def _reflect_stub(stub_class: type) -> Mapping[str, _MethodInfo]:
    """
    Return the instance attributes of a stub constructed over a `_FakeChannel`.

    HACK: relying on the implementation of generated *Stub classes.
    Presumably their constructors store one attribute per RPC, obtained from the
    channel's `unary_unary` / `unary_stream` / `stream_unary` / `stream_stream`,
    so with the fake channel each attribute ends up being a `_MethodInfo`.
    """
    fake_stub = stub_class(_FakeChannel())
    return fake_stub.__dict__
445
446
def _split_rpc_name(rpc_name: str) -> Tuple[str, str]:
    """
    Split a full RPC name into `(service_name, method_name)`.

    A single leading `/` is optional: both `"/with.package.ServiceName/MethodName"`
    and `"with.package.ServiceName/MethodName"` are accepted.

    Raises ValueError if the name does not consist of exactly two `/`-separated parts.
    """
    parts = rpc_name.removeprefix('/').split('/')
    if len(parts) != 2:
        raise ValueError(
            f'Invalid RPC name: "{rpc_name}". RPC name must be of the form "with.package.ServiceName/MethodName"'
        )
    # Return a real tuple, as the annotation promises (`str.split` gives a list).
    service_name, method_name = parts
    return service_name, method_name
455
456
def _get_class_from_method(method) -> type:
    """
    Return the class that defines `method`, given as a plain function (`ClassName.method_name`).

    The class name is derived from `method.__qualname__` and resolved first as an
    attribute of the method's module, then in the method's globals.
    Fails an assertion if `method` is not a function or if no class is found.
    """
    # https://stackoverflow.com/a/54597033/5173839
    assert inspect.isfunction(method), 'Expected an unbound method: foo(ClassName.MethodName)'

    # Cut everything from the first '.<locals>' on, then drop the last name component.
    outermost_qualname = method.__qualname__.split('.<locals>', 1)[0]
    class_name = outermost_qualname.rsplit('.', 1)[0]

    not_found = object()
    cls = getattr(inspect.getmodule(method), class_name, not_found)
    if cls is not_found:
        cls = method.__globals__.get(class_name)

    assert isinstance(cls, type)
    return cls
467
468
469# @endcond