userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/mockserver.py Source File
Loading...
Searching...
No Matches
mockserver.py
1"""
2Mocks for the gRPC servers.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=no-member
8import asyncio
9import contextlib
10import functools
11from typing import Callable
12from typing import List
13from typing import Optional
14
15import grpc
16import pytest
17
18from testsuite.utils import callinfo
19
20DEFAULT_PORT = 8091
21
22
class GrpcServiceMock:
    """
    Registry of the handlers currently installed for one mocked gRPC servicer.

    Keeps the servicer instance, the collection of method names that are
    allowed to be mocked, and a mapping from method name to installed handler.
    """

    def __init__(self, servicer, methods):
        """
        Store the servicer and the names of the methods that may be mocked.

        `servicer` is kept as-is in the public `servicer` attribute.
        `methods` is only used for membership checks in `install_handler`.
        """
        self.servicer = servicer
        self._known_methods = methods
        self._methods = {}

    def get(self, method, default):
        """
        Return the handler installed for `method`, or `default` if there is
        none.
        """
        return self._methods.get(method, default)

    def reset_handlers(self):
        """
        Forget every installed handler.
        """
        self._methods = {}

    @contextlib.contextmanager
    def mock(self):
        """
        Context manager that yields `install_handler`.

        All installed handlers are dropped when the context exits, including
        when the body raises.
        """
        try:
            yield self.install_handler
        finally:
            # Single place that knows how to clear the handlers.
            self.reset_handlers()

    def install_handler(self, method: str):
        """
        Return a decorator that installs the decorated function as the
        handler for `method`.

        The decorator wraps the function with `callinfo.acallqueue`, stores
        the wrapper and returns it. It raises `RuntimeError` if `method` is
        not one of the known method names; the check happens when the
        decorator is applied, not when `install_handler` is called.
        """

        def decorator(func):
            if method not in self._known_methods:
                raise RuntimeError(
                    f'Trying to mock unknown grpc method {method}',
                )

            wrapped = callinfo.acallqueue(func)
            self._methods[method] = wrapped
            return wrapped

        return decorator
54
55
@pytest.fixture(scope='session')
def grpc_mockserver_endpoint(pytestconfig, get_free_port) -> str:
    """
    Returns the `host:port` endpoint the gRPC mocking server is started on.
    Host and port come from the `--grpc-mockserver-host` and
    `--grpc-mockserver-port` command line options.

    If the `service_wait` or `service_disable` option is set and no port was
    given, `DEFAULT_PORT` is used. Otherwise, for port 0, some free port is
    picked.

    Override this fixture to customize the endpoint used by gRPC mockserver.

    @snippet samples/grpc_service/tests/conftest.py Prepare configs
    @ingroup userver_testsuite_fixtures
    """
    options = pytestconfig.option
    port = options.grpc_mockserver_port

    wait_or_disable = options.service_wait or options.service_disable
    if wait_or_disable and not port:
        # No explicit port was requested: fall back to the fixed default.
        port = DEFAULT_PORT

    if port == 0:
        port = get_free_port()

    return f'{options.grpc_mockserver_host}:{port}'
75
76
@pytest.fixture(scope='session')
async def grpc_mockserver(grpc_mockserver_endpoint) -> grpc.aio.Server:
    """
    Returns the gRPC mocking server.

    The server is created with `grpc.aio.server()`, bound to
    `grpc_mockserver_endpoint` as an insecure port and started before being
    yielded. On teardown it is stopped and the fixture waits for its
    termination.

    @snippet samples/grpc_service/tests/conftest.py Prepare server mock
    @ingroup userver_testsuite_fixtures
    """
    server = grpc.aio.server()
    server.add_insecure_port(grpc_mockserver_endpoint)
    await server.start()

    try:
        yield server
    finally:

        async def stop_server():
            await server.stop(grace=None)
            await server.wait_for_termination()

        # Keep a reference to the task itself, not to the future returned by
        # asyncio.shield(): that outer future gets cancelled together with
        # our await, so awaiting it a second time would raise CancelledError
        # immediately instead of waiting for the server to stop.
        stop_server_task = asyncio.create_task(stop_server())
        # asyncio.shield does not protect our await from cancellations, and we
        # really need to wait for the server stopping before continuing.
        try:
            await asyncio.shield(stop_server_task)
        except asyncio.CancelledError:
            # A fresh shield per await: the shutdown task keeps running even
            # if another cancellation arrives while we are waiting here.
            await asyncio.shield(stop_server_task)
            # Propagate cancellation when we are done
            raise
106
107
108@pytest.fixture(scope='session')
110 """
111 Creates the gRPC mock server for the provided type.
112
113 @snippet samples/grpc_service/tests/conftest.py Prepare server mock
114 @ingroup userver_testsuite_fixtures
115 """
116 return _create_servicer_mock
117
118
def pytest_addoption(parser):
    """
    Registers the `--grpc-mockserver-host` and `--grpc-mockserver-port`
    command line options in the `grpc-mockserver` option group.
    """
    mockserver_group = parser.getgroup('grpc-mockserver')

    # (flag, keyword arguments) pairs, registered in declaration order.
    option_specs = (
        (
            '--grpc-mockserver-host',
            {
                'default': '[::]',
                'help': 'gRPC mockserver hostname, default is [::]',
            },
        ),
        (
            '--grpc-mockserver-port',
            {
                'type': int,
                'default': 0,
                'help': 'gRPC mockserver port, by default random port is used',
            },
        ),
    )
    for flag, settings in option_specs:
        mockserver_group.addoption(flag, **settings)
132
133
def _create_servicer_mock(
    servicer_class: type,
    *,
    stream_method_names: Optional[List[str]] = None,
    before_call_hook: Optional[Callable[..., None]] = None,
) -> GrpcServiceMock:
    """
    Builds a `GrpcServiceMock` around a dynamically created subclass of
    `servicer_class`.

    Every callable found directly in `servicer_class.__dict__` is replaced
    in the subclass (named `Mock<servicer_class.__name__>`) by an async
    wrapper. On each call the wrapper:

    1. invokes `before_call_hook` with the call arguments (without the
       servicer instance), if a hook was given;
    2. dispatches to the handler currently installed in the returned mock
       for that method name;
    3. falls back to the original method of `servicer_class` when no handler
       is installed.

    Methods listed in `stream_method_names` are wrapped as async generators:
    the installed handler is awaited and the async iterable it returns is
    relayed item by item. All other methods are wrapped as plain coroutines
    returning the awaited handler result.

    The returned mock's `servicer` attribute is an instance of the generated
    subclass, and its known methods are the names of the wrapped callables.
    """
    # Normalize once instead of rebuilding a list for every attribute.
    stream_names = frozenset(stream_method_names or ())

    def wrap_grpc_method(name, default_method, is_stream):
        @functools.wraps(default_method)
        async def run_method(self, *args, **kwargs):
            if before_call_hook is not None:
                before_call_hook(*args, **kwargs)

            # `mock` is bound below, before any wrapper can be called.
            method = mock.get(name, None)
            if method is None:
                return default_method(self, *args, **kwargs)
            return await method(*args, **kwargs)

        @functools.wraps(default_method)
        async def run_stream_method(self, *args, **kwargs):
            if before_call_hook is not None:
                before_call_hook(*args, **kwargs)

            method = mock.get(name, None)
            if method is None:
                # No handler installed: behave like the unary wrapper and
                # defer to the servicer's own method instead of failing with
                # "'NoneType' object is not callable".
                # NOTE(review): servicer bases generated by grpcio-tools
                # presumably raise NotImplementedError here - confirm.
                responses = default_method(self, *args, **kwargs)
                if hasattr(responses, '__aiter__'):
                    async for response in responses:
                        yield response
                elif responses is not None:
                    for response in responses:
                        yield response
                return

            async for response in await method(*args, **kwargs):
                yield response

        return run_stream_method if is_stream else run_method

    methods = {
        attname: wrap_grpc_method(
            name=attname,
            default_method=value,
            is_stream=attname in stream_names,
        )
        for attname, value in servicer_class.__dict__.items()
        if callable(value)
    }

    mocked_servicer_class = type(
        f'Mock{servicer_class.__name__}', (servicer_class,), methods,
    )
    servicer = mocked_servicer_class()
    mock = GrpcServiceMock(servicer, frozenset(methods))
    return mock