userver: kafka::ConsumerScope Class Reference
Loading...
Searching...
No Matches
kafka::ConsumerScope Class Referencefinal

#include <userver/kafka/consumer_scope.hpp>

Detailed Description

RAII class that used as interface for Apache Kafka Consumer interaction and proper lifetime management.

Its main purpose is to stop the message polling in user component (that holds ConsumerScope) destructor, because ConsumerScope::Callback often captures this pointer on user component.

Common usage:

ConsumerHandler::ConsumerHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
: components::ComponentBase{config, context},
consumer_{context.FindComponent<kafka::ConsumerComponent>().GetConsumer()} {
consumer_.Start([this](kafka::MessageBatchView messages) {
Consume(messages);
consumer_.AsyncCommit();
});
}

Important implementation details

ConsumerScope holds reference to kafka::impl::Consumer that actually represents the Apache Kafka Balanced Consumer.

It exposes the API for asynchronous message batches processing that is polled from the subscribed topics partitions.

Consumer periodically polls the message batches from the subscribed topics partitions and invokes processing callback on each batch.

Also, Consumer maintains per topic statistics including the broker connection errors.

Note
Each ConsumerScope instance is not thread-safe. To speed up the topic messages processing, create more consumers with the same group_id.
See also
https://docs.confluent.io/platform/current/clients/consumer.html for basic consumer concepts
https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62 for understanding of balanced consumer groups
Warning
ConsumerScope::Start and ConsumerScope::Stop maybe called multiple times, but only in "start-stop" order and NOT concurrently.
Note
Must be placed as one of the last fields in the consumer component. Make sure to add a comment before the field:
// Subscription must be the last field! Add new fields above this comment.
Examples
samples/kafka_service/src/consumer_handler.hpp.

Definition at line 68 of file consumer_scope.hpp.

Public Types

using Callback = std::function<void(MessageBatchView)>
 Callback that is invoked on each polled message batch.
 

Public Member Functions

 ~ConsumerScope ()
 Stops the consumer (if not yet stopped).
 
 ConsumerScope (ConsumerScope &&) noexcept=delete
 
ConsumerScopeoperator= (ConsumerScope &&) noexcept=delete
 
void Start (Callback callback)
 Subscribes for configured topics and starts the consumer polling process.
 
void Stop () noexcept
 Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.
 
void AsyncCommit ()
 Schedules the current assignment offsets commitment task. Intended to be called after each message batch processing cycle (but not necessarily).
 
OffsetRange GetOffsetRange (const std::string &topic, std::uint32_t partition, std::optional< std::chrono::milliseconds > timeout=std::nullopt) const
 Retrieves the lowest and highest offsets for the specified topic and partition.
 
std::vector< std::uint32_t > GetPartitionIds (const std::string &topic, std::optional< std::chrono::milliseconds > timeout=std::nullopt) const
 Retrieves the partition IDs for the specified topic.
 

Member Typedef Documentation

◆ Callback

using kafka::ConsumerScope::Callback = std::function<void(MessageBatchView)>

Callback that is invoked on each polled message batch.

Warning
If callback throws, it called over and over again with the batch with the same messages, until successful invocation. Though, user should consider idempotent message processing mechanism.

Definition at line 74 of file consumer_scope.hpp.

Member Function Documentation

◆ AsyncCommit()

void kafka::ConsumerScope::AsyncCommit ( )

Schedules the current assignment offsets commitment task. Intended to be called after each message batch processing cycle (but not necessarily).

Warning
Commit does not ensure that messages do not come again – they do not come again also without the commit within the same process. Commit, indeed, restricts other consumers in consumers group from reading messages already processed (committed) by the current consumer if current has stopped and leaved the group

◆ GetOffsetRange()

OffsetRange kafka::ConsumerScope::GetOffsetRange ( const std::string & topic,
std::uint32_t partition,
std::optional< std::chrono::milliseconds > timeout = std::nullopt ) const

Retrieves the lowest and highest offsets for the specified topic and partition.

Exceptions
OffsetRangeExceptionif offsets could not be retrieved
Warning
This is a blocking call
Parameters
topicThe name of the topic.
partitionThe partition number of the topic.
Returns
Lowest and highest offsets for the given topic and partition.
See also
OffsetRange for more explanation

◆ GetPartitionIds()

std::vector< std::uint32_t > kafka::ConsumerScope::GetPartitionIds ( const std::string & topic,
std::optional< std::chrono::milliseconds > timeout = std::nullopt ) const

Retrieves the partition IDs for the specified topic.

Exceptions
GetMetadataExceptionif failed to fetch metadata
TopicNotFoundExceptionif topic not found
Warning
This is a blocking call
Parameters
topicThe name of the topic.
Returns
A vector of partition IDs for the given topic.

◆ Start()

void kafka::ConsumerScope::Start ( Callback callback)

Subscribes for configured topics and starts the consumer polling process.

Note
If callback throws an exception, entire message batch (also with successfully processed messages) come again, until callback succeeds
Warning
Each callback duration must not exceed the max_callback_duration time. Otherwise, consumer may stop consuming the message for unpredictable amount of time.

◆ Stop()

void kafka::ConsumerScope::Stop ( )
noexcept

Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.

Called in the destructor of ConsumerScope automatically.

Can be called in the beginning of your destructor if some other actions in that destructor prevent the callback from functioning correctly.

After ConsumerScope::Stop call, subscribed topics partitions are distributed between other consumers with the same group_id.

Warning
Blocks until all kafka::Message destroyed (e.g. consumer cannot be stopped until user-callback is executing).

The documentation for this class was generated from the following file: