userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/kafka.py Source File
Loading...
Searching...
No Matches
kafka.py
1"""
2Plugin that imports the required fixtures to start the broker.
3"""
4
5import json
6import logging
7import os
8
9import pytest
10
11from testsuite.databases.kafka.classes import BootstrapServers
12
# Plugins that pytest loads together with this module: the testsuite Kafka
# plugin and the userver core plugin.
pytest_plugins = [
    'testsuite.databases.kafka.pytest_plugin',
    'pytest_userver.plugins.core',
]
17
18
@pytest.fixture(scope='session')
def _patched_bootstrap_servers_internal() -> BootstrapServers:
    """
    Used for internal testing purposes.

    Reads the `KAFKA_RECIPE_BROKER_LIST` environment variable and splits its
    value on commas. Returns an empty list when the variable is unset or
    empty.
    """

    raw_brokers = os.getenv('KAFKA_RECIPE_BROKER_LIST')
    return raw_brokers.split(',') if raw_brokers else []
28
29
@pytest.fixture(scope='session')
def kafka_secdist(_bootstrap_servers, service_config) -> str:
    """
    Automatically generates the secdist config from the user's static config
    and returns it serialized as a JSON string.

    Every component whose name starts with `kafka-producer` or
    `kafka-consumer` receives the same settings: the brokers taken from
    `_bootstrap_servers` and an empty `username` and `password`.

    `_bootstrap_servers` is the testsuite's fixture that determines the
    current bootstrap servers list depending on the Kafka testsuite plugin's
    settings.

    @snippet samples/kafka_service/testsuite/conftest.py Kafka service sample - secdist

    @ingroup userver_testsuite_fixtures
    """

    # One settings object is shared by all Kafka components; it is only read
    # during serialization, so sharing it is safe.
    single_setting = {
        'brokers': _bootstrap_servers,
        'username': '',
        'password': '',
    }
    logging.info(f'Kafka brokers are: {single_setting["brokers"]}')

    kafka_prefixes = ('kafka-producer', 'kafka-consumer')
    component_names = service_config['components_manager']['components']
    kafka_settings = {
        name: single_setting
        for name in component_names
        if name.startswith(kafka_prefixes)
    }

    return json.dumps({'kafka_settings': kafka_settings})
55 is_kafka_consumer = component_name.startswith('kafka-consumer')
56 if is_kafka_producer or is_kafka_consumer:
57 secdist_config['kafka_settings'][component_name] = single_setting
58
59 return json.dumps(secdist_config)