294 Creates the gRPC mock server for the provided type.
296 @deprecated Please use @ref pytest_userver.plugins.grpc.mockserver.grpc_mockserver_new "grpc_mockserver_new"
297 instead. Some time soon, `create_grpc_mock` will be removed.
299 @ingroup userver_testsuite_fixtures
301 return _create_servicer_mock
def pytest_addoption(parser):
    """
    Register the gRPC mockserver command-line options.

    Adds `--grpc-mockserver-host` and `--grpc-mockserver-port` to the
    `grpc-mockserver` option group of the given pytest parser.
    """
    group = parser.getgroup('grpc-mockserver')
    group.addoption(
        '--grpc-mockserver-host',
        # NOTE(review): the line carrying the default is missing from the
        # damaged listing; '[::]' is taken from the help text - confirm.
        default='[::]',
        help='gRPC mockserver hostname, default is [::]',
    )
    group.addoption(
        '--grpc-mockserver-port',
        # NOTE(review): two lines are missing here in the damaged listing;
        # an integer option defaulting to 0 is assumed from the help text
        # ("random port") - confirm against the real file.
        type=int,
        default=0,
        help='gRPC mockserver port, by default random port is used',
    )
def _raise_unimplemented_error(context: grpc.aio.ServicerContext, full_rpc_name: str) -> NoReturn:
    """
    Report a call to an RPC that has no mock installed.

    Sets the `UNIMPLEMENTED` status code and an explanatory details string on
    `context`, then raises; this function never returns normally.

    @param context servicer context of the call being handled
    @param full_rpc_name RPC name to mention in the error message
    @throws NotImplementedError always, carrying the same message as the details
    """
    message = f'Trying to call grpc_mockserver method "{full_rpc_name}", which is not mocked'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(message)
    raise NotImplementedError(message)
def _wrap_grpc_method(
    python_method_name: str,
    # NOTE(review): this parameter line is missing from the damaged listing;
    # it is restored because the body uses `full_rpc_name` and the caller in
    # this file passes it by keyword.
    full_rpc_name: str,
    default_unimplemented_method: Callable,
    response_streaming: bool,
    mocked_methods: Dict[str, Callable],
):
    """
    Build the servicer method that dispatches one RPC to its installed mock.

    The mock is looked up in `mocked_methods` on every call, so handlers that
    are added to or removed from that dict after wrapping are picked up.
    If no mock is registered, the call is rejected through
    `_raise_unimplemented_error`.

    @param python_method_name key under which the mock is looked up
    @param full_rpc_name RPC name used in the "not mocked" error
    @param default_unimplemented_method original method; its metadata is
        copied onto the wrapper with `functools.wraps`
    @param response_streaming selects the async-generator wrapper (True) or
        the coroutine wrapper (False)
    @param mocked_methods shared dict of currently installed mocks
    @returns the wrapper function, taking `(self, request_or_stream, context)`
    """
    if response_streaming:

        @functools.wraps(default_unimplemented_method)
        async def run_stream_response_method(self, request_or_stream, context: grpc.aio.ServicerContext):
            method = mocked_methods.get(python_method_name)
            # NOTE(review): guard line restored; it is dropped in the listing.
            if method is None:
                _raise_unimplemented_error(context, full_rpc_name)

            # The mock call itself is awaited, and the awaited result is then
            # iterated asynchronously to produce the response stream.
            async for response in await method(request_or_stream, context):
                # NOTE(review): loop body restored; it is dropped in the listing.
                yield response

        return run_stream_response_method
    else:

        @functools.wraps(default_unimplemented_method)
        async def run_unary_response_method(self, request_or_stream, context: grpc.aio.ServicerContext):
            method = mocked_methods.get(python_method_name)
            # NOTE(review): guard line restored; it is dropped in the listing.
            if method is None:
                _raise_unimplemented_error(context, full_rpc_name)

            # Mocks may be plain functions or coroutine functions.
            response = method(request_or_stream, context)
            if inspect.isawaitable(response):
                return await response
            # NOTE(review): non-awaitable branch restored; dropped in the listing.
            return response

        return run_unary_response_method
def _check_is_servicer_class(servicer_class: type) -> None:
    """
    Validate that `servicer_class` is a class whose name ends with `Servicer`.

    @param servicer_class object to validate
    @throws ValueError if the argument is not a class, or if its name does not
        end with `Servicer`
    """
    if not isinstance(servicer_class, type):
        raise ValueError(f'Expected *Servicer class (type), got: {servicer_class} ({type(servicer_class)})')
    if not servicer_class.__name__.endswith('Servicer'):
        raise ValueError(f'Expected *Servicer class, got: {servicer_class}')
def _create_servicer_mock(
    servicer_class: type,
    # NOTE(review): two lines between these parameters are missing from the
    # damaged listing (possibly a keyword-only marker and a comment) - confirm.
    stream_method_names: Optional[List[str]] = None,
):
    """
    Create a mock for the given generated `*Servicer` class.

    Validates the class, reflects its RPC methods, wraps each of them so that
    calls are dispatched to handlers stored in a shared `mocked_methods` dict,
    instantiates the resulting `Mock<ServicerName>` class and packs everything
    into a `GrpcServiceMock`.

    @param servicer_class generated `*Servicer` class to mock
    @param stream_method_names not used by any visible line of this function
    @returns a `GrpcServiceMock` describing the mocked service
    @throws ValueError if `servicer_class` is not a `*Servicer` class
    """
    _check_is_servicer_class(servicer_class)
    reflection = _reflect_servicer(servicer_class)

    # NOTE(review): the branching lines are missing from the listing; an
    # emptiness check is assumed, since `next(iter(...))` needs an entry and
    # 'UNKNOWN' is the visible fallback.
    if reflection:
        service_name, _ = _split_rpc_name(next(iter(reflection.values())).name)
    else:
        service_name = 'UNKNOWN'
    adder = getattr(inspect.getmodule(servicer_class), f'add_{servicer_class.__name__}_to_server')

    mocked_methods: Dict[str, Callable] = {}

    methods = {}
    for attname, value in servicer_class.__dict__.items():
        # NOTE(review): the original filter line is missing from the listing.
        # Only attributes that have a reflection entry can be wrapped, because
        # `reflection[attname]` is indexed below - confirm the real condition.
        if attname in reflection:
            methods[attname] = _wrap_grpc_method(
                python_method_name=attname,
                full_rpc_name=reflection[attname].name,
                default_unimplemented_method=value,
                response_streaming=reflection[attname].response_streaming,
                mocked_methods=mocked_methods,
            )
    # NOTE(review): the bases and namespace arguments are missing from the
    # listing; subclassing `servicer_class` with the wrapped methods is assumed.
    mocked_servicer_class = type(
        f'Mock{servicer_class.__name__}',
        (servicer_class,),
        methods,
    )
    servicer = mocked_servicer_class()

    # NOTE(review): the first two keyword arguments are missing from the
    # listing; `servicer=` and `adder=` are assumed from the otherwise unused
    # locals - confirm against the GrpcServiceMock constructor.
    return GrpcServiceMock(
        servicer=servicer,
        adder=adder,
        service_name=service_name,
        known_methods=frozenset(methods),
        mocked_methods=mocked_methods,
    )
@dataclasses.dataclass(frozen=True)
class _MethodInfo:
    """Immutable description of one RPC method: its name and streaming kinds."""

    # NOTE(review): the class header and this field are missing from the
    # damaged listing; both are restored from the `_MethodInfo(name=..., ...)`
    # constructor calls elsewhere in this file.
    name: str
    # True when the method is created via a `stream_*` channel factory.
    request_streaming: bool
    # True when the method is created via a `*_stream` channel factory.
    response_streaming: bool
class _FakeChannel:
    """
    Channel stand-in whose factory methods describe RPCs instead of creating them.

    Each factory returns a `_MethodInfo` holding the RPC name and the
    request/response streaming flags implied by the factory that was called.
    Extra positional and keyword arguments are accepted and ignored.
    """

    # NOTE(review): the class header is missing from the damaged listing; the
    # name is restored from the `_FakeChannel()` call in `_reflect_stub`.

    def unary_unary(self, name, *args, **kwargs):
        return _MethodInfo(name=name, request_streaming=False, response_streaming=False)

    def unary_stream(self, name, *args, **kwargs):
        return _MethodInfo(name=name, request_streaming=False, response_streaming=True)

    def stream_unary(self, name, *args, **kwargs):
        return _MethodInfo(name=name, request_streaming=True, response_streaming=False)

    def stream_stream(self, name, *args, **kwargs):
        return _MethodInfo(name=name, request_streaming=True, response_streaming=True)
def _reflect_servicer(servicer_class: type) -> Mapping[str, _MethodInfo]:
    """
    Collect RPC method descriptions for a `*Servicer` class.

    The matching `<Service>Stub` class is looked up by name in the module that
    defines `servicer_class`, and is then reflected with `_reflect_stub`.

    @param servicer_class class whose name is `<Service>Servicer`
    @returns mapping produced by `_reflect_stub` for the stub class
    @throws AttributeError if the module has no `<Service>Stub` attribute
    """
    service_name = servicer_class.__name__.removesuffix('Servicer')
    stub_class = getattr(inspect.getmodule(servicer_class), f'{service_name}Stub')
    return _reflect_stub(stub_class)
def _reflect_stub(stub_class: type) -> Mapping[str, _MethodInfo]:
    """
    Return the instance attributes of a stub constructed over a `_FakeChannel`.

    @param stub_class class that accepts a channel as its only constructor argument
    @returns the `__dict__` of the constructed stub instance
    """
    # Presumably the stub constructor stores the result of each
    # `channel.<kind>(...)` call as an attribute, so with a _FakeChannel those
    # attributes are _MethodInfo objects - the stub code is not visible here.
    return stub_class(_FakeChannel()).__dict__
def _split_rpc_name(rpc_name: str) -> Tuple[str, str]:
    """
    Split an RPC name into its service and method parts.

    One leading `/` is ignored, so both `/pkg.Service/Method` and
    `pkg.Service/Method` are accepted.

    @param rpc_name name of the form `with.package.ServiceName/MethodName`
    @returns `(service_name, method_name)`
    @throws ValueError if the name does not consist of exactly two `/`-separated parts
    """
    normalized = rpc_name.removeprefix('/')
    results = normalized.split('/')
    if len(results) != 2:
        # NOTE(review): the `raise` opener is missing from the damaged listing;
        # ValueError is assumed, matching the other validators in this file.
        raise ValueError(
            f'Invalid RPC name: "{rpc_name}". RPC name must be of the form "with.package.ServiceName/MethodName"'
        )
    # NOTE(review): the return statement is missing from the listing; the two
    # parts are returned as a tuple to match the declared return type.
    service_name, method_name = results
    return service_name, method_name
457def _get_class_from_method(method) -> type:
459 assert inspect.isfunction(method),
'Expected an unbound method: foo(ClassName.MethodName)'
460 class_name = method.__qualname__.split(
'.<locals>', 1)[0].rsplit(
'.', 1)[0]
462 cls = getattr(inspect.getmodule(method), class_name)
463 except AttributeError:
464 cls = method.__globals__.get(class_name)
465 assert isinstance(cls, type)