userver: grpc/functional_tests/basic_chaos/tests-grpcclient/service.py
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
grpc/functional_tests/basic_chaos/tests-grpcclient/service.py

Installs as a mock servicer, the class of which should inherit from a generated *Servicer class.

Installs as a mock servicer, the class of which should inherit from a generated *Servicer class. For example, this servicer class can be installed as follows:

@pytest.fixture(name='greeter_mock')
def _greeter_mock(grpc_mockserver):
    """Create a GreeterService, install it into grpc_mockserver, and return it."""
    servicer = GreeterService()
    grpc_mockserver.install_servicer(servicer)
    return servicer
Note
Inheritance from multiple *Servicer classes at once is allowed.
1import asyncio
2
3import samples.greeter_pb2 as greeter_pb2
4import samples.greeter_pb2_grpc as greeter_pb2_grpc
5
6
class GreeterService(greeter_pb2_grpc.GreeterServiceServicer):
    """Greeter servicer whose handlers answer with 'Hello, ...' greetings.

    The streaming handlers sleep between messages, so their requests are
    consumed and their responses are produced spaced out in time.
    """

    async def SayHello(self, request, context):
        """Reply to a unary call with a single greeting for ``request.name``."""
        greeting = f'Hello, {request.name}!'
        return greeter_pb2.GreetingResponse(greeting=greeting)

    async def SayHelloResponseStream(self, request, context):
        """Yield five greetings, each with one more '!' than the previous one.

        Sleeps 0.2 seconds before sending every response.
        """
        pause = 0.2
        greeting = f'Hello, {request.name}'
        for _ in range(5):
            greeting = greeting + '!'
            await asyncio.sleep(pause)
            yield greeter_pb2.GreetingResponse(greeting=greeting)

    async def SayHelloRequestStream(self, request_iterator, context):
        """Drain the request stream and return one greeting for all names.

        The names are appended to 'Hello, ' back to back, with no separator.
        """
        pieces = ['Hello, ']
        async for request in request_iterator:
            pieces.append(f'{request.name}')
        return greeter_pb2.GreetingResponse(greeting=''.join(pieces))

    async def SayHelloStreams(self, request_iterator, context):
        """Answer every incoming request with a greeting for all names so far.

        Names are accumulated without a separator; each reply is preceded
        by a 0.2 second sleep.
        """
        pause = 0.2
        income = ''
        async for request in request_iterator:
            income = income + f'{request.name}'
            await asyncio.sleep(pause)
            yield greeter_pb2.GreetingResponse(greeting=f'Hello, {income}')

    async def SayHelloIndependentStreams(self, request_iterator, context):
        """Read the whole request stream, then emit a fixed series of replies.

        Reading sleeps 0.2 seconds per request. Writing sleeps 0.3 seconds
        before each of the ten canned greetings. The last response carries
        the concatenated request names as-is, without a 'Hello, ' prefix.
        """
        read_pause = 0.2
        write_pause = 0.3

        # Phase 1: consume every request before producing any response.
        names = []
        async for request in request_iterator:
            await asyncio.sleep(read_pause)
            names.append(f'{request.name}')

        # Phase 2: send the canned greetings, independent of what was read.
        for val in (
            'Python',
            'C++',
            'linux',
            'userver',
            'grpc',
            'kernel',
            'developer',
            'core',
            'anonymous',
            'user',
        ):
            await asyncio.sleep(write_pause)
            yield greeter_pb2.GreetingResponse(greeting=f'Hello, {val}')

        # Final message echoes everything received in phase 1.
        yield greeter_pb2.GreetingResponse(greeting=''.join(names))
57 yield greeter_pb2.GreetingResponse(greeting=final_string)