userver: kafka::ConsumerComponent Class Reference
Loading...
Searching...
No Matches
kafka::ConsumerComponent Class Referencefinal

#include <userver/kafka/components/consumer_component.hpp>

Detailed Description

Apache Kafka Consumer client component.

Static configuration example:

# yaml
kafka-consumer:
env_pod_name: "HOSTNAME"
enable_auto_commit: false
group_id: "test-group"
auto_offset_reset: "smallest"
security_protocol: "PLAINTEXT"
topics:
- "test-topic-send"
- "test-topic-consume-1"
- "test-topic-consume-2"
- "test-topic-consume-produced-1"
- "test-topic-consume-produced-2"
max_batch_size: 10

Secdist format

A Kafka alias in secdist is described as a JSON object kafka_settings, containing credentials of Kafka brokers.

@pytest.fixture(scope='session')
def service_env():
single_setting = {
'brokers': os.getenv('KAFKA_RECIPE_BROKER_LIST'),
'username': '',
'password': '',
}
logging.info(f'Brokers are: {single_setting["brokers"]}')
secdist_config = {
'kafka_settings': {
'kafka-consumer': single_setting,
'kafka-producer-first': single_setting,
'kafka-producer-second': single_setting,
},
}
return {'SECDIST_CONFIG': json.dumps(secdist_config)}

Static options:

Name Description Default value
group_id consumer group id
topics list of topics consumer subscribes
enable_auto_commit whether to automatically and periodically commit offsets false
auto_offset_reset action to take when there is no initial offset in offset store
max_batch_size maximum batch size for one callback call
env_pod_name environment variable to substitute {pod_name} substring in group_id none
security_protocol protocol used to communicate with brokers
sasl_mechanisms SASL mechanism to use for authentication none
ssl_ca_location File or directory path to CA certificate(s) for verifying the broker's key 300000
topic_metadata_refresh_interval_ms period of time in milliseconds at which topic and broker metadata is refreshed 900000
metadata_max_age_ms metadata cache max age none

Definition at line 53 of file consumer_component.hpp.

+ Inheritance diagram for kafka::ConsumerComponent:
+ Collaboration diagram for kafka::ConsumerComponent:

Public Member Functions

 ConsumerComponent (const components::ComponentConfig &config, const components::ComponentContext &context)
 
ConsumerScope GetConsumer ()
 
- Public Member Functions inherited from components::LoggableComponentBase
 LoggableComponentBase (const ComponentConfig &, const ComponentContext &)
 
 LoggableComponentBase (LoggableComponentBase &&)=delete
 
 LoggableComponentBase (const LoggableComponentBase &)=delete
 
 ~LoggableComponentBase () override=default
 
ComponentHealth GetComponentHealth () const override
 
void OnLoadingCancelled () override
 
void OnAllComponentsLoaded () override
 
void OnAllComponentsAreStopping () override
 

Static Public Member Functions

static yaml_config::Schema GetStaticConfigSchema ()
 
- Static Public Member Functions inherited from components::LoggableComponentBase
static yaml_config::Schema GetStaticConfigSchema ()
 

Static Public Attributes

static constexpr std::string_view kName = "kafka-consumer"
 

Member Data Documentation

◆ kName

constexpr std::string_view kafka::ConsumerComponent::kName = "kafka-consumer"
staticconstexpr

Definition at line 55 of file consumer_component.hpp.


The documentation for this class was generated from the following file: