userver: grpc/functional_tests/basic_chaos/tests-grpcclient/service.py
Loading...
Searching...
No Matches
grpc/functional_tests/basic_chaos/tests-grpcclient/service.py

Installs as a mock servicer, the class of which should inherit from a generated *Servicer class.

Installs as a mock servicer, the class of which should inherit from a generated *Servicer class. For example, this servicer class can be installed as follows:

@pytest.fixture(name='greeter_mock')
def _greeter_mock(grpc_mockserver):
    mock = GreeterService()
    grpc_mockserver.install_servicer(mock)
    return mock
Note
Inheritance from multiple *Servicer classes at once is allowed.
1import asyncio
2
3import samples.greeter_pb2 as greeter_pb2
4import samples.greeter_pb2_grpc as greeter_pb2_grpc
5
6
class GreeterService(greeter_pb2_grpc.GreeterServiceServicer):
    """Greeter servicer whose handlers answer with ``Hello, ...`` greetings.

    Provides one unary handler and four streaming handlers. The streaming
    handlers pause with ``asyncio.sleep`` between messages.
    """

    async def SayHello(self, request, context):
        """Answer a single request with ``Hello, <name>!``."""
        greeting = f'Hello, {request.name}!'
        return greeter_pb2.GreetingResponse(greeting=greeting)

    async def SayHelloResponseStream(self, request, context):
        """Stream five greetings, each carrying one more trailing ``!``.

        Sleeps 0.2 seconds before sending every response.
        """
        pause = 0.2
        base = f'Hello, {request.name}'
        for bang_count in range(1, 6):
            text = base + '!' * bang_count
            await asyncio.sleep(pause)
            yield greeter_pb2.GreetingResponse(greeting=text)

    async def SayHelloRequestStream(self, request_iterator, context):
        """Consume the whole request stream and answer once.

        The reply is ``Hello, `` followed by all received names concatenated
        in arrival order with no separator.
        """
        names = [f'{request.name}' async for request in request_iterator]
        return greeter_pb2.GreetingResponse(greeting='Hello, ' + ''.join(names))

    async def SayHelloStreams(self, request_iterator, context):
        """Answer every incoming request with a cumulative greeting.

        After each request, sleeps 0.2 seconds and then yields ``Hello, ``
        followed by every name received so far, concatenated.
        """
        pause = 0.2
        received = []
        async for request in request_iterator:
            received.append(f'{request.name}')
            await asyncio.sleep(pause)
            reply = 'Hello, ' + ''.join(received)
            yield greeter_pb2.GreetingResponse(greeting=reply)

    async def SayHelloIndependentStreams(self, request_iterator, context):
        """Drain the request stream, then stream a fixed set of greetings.

        Sleeps 0.2 seconds before recording each incoming name. Once the
        request stream is exhausted, yields ten fixed ``Hello, <word>``
        greetings 0.3 seconds apart, followed by one last response that
        holds the concatenated names (without the ``Hello, `` prefix).
        """
        read_pause = 0.2
        write_pause = 0.3
        collected = []
        async for request in request_iterator:
            await asyncio.sleep(read_pause)
            collected.append(f'{request.name}')
        words = (
            'Python',
            'C++',
            'linux',
            'userver',
            'grpc',
            'kernel',
            'developer',
            'core',
            'anonymous',
            'user',
        )
        for word in words:
            await asyncio.sleep(write_pause)
            yield greeter_pb2.GreetingResponse(greeting=f'Hello, {word}')
        yield greeter_pb2.GreetingResponse(greeting=''.join(collected))