userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/grpc/__init__.py Source File
Loading...
Searching...
No Matches
__init__.py
1"""
2Mocks for the gRPC servers, a.k.a. `pytest_userver.grpc`.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5@sa @ref pytest_userver.plugins.grpc.mockserver
6"""
7
8from __future__ import annotations
9
10import asyncio
11from collections.abc import Callable
12import contextlib
13import inspect
14from typing import Any
15from typing import TypeAlias
16
17import grpc
18
19import testsuite.utils.callinfo
20
21from ._mocked_errors import MockedError # noqa: F401
22from ._mocked_errors import NetworkError # noqa: F401
23from ._mocked_errors import TimeoutError # noqa: F401
24from ._servicer_mock import _check_is_servicer_class
25from ._servicer_mock import _create_servicer_mock
26from ._servicer_mock import _ServiceMock
27
# A mocked gRPC method implementation: a callable taking one argument of any type
# plus a `grpc.aio.ServicerContext`.
Handler: TypeAlias = Callable[
    [Any, grpc.aio.ServicerContext],
    Any,
]
# A decorator that accepts a `Handler` and returns testsuite's `AsyncCallQueue`.
MockDecorator: TypeAlias = Callable[
    [Handler],
    testsuite.utils.callinfo.AsyncCallQueue,
]
# A callback that accepts an exception and returns nothing (testsuite's `asyncexc_append`).
AsyncExcAppend: TypeAlias = Callable[
    [Exception],
    None,
]
31
32
34 """
35 Allows to install mocks that are kept active between tests, see
36 @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_session "grpc_mockserver_session".
37
38 @warning This is a sharp knife, use with caution! For most use-cases, prefer
39 @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver "grpc_mockserver" instead.
40 """
41
def __init__(self, *, server: grpc.aio.Server, experimental: bool = False) -> None:
    """
    @warning Constructing a `MockserverSession` directly is an **experimental API** that is likely to
    break in the future. Consider using
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_session "grpc_mockserver_session" instead.

    Creates a `MockserverSession` which takes ownership of `server`.
    Use the session as an async contextmanager so that the server is started and stopped properly:

    @code{.py}
    async with pytest_userver.grpc.MockserverSession(server=server, experimental=True) as mockserver:
        # ...
    @endcode

    @param server The `grpc.aio.Server` that the session will start, stop and add service mocks to.
    @param experimental Must be truthy; the initializer asserts it as an explicit opt-in.

    @note `MockserverSession` is usually obtained from
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_session "grpc_mockserver_session".
    """
    assert experimental
    # No testsuite callback is installed until `asyncexc_append_scope` is entered.
    self._asyncexc_append: AsyncExcAppend | None = None
    # Service mocks created on demand, keyed by the servicer class they mock.
    self._auto_service_mocks: dict[type, _ServiceMock] = {}
    self._server = server
62
def _get_auto_service_mock(self, servicer_class: type, /) -> _ServiceMock:
    """
    Returns the service mock registered for `servicer_class`, creating it on first use.

    A newly created mock receives the currently active `asyncexc_append` callback, is remembered
    in `_auto_service_mocks` and is added to the underlying server.

    @param servicer_class The servicer class to look up or create a mock for.
    @returns The cached or freshly created mock for `servicer_class`.
    """
    existing_mock = self._auto_service_mocks.get(servicer_class)
    # Compare against None explicitly: a cached mock must be reused even if it happens to be
    # falsy, otherwise the same service would be created and added to the server a second time.
    if existing_mock is not None:
        return existing_mock
    new_mock = _create_servicer_mock(servicer_class)
    new_mock.set_asyncexc_append(self._asyncexc_append)
    self._auto_service_mocks[servicer_class] = new_mock
    new_mock.add_to_server(self._server)
    return new_mock
72
def _set_asyncexc_append(self, asyncexc_append: AsyncExcAppend | None, /) -> None:
    """
    Stores `asyncexc_append` on the session and forwards it to every service mock created so far.

    @param asyncexc_append The callback to use from now on, or `None` to clear it.
    """
    self._asyncexc_append = asyncexc_append
    # Mocks created later pick the value up from `self._asyncexc_append` when they are created.
    for service_mock in self._auto_service_mocks.values():
        service_mock.set_asyncexc_append(asyncexc_append)
77
def reset_mocks(self) -> None:
    """
    @brief Resets the handlers of every mock that was installed on this mockserver through the
    `MockserverSession` or @ref pytest_userver.grpc.Mockserver "Mockserver" API.
    @note Mocks that were installed manually via @ref MockserverSession.server are left untouched.
    """
    for service_mock in self._auto_service_mocks.values():
        service_mock.reset_handlers()
86
@contextlib.contextmanager
def asyncexc_append_scope(self, asyncexc_append: AsyncExcAppend | None, /):
    """
    Sets testsuite's `asyncexc_append` for use in the returned scope.

    When the scope is left, normally or through an exception, the callback that was active
    before entering is put back. With no enclosing scope that is `None`.

    @param asyncexc_append The callback to use inside the scope, or `None`.
    """
    # Remember the outer value so that a nested scope does not wipe the callback of the
    # scope that encloses it.
    previous_asyncexc_append = self._asyncexc_append
    self._set_asyncexc_append(asyncexc_append)
    try:
        yield
    finally:
        self._set_asyncexc_append(previous_asyncexc_append)
97
@property
def server(self) -> grpc.aio.Server:
    """
    Gives access to the wrapped
    [grpc.aio.Server](https://grpc.github.io/grpc/python/grpc_asyncio.html#grpc.aio.Server).
    """
    underlying_server = self._server
    return underlying_server
104
async def start(self) -> None:
    """
    Starts the underlying server.
    @note When possible, start the mockserver with the async contextmanager syntax instead.
    """
    underlying_server = self._server
    await underlying_server.start()
111
async def stop(self) -> None:
    """
    Stops the underlying server properly. Use this method rather than stopping `server` by hand.
    """
    underlying_server = self._server
    await _stop_server(underlying_server)
117
async def __aenter__(self) -> MockserverSession:
    """
    Starts the server when the `async with` block is entered and returns the session itself.
    """
    start_coro = self.start()
    await start_coro
    return self
121
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """
    Stops the server when the `async with` block is left.

    The exception details are not inspected, and `None` is returned, so an exception raised
    inside the block is not suppressed.
    """
    stop_coro = self.stop()
    await stop_coro
124
125
127 """
128 Allows to install mocks that are reset between tests, see
129 @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver "grpc_mockserver".
130 """
131
def __init__(self, *, mockserver_session: MockserverSession, experimental: bool = False) -> None:
    """
    @warning Constructing a `Mockserver` directly is an **experimental API** that is likely to break
    in the future. Consider using
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver "grpc_mockserver" instead.

    Creates a `Mockserver` on top of an existing session.

    @param mockserver_session The `MockserverSession` through which service mocks are obtained.
    @param experimental Must be truthy; the initializer asserts it as an explicit opt-in.

    @note `Mockserver` is usually obtained from
    @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver "grpc_mockserver".
    """
    assert experimental
    self._mockserver_session = mockserver_session
144
def __call__(self, servicer_method, /) -> MockDecorator:
    """
    Builds a decorator that mocks the implementation of the given gRPC service method.

    @param servicer_method The method as accessed on its servicer class, e.g. `SomeServicer.SomeMethod`.
    @returns The decorator obtained from the service mock for a method of that name.

    Example:

    @snippet samples/grpc_service/testsuite/test_grpc.py Prepare modules
    @snippet samples/grpc_service/testsuite/test_grpc.py grpc client test
    """
    owner_class = _get_class_from_method(servicer_method)
    # pylint: disable=protected-access
    service_mock = self._mockserver_session._get_auto_service_mock(owner_class)
    method_name = servicer_method.__name__
    return service_mock.install_handler(method_name)
157
def mock_factory(self, servicer_class, /) -> Callable[[str], MockDecorator]:
    """
    Creates a helper, convenient to expose as a fixture, that mocks methods of the given gRPC
    service by name.

    @param servicer_class The servicer class; it is checked with `_check_is_servicer_class` right away.
    @returns A function that takes a method name and returns the mock decorator for that method.
    It raises `ValueError` if `servicer_class` has no such method.

    Example:

    @snippet grpc/functional_tests/metrics/tests/conftest.py Prepare modules
    @snippet grpc/functional_tests/metrics/tests/conftest.py Prepare server mock
    @snippet grpc/functional_tests/metrics/tests/test_metrics.py grpc client test
    """
    # Validate eagerly so that a wrong class is reported here, not on the first factory call.
    _check_is_servicer_class(servicer_class)

    def factory(method_name):
        method = getattr(servicer_class, method_name, None)
        if method is not None:
            return self(method)
        raise ValueError(f'No method "{method_name}" in servicer class "{servicer_class.__name__}"')

    return factory
177
def install_servicer(self, servicer: object, /) -> None:
    """
    Installs `servicer` as a mock. Its class should inherit from a generated `*Servicer` class.

    Every base class of `type(servicer)` whose name ends with `Servicer` is treated as a service.
    For each of them, all methods known to the corresponding service mock are taken from
    `servicer` and installed as handlers.

    For example, @ref grpc/functional_tests/basic_chaos/tests-grpcclient/service.py "this servicer class"
    can be installed as follows:

    @snippet grpc/functional_tests/basic_chaos/tests-grpcclient/conftest.py installing mockserver servicer

    @note Inheritance from multiple `*Servicer` classes at once is allowed.

    @example grpc/functional_tests/basic_chaos/tests-grpcclient/service.py
    """
    # The first MRO entry is the concrete class of `servicer` itself; only its bases are inspected.
    ancestors = inspect.getmro(type(servicer))[1:]
    base_servicer_classes = []
    for ancestor in ancestors:
        if ancestor.__name__.endswith('Servicer'):
            base_servicer_classes.append(ancestor)
    if len(base_servicer_classes) == 0:
        raise ValueError(f"Given object's type ({type(servicer)}) is not inherited from any grpc *Servicer class")

    session = self._mockserver_session
    for servicer_class in base_servicer_classes:
        # pylint: disable=protected-access
        service_mock = session._get_auto_service_mock(servicer_class)
        for python_method_name in service_mock.known_methods:
            handler_func = getattr(servicer, python_method_name)
            decorate = service_mock.install_handler(python_method_name)
            decorate(handler_func)
200
201
202# @cond
203
204
async def _stop_server(server: grpc.aio.Server, /) -> None:
    """
    Stops `server` and waits for its termination, even if the calling task gets cancelled meanwhile.

    The stop sequence runs in its own task. If the caller is cancelled while waiting, the wait is
    resumed until that task finishes, and only then the `asyncio.CancelledError` is re-raised.
    """

    async def stop_server():
        await server.stop(grace=None)
        await server.wait_for_termination()

    # Keep a reference to the task itself, not only to the shield future wrapping it.
    stop_server_task = asyncio.create_task(stop_server())
    # asyncio.shield does not protect our await from cancellations, and we
    # really need to wait for the server stopping before continuing.
    try:
        await asyncio.shield(stop_server_task)
    except asyncio.CancelledError:
        # The shield future awaited above is cancelled together with us, so awaiting it again
        # would raise immediately. Wrap the still running task in a fresh shield instead.
        await asyncio.shield(stop_server_task)
        # Propagate cancellation when we are done
        raise
219
220
def _get_class_from_method(method) -> type:
    """
    Finds the class in which `method` is defined, given the plain function `ClassName.method_name`.

    The class name is derived from `method.__qualname__`. It is looked up in the module of
    `method` first, then in the globals of `method`.
    """
    # https://stackoverflow.com/a/54597033/5173839
    assert inspect.isfunction(method), f'Expected an unbound method: foo(ClassName.MethodName), got: {method}'
    # Drop everything from the first '.<locals>' on, then strip the trailing '.<method name>'.
    outer_qualname = method.__qualname__.partition('.<locals>')[0]
    class_name = outer_qualname.rsplit('.', 1)[0]
    try:
        cls = getattr(inspect.getmodule(method), class_name)
    except AttributeError:
        method_globals = method.__globals__
        cls = method_globals.get(class_name)
    assert isinstance(cls, type)
    return cls
231
232
233# @endcond