userver: samples/kafka_service/testsuite/test_kafka.py
Loading...
Searching...
No Matches
samples/kafka_service/testsuite/test_kafka.py
1# /// [Kafka service sample - kafka functional test example]
2async def test_kafka_basic(service_client, testpoint):
3 received_key = ''
4
5 # register testpoint, which consumer calls
6 # after each message is consumed
7 @testpoint('message_consumed')
8 def message_consumed(data):
9 nonlocal received_key
10 received_key = data['key']
11
12 await service_client.update_server_state()
13
14 TOPIC1 = 'test-topic-1'
15 TOPIC2 = 'test-topic-2'
16 MESSAGE_COUNT = 10
17
18 for send in range(MESSAGE_COUNT):
19 topic = TOPIC1 if send % 2 == 0 else TOPIC2
20 send_key = f'test-key-{send}'
21 # send message and waits its delivery
22 response = await service_client.post(
23 '/produce',
24 json={
25 'topic': topic,
26 'key': send_key,
27 'payload': f'test-msg-{send}',
28 },
29 )
30 assert response.status_code == 200
31
32 # wait until consume read the message and call the testpoint
33 await message_consumed.wait_call()
34 # check message key
35 assert received_key == send_key
36 # /// [Kafka service sample - kafka functional test example]