userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/grpc/client.py Source File
Loading...
Searching...
No Matches
client.py
1"""
2Make gRPC requests to the service.
3
4@sa @ref scripts/docs/en/userver/tutorial/grpc_service.md
5"""
6
7# pylint: disable=no-member
8# pylint: disable=redefined-outer-name
9import asyncio
10from typing import Awaitable
11from typing import Callable
12from typing import Optional
13
14import grpc
15import pytest
16
17from pytest_userver import client
18
# Fallback gRPC timeout returned by the `grpc_service_timeout` fixture when
# the `--service-timeout` command line option converts to a falsy float (0.0).
# The value ends up as the `timeout` argument of `asyncio.wait_for`,
# i.e. it is expressed in seconds.
DEFAULT_TIMEOUT = 15.0

# Names of the static config hook fixtures provided by this plugin module.
# NOTE(review): presumably collected by the testsuite config machinery by
# this well-known variable name; the consumer is not visible in this file.
USERVER_CONFIG_HOOKS = ['userver_config_grpc_mockserver']
22
23
@pytest.fixture(scope='session')
def grpc_service_port(service_config) -> int:
    """
    Returns the port number of the service gRPC listener, as written in
    the `grpc-server` component section of the static configuration.

    Override this fixture to change the way the gRPC listener port number
    is retrieved by the testsuite for tests.

    Raises RuntimeError if the static config has no `grpc-server` component.

    @ingroup userver_testsuite_fixtures
    """
    all_components = service_config['components_manager']['components']
    if 'grpc-server' in all_components:
        grpc_server_section = all_components['grpc-server']
        return int(grpc_server_section['port'])
    raise RuntimeError('No grpc-server component')
39
40
@pytest.fixture(scope='session')
def grpc_service_endpoint(service_config, grpc_service_port) -> str:
    """
    Returns the gRPC endpoint of the service.

    If the `grpc-server` component defines `unix-socket-path`, the endpoint
    is `unix:<path>`; otherwise it is `localhost:<grpc_service_port>`.

    Override this fixture to change the way the gRPC endpoint
    is retrieved by the testsuite for tests.

    Raises RuntimeError if the static config has no `grpc-server` component.

    @ingroup userver_testsuite_fixtures
    """
    all_components = service_config['components_manager']['components']
    if 'grpc-server' not in all_components:
        raise RuntimeError('No grpc-server component')

    socket_path = all_components['grpc-server'].get('unix-socket-path')
    if socket_path is None:
        # No unix socket configured: talk to the TCP listener instead.
        return f'localhost:{grpc_service_port}'
    return f'unix:{socket_path}'
58
59
@pytest.fixture(scope='session')
def grpc_service_timeout(pytestconfig) -> float:
    """
    Returns the gRPC timeout for the service, taken from the command
    line option `--service-timeout`. When the option converts to a falsy
    float (i.e. zero), `DEFAULT_TIMEOUT` is returned instead.

    Override this fixture to change the way the gRPC timeout
    is set.

    @ingroup userver_testsuite_fixtures
    """
    requested_timeout = float(pytestconfig.option.service_timeout)
    if requested_timeout:
        return requested_timeout
    return DEFAULT_TIMEOUT
72
73
@pytest.fixture
def grpc_client_prepare(
    service_client,
    _testsuite_client_config: client.TestsuiteClientConfig,
) -> Callable[[grpc.aio.ClientCallDetails], Awaitable[None]]:
    """
    Returns the function that will be called before each gRPC request,
    client-side.

    The returned coroutine function takes the call details of the request
    about to be sent (positional-only, unused by the default implementation)
    and, if `service_client` is a `client.AiohttpClient`, awaits its
    `update_server_state()` before the request goes out.

    @ingroup userver_testsuite_fixtures
    """

    async def prepare(
        _client_call_details: grpc.aio.ClientCallDetails,
        /,
    ) -> None:
        # Only the aiohttp-based client exposes update_server_state();
        # for any other client type there is nothing to do.
        if isinstance(service_client, client.AiohttpClient):
            await service_client.update_server_state()

    return prepare
94
95
@pytest.fixture(scope='session')
async def grpc_session_channel(
    grpc_service_endpoint,
    _grpc_channel_interceptor,
):
    """
    Yields an insecure `grpc.aio` channel to `grpc_service_endpoint` with
    `_grpc_channel_interceptor` installed as its only interceptor.

    The channel is used as an async context manager, so it is closed when
    the fixture is torn down.
    """
    session_channel = grpc.aio.insecure_channel(
        grpc_service_endpoint,
        interceptors=[_grpc_channel_interceptor],
    )
    async with session_channel as channel:
        yield channel
106
107
@pytest.fixture
async def grpc_channel(
    grpc_service_endpoint,
    grpc_service_deps,
    grpc_service_timeout,
    grpc_session_channel,
    _grpc_channel_interceptor,
    grpc_client_prepare,
):
    """
    Returns the gRPC channel configured by the parameters from the
    @ref plugins.grpc.grpc_service_endpoint "grpc_service_endpoint" fixture.

    Before returning, installs `grpc_client_prepare` as the interceptor's
    `prepare_func` and waits up to `grpc_service_timeout` for the session
    channel to become ready.

    Raises RuntimeError if the channel is not ready within the timeout.

    @ingroup userver_testsuite_fixtures
    """
    # The interceptor object is shared with the session channel, so the
    # per-test prepare function has to be (re)installed for every test.
    _grpc_channel_interceptor.prepare_func = grpc_client_prepare
    try:
        await asyncio.wait_for(
            grpc_session_channel.channel_ready(),
            timeout=grpc_service_timeout,
        )
    except asyncio.TimeoutError as exc:
        # Chain explicitly so the traceback shows the timeout as the cause.
        raise RuntimeError(
            f'Failed to connect to remote gRPC server by address {grpc_service_endpoint}',
        ) from exc
    return grpc_session_channel
134
135
@pytest.fixture
def grpc_service_deps(service_client):
    """
    gRPC service dependencies hook. Feel free to override it.

    The default implementation has no body and returns None: its only
    effect is to request the `service_client` fixture. The `grpc_channel`
    fixture depends on this hook, so whatever fixtures are requested here
    are resolved before the channel is handed to a test.

    @ingroup userver_testsuite_fixtures
    """
143
144
@pytest.fixture(scope='session')
def userver_config_grpc_mockserver(grpc_mockserver_endpoint):
    """
    Returns a function that adjusts the static config for testsuite.
    Every config_vars entry whose *value* equals `$grpc_mockserver`
    is replaced in place with @ref grpc_mockserver_endpoint.

    @ingroup userver_testsuite_fixtures
    """

    def patch_config(_config_yaml, config_vars):
        # Collect the matching keys first, then overwrite their values.
        placeholder_keys = [key for key in config_vars if config_vars[key] == '$grpc_mockserver']
        for key in placeholder_keys:
            config_vars[key] = grpc_mockserver_endpoint

    return patch_config
161
162
# Taken from
# https://github.com/grpc/grpc/blob/master/examples/python/interceptors/headers/generic_client_interceptor.py
class _GenericClientInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
    grpc.aio.StreamUnaryClientInterceptor,
    grpc.aio.StreamStreamClientInterceptor,
):
    """
    Client interceptor that awaits `prepare_func` before every outgoing
    call (all four unary/stream combinations) and then forwards the call
    unchanged to `continuation`.

    `prepare_func` starts as None and is expected to be assigned by the
    code that owns the interceptor; while it is None, calls are forwarded
    without any preparation step.
    """

    def __init__(self):
        # Hook awaited with the call details before each request is sent.
        self.prepare_func: Optional[Callable[[grpc.aio.ClientCallDetails], Awaitable[None]]] = None

    async def _run_prepare(self, client_call_details) -> None:
        """Awaits `prepare_func` with the call details, if one is set."""
        prepare_func = self.prepare_func
        if prepare_func is not None:
            await prepare_func(client_call_details)

    async def intercept_unary_unary(
        self,
        continuation,
        client_call_details,
        request,
    ):
        await self._run_prepare(client_call_details)
        return await continuation(client_call_details, request)

    async def intercept_unary_stream(
        self,
        continuation,
        client_call_details,
        request,
    ):
        await self._run_prepare(client_call_details)
        # `request` is the single request message, not an iterator, so it
        # is forwarded as is (the upstream example calls next() only
        # because it wraps the request into an iterator first).
        return await continuation(client_call_details, request)

    async def intercept_stream_unary(
        self,
        continuation,
        client_call_details,
        request_iterator,
    ):
        await self._run_prepare(client_call_details)
        return await continuation(client_call_details, request_iterator)

    async def intercept_stream_stream(
        self,
        continuation,
        client_call_details,
        request_iterator,
    ):
        await self._run_prepare(client_call_details)
        return await continuation(client_call_details, request_iterator)
209
210
211@pytest.fixture(scope='session')
212def _grpc_channel_interceptor() -> _GenericClientInterceptor: