Apache Kafka Consumer client component.
Static configuration example:
# yaml
kafka-consumer:
env_pod_name: "HOSTNAME"
enable_auto_commit: false
group_id: "test-group"
auto_offset_reset: "smallest"
security_protocol: "PLAINTEXT"
topics:
- "test-topic-send"
- "test-topic-consume-1"
- "test-topic-consume-2"
- "test-topic-consume-produced-1"
- "test-topic-consume-produced-2"
max_batch_size: 10
Secdist format
A Kafka alias in secdist is described as a JSON object kafka_settings
, containing credentials of Kafka brokers.
@pytest.fixture(scope='session')
def service_env():
single_setting = {
'brokers': os.getenv('KAFKA_RECIPE_BROKER_LIST'),
'username': '',
'password': '',
}
logging.info(f'Brokers are: {single_setting["brokers"]}')
secdist_config = {
'kafka_settings': {
'kafka-consumer': single_setting,
'kafka-producer-first': single_setting,
'kafka-producer-second': single_setting,
},
}
return {'SECDIST_CONFIG': json.dumps(secdist_config)}
Static options:
Name | Description | Default value |
group_id | consumer group id | – |
topics | list of topics consumer subscribes | – |
enable_auto_commit | whether to automatically and periodically commit offsets | false |
auto_offset_reset | action to take when there is no initial offset in offset store | – |
max_batch_size | maximum batch size for one callback call | – |
env_pod_name | environment variable to substitute {pod_name} substring in group_id | none |
security_protocol | protocol used to communicate with brokers | – |
sasl_mechanisms | SASL mechanism to use for authentication | none |
ssl_ca_location | File or directory path to CA certificate(s) for verifying the broker's key | 300000 |
topic_metadata_refresh_interval_ms | period of time in milliseconds at which topic and broker metadata is refreshed | 900000 |
metadata_max_age_ms | metadata cache max age | none |
Definition at line 53 of file consumer_component.hpp.