1
2async def test_kafka_basic(service_client, testpoint):
3 received_key = ''
4
5
6
7 @testpoint('message_consumed')
8 def message_consumed(data):
9 nonlocal received_key
10 received_key = data['key']
11
12 await service_client.update_server_state()
13
14 TOPIC1 = 'test-topic-1'
15 TOPIC2 = 'test-topic-2'
16 MESSAGE_COUNT = 10
17
18 for send in range(MESSAGE_COUNT):
19 topic = TOPIC1 if send % 2 == 0 else TOPIC2
20 send_key = f'test-key-{send}'
21
22 response = await service_client.post(
23 '/produce',
24 json={
25 'topic': topic,
26 'key': send_key,
27 'payload': f'test-msg-{send}',
28 },
29 )
30 assert response.status_code == 200
31
32
33 await message_consumed.wait_call()
34
35 assert received_key == send_key
36
37
38