userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Make gRPC requests to the service.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=no-member
8# pylint: disable=redefined-outer-name
9import asyncio
10from typing import Awaitable
11from typing import Callable
12from typing import Optional
13
14import grpc
15import pytest
16
17from pytest_userver import client
18
19DEFAULT_TIMEOUT = 15.0
20
21USERVER_CONFIG_HOOKS = ['userver_config_grpc_mockserver']
22
23
24@pytest.fixture(scope='session')
25def grpc_service_port(service_config) -> int:
26 """
27 Returns the gRPC listener port number of the service that is set in the
28 static configuration file.
29
30 Override this fixture to change the way the gRPC listener port number
31 is retrieved by the testsuite for tests.
32
33 @ingroup userver_testsuite_fixtures
34 """
35 components = service_config['components_manager']['components']
36 if 'grpc-server' not in components:
37 raise RuntimeError('No grpc-server component')
38 return int(components['grpc-server']['port'])
39
40
41@pytest.fixture(scope='session')
42def grpc_service_endpoint(service_config, grpc_service_port) -> str:
43 """
44 Returns the gRPC endpoint of the service.
45
46 Override this fixture to change the way the gRPC endpoint
47 is retrieved by the testsuite for tests.
48
49 @ingroup userver_testsuite_fixtures
50 """
51 components = service_config['components_manager']['components']
52 if 'grpc-server' not in components:
53 raise RuntimeError('No grpc-server component')
54 grpc_server_unix_socket = components['grpc-server'].get('unix-socket-path')
55 return (
56 f'unix:{grpc_server_unix_socket}'
57 if grpc_server_unix_socket is not None
58 else f'localhost:{grpc_service_port}'
59 )
60
61
62@pytest.fixture(scope='session')
63def grpc_service_timeout(pytestconfig) -> float:
64 """
65 Returns the gRPC timeout for the service that is set by the command
66 line option `--service-timeout`.
67
68 Override this fixture to change the way the gRPC timeout
69 is set.
70
71 @ingroup userver_testsuite_fixtures
72 """
73 return float(pytestconfig.option.service_timeout) or DEFAULT_TIMEOUT
74
75
76@pytest.fixture
78 service_client, _testsuite_client_config: client.TestsuiteClientConfig,
79) -> Callable[[grpc.aio.ClientCallDetails], Awaitable[None]]:
80 """
81 Returns the function that will be called in before each gRPC request,
82 client-side.
83
84 @ingroup userver_testsuite_fixtures
85 """
86
87 async def prepare(
88 _client_call_details: grpc.aio.ClientCallDetails, /,
89 ) -> None:
90 if isinstance(service_client, client.AiohttpClient):
91 await service_client.update_server_state()
92
93 return prepare
94
95
96@pytest.fixture(scope='session')
97async def grpc_session_channel(
98 grpc_service_endpoint, _grpc_channel_interceptor,
99):
100 async with grpc.aio.insecure_channel(
101 grpc_service_endpoint, interceptors=[_grpc_channel_interceptor],
102 ) as channel:
103 yield channel
104
105
106@pytest.fixture
107async def grpc_channel(
108 grpc_service_endpoint,
109 grpc_service_deps,
110 grpc_service_timeout,
111 grpc_session_channel,
112 _grpc_channel_interceptor,
113 grpc_client_prepare,
114):
115 """
116 Returns the gRPC channel configured by the parameters from the
117 @ref plugins.grpc.grpc_service_endpoint "grpc_service_endpoint" fixture.
118
119 @ingroup userver_testsuite_fixtures
120 """
121 _grpc_channel_interceptor.prepare_func = grpc_client_prepare
122 try:
123 await asyncio.wait_for(
124 grpc_session_channel.channel_ready(), timeout=grpc_service_timeout,
125 )
126 except asyncio.TimeoutError:
127 raise RuntimeError(
128 f'Failed to connect to remote gRPC server by '
129 f'address {grpc_service_endpoint}',
130 )
131 return grpc_session_channel
132
133
134@pytest.fixture
135def grpc_service_deps(service_client):
136 """
137 gRPC service dependencies hook. Feel free to override it.
138
139 @ingroup userver_testsuite_fixtures
140 """
141
142
143@pytest.fixture(scope='session')
144def userver_config_grpc_mockserver(grpc_mockserver_endpoint):
145 """
146 Returns a function that adjusts the static config for testsuite.
147 Walks through config_vars *values* equal to `$grpc_mockserver`,
148 and replaces them with @ref grpc_mockserver_endpoint.
149
150 @ingroup userver_testsuite_fixtures
151 """
152
153 def patch_config(_config_yaml, config_vars):
154 for name in config_vars:
155 if config_vars[name] == '$grpc_mockserver':
156 config_vars[name] = grpc_mockserver_endpoint
157
158 return patch_config
159
160
161# Taken from
162# https://github.com/grpc/grpc/blob/master/examples/python/interceptors/headers/generic_client_interceptor.py
164 grpc.aio.UnaryUnaryClientInterceptor,
165 grpc.aio.UnaryStreamClientInterceptor,
166 grpc.aio.StreamUnaryClientInterceptor,
167 grpc.aio.StreamStreamClientInterceptor,
168):
169 def __init__(self):
170 self.prepare_func: Optional[
171 Callable[[grpc.aio.ClientCallDetails], Awaitable[None]]
172 ] = None
173
174 async def intercept_unary_unary(
175 self, continuation, client_call_details, request,
176 ):
177 await self.prepare_func(client_call_details)
178 return await continuation(client_call_details, request)
179
180 async def intercept_unary_stream(
181 self, continuation, client_call_details, request,
182 ):
183 await self.prepare_func(client_call_details)
184 return await continuation(client_call_details, next(request))
185
186 async def intercept_stream_unary(
187 self, continuation, client_call_details, request_iterator,
188 ):
189 await self.prepare_func(client_call_details)
190 return await continuation(client_call_details, request_iterator)
191
192 async def intercept_stream_stream(
193 self, continuation, client_call_details, request_iterator,
194 ):
195 await self.prepare_func(client_call_details)
196 return await continuation(client_call_details, request_iterator)
197
198
199@pytest.fixture(scope='session')
200def _grpc_channel_interceptor() -> _GenericClientInterceptor: