userver: kafka::ConsumerScope Class Reference
Loading...
Searching...
No Matches
kafka::ConsumerScope Class Referencefinal

#include <userver/kafka/consumer_scope.hpp>

Detailed Description

RAII class that used as interface for Apache Kafka Consumer interaction and proper lifetime management.

Its main purpose is to stop the message polling in user component (that holds ConsumerScope) destructor, because ConsumerScope::Callback often captures this pointer on user component.

Common usage:

ConsumerHandler::ConsumerHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
: components::ComponentBase{config, context},
consumer_{context.FindComponent<kafka::ConsumerComponent>().GetConsumer()} {
consumer_.Start([this](kafka::MessageBatchView messages) {
Consume(messages);
consumer_.AsyncCommit();
});
}

Important implementation details

ConsumerScope holds reference to kafka::impl::Consumer that actually represents the Apache Kafka Balanced Consumer.

It exposes the API for asynchronous message batches processing that is polled from the subscribed topics partitions.

Consumer periodically polls the message batches from the subscribed topics partitions and invokes processing callback on each batch.

Also, Consumer maintains per topic statistics including the broker connection errors.

Note
Each ConsumerScope instance is not thread-safe. To speed up the topic messages processing, create more consumers with the same group_id.
See also
https://docs.confluent.io/platform/current/clients/consumer.html for basic consumer concepts
https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62 for understanding of balanced consumer groups
Warning
ConsumerScope::Start and ConsumerScope::Stop maybe called multiple times, but only in "start-stop" order and NOT concurrently.
Note
Must be placed as one of the last fields in the consumer component. Make sure to add a comment before the field:
// Subscription must be the last field! Add new fields above this comment.
Examples
samples/kafka_service/src/consumer_handler.hpp.

Definition at line 70 of file consumer_scope.hpp.

Public Types

using Callback = std::function<void(MessageBatchView)>
 Callback that is invoked on each polled message batch.
 

Public Member Functions

 ~ConsumerScope ()
 Stops the consumer (if not yet stopped).
 
 ConsumerScope (ConsumerScope &&) noexcept=delete
 
ConsumerScopeoperator= (ConsumerScope &&) noexcept=delete
 
const std::vector< std::string > & GetTopics () const
 Topics list consumer configured to subscribe.
 
void Start (Callback callback)
 Subscribes for configured topics and starts the consumer polling process.
 
void Stop () noexcept
 Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.
 
void AsyncCommit ()
 Schedules the current assignment offsets commitment task. Intended to be called after each message batch processing cycle (but not necessarily).
 
OffsetRange GetOffsetRange (utils::zstring_view topic, std::uint32_t partition, std::optional< std::chrono::milliseconds > timeout=std::nullopt) const
 Retrieves the lowest and highest offsets for the specified topic and partition.
 
std::vector< std::uint32_t > GetPartitionIds (utils::zstring_view topic, std::optional< std::chrono::milliseconds > timeout=std::nullopt) const
 Retrieves the partition IDs for the specified topic.
 
void SetRebalanceCallback (ConsumerRebalanceCallback rebalance_callback)
 Sets the rebalance callback for a consumer.
 
void ResetRebalanceCallback ()
 Resets the rebalance callback for a consumer.
 
void Seek (utils::zstring_view topic, std::uint32_t partition_id, std::uint64_t offset, std::chrono::milliseconds timeout) const
 Seeks the specified topic partition to the given offset.
 
void SeekToBeginning (utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const
 Seeks the specified topic partition to the beginning.
 
void SeekToEnd (utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const
 Seeks the specified topic partition to the end.
 

Member Typedef Documentation

◆ Callback

using kafka::ConsumerScope::Callback = std::function<void(MessageBatchView)>

Callback that is invoked on each polled message batch.

Warning
If callback throws, it called over and over again with the batch with the same messages, until successful invocation. Though, user should consider idempotent message processing mechanism.

Definition at line 76 of file consumer_scope.hpp.

Member Function Documentation

◆ AsyncCommit()

void kafka::ConsumerScope::AsyncCommit ( )

Schedules the current assignment offsets commitment task. Intended to be called after each message batch processing cycle (but not necessarily).

Warning
Commit does not ensure that messages do not come again – they do not come again also without the commit within the same process. Commit, indeed, restricts other consumers in consumers group from reading messages already processed (committed) by the current consumer if current has stopped and leaved the group

◆ GetOffsetRange()

OffsetRange kafka::ConsumerScope::GetOffsetRange ( utils::zstring_view topic,
std::uint32_t partition,
std::optional< std::chrono::milliseconds > timeout = std::nullopt ) const

Retrieves the lowest and highest offsets for the specified topic and partition.

Exceptions
OffsetRangeExceptionif offsets could not be retrieved
OffsetRangeTimeoutExceptionif timeout is set and is reached
Warning
This is a blocking call
Parameters
topicThe name of the topic.
partitionThe partition number of the topic.
timeoutThe optional timeout for the operation.
Returns
Lowest and highest offsets for the given topic and partition.
See also
OffsetRange for more explanation

◆ GetPartitionIds()

std::vector< std::uint32_t > kafka::ConsumerScope::GetPartitionIds ( utils::zstring_view topic,
std::optional< std::chrono::milliseconds > timeout = std::nullopt ) const

Retrieves the partition IDs for the specified topic.

Exceptions
GetMetadataExceptionif failed to fetch metadata
TopicNotFoundExceptionif topic not found
OffsetRangeTimeoutExceptionif timeout is set and is reached
Warning
This is a blocking call
Parameters
topicThe name of the topic.
timeoutThe optional timeout for the operation.
Returns
A vector of partition IDs for the given topic.

◆ ResetRebalanceCallback()

void kafka::ConsumerScope::ResetRebalanceCallback ( )

Resets the rebalance callback for a consumer.

Warning
The rebalance callback must be set before calling ConsumerScope::Start or after calling ConsumerScope::Stop. The callback must not throw exceptions; any thrown exceptions will be caught and logged by the consumer implementation.
Note
The callback is invoked after the assign or revoke event has been successfully processed.

◆ Seek()

void kafka::ConsumerScope::Seek ( utils::zstring_view topic,
std::uint32_t partition_id,
std::uint64_t offset,
std::chrono::milliseconds timeout ) const

Seeks the specified topic partition to the given offset.

Exceptions
TimeoutExceptionif the operation times out.
SeekExceptionif an error occurs during the seek operation.
SeekInvalidArgumentExceptionif invalid timeout or offset is passed.
Warning
This is a blocking call and should only be invoked after ConsumerScope::Start call and before ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws SeekException.
Currently, it is required to call this from within the ConsumerRebalanceCallback. ConsumerScope::SetRebalanceCallback
Parameters
topicThe name of the topic.
partition_idThe partition ID of the given topic.
offsetThe offset to seek to, must be <= std::int64_t::max() or SeekInvalidArgumentException occurs.
timeoutThe timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.

◆ SeekToBeginning()

void kafka::ConsumerScope::SeekToBeginning ( utils::zstring_view topic,
std::uint32_t partition_id,
std::chrono::milliseconds timeout ) const

Seeks the specified topic partition to the beginning.

Exceptions
SeekExceptionif an error occurs during the seek operation.
SeekInvalidArgumentExceptionif invalid timeout is passed.
Warning
This is a blocking call and should only be invoked after ConsumerScope::Start call and before ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws SeekException.
Currently, it is required to call this from within the ConsumerRebalanceCallback. ConsumerScope::SetRebalanceCallback
Parameters
topicThe name of the topic.
partition_idThe partition ID of the given topic.
timeoutThe timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.

◆ SeekToEnd()

void kafka::ConsumerScope::SeekToEnd ( utils::zstring_view topic,
std::uint32_t partition_id,
std::chrono::milliseconds timeout ) const

Seeks the specified topic partition to the end.

Exceptions
SeekExceptionif an error occurs during the seek operation.
SeekInvalidArgumentExceptionif invalid timeout is passed.
Warning
This is a blocking call and should only be invoked after ConsumerScope::Start call and before ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws SeekException.
Currently, it is required to call this from within the ConsumerRebalanceCallback. ConsumerScope::SetRebalanceCallback
Parameters
topicThe name of the topic.
partition_idThe partition ID of the given topic.
timeoutThe timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.

◆ SetRebalanceCallback()

void kafka::ConsumerScope::SetRebalanceCallback ( ConsumerRebalanceCallback rebalance_callback)

Sets the rebalance callback for a consumer.

Warning
The rebalance callback must be set before calling ConsumerScope::Start or after calling ConsumerScope::Stop. The callback must not throw exceptions; any thrown exceptions will be caught and logged by the consumer implementation.
Note
The callback is invoked after the assign or revoke event has been successfully processed.

◆ Start()

void kafka::ConsumerScope::Start ( Callback callback)

Subscribes for configured topics and starts the consumer polling process.

Note
If callback throws an exception, entire message batch (also with successfully processed messages) come again, until callback succeeds
Warning
Each callback duration must not exceed the max_callback_duration time. Otherwise, consumer may stop consuming the message for unpredictable amount of time.

◆ Stop()

void kafka::ConsumerScope::Stop ( )
noexcept

Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.

Called in the destructor of ConsumerScope automatically.

Can be called in the beginning of your destructor if some other actions in that destructor prevent the callback from functioning correctly.

After ConsumerScope::Stop call, subscribed topics partitions are distributed between other consumers with the same group_id.

Warning
Blocks until all kafka::Message destroyed (e.g. consumer cannot be stopped until user-callback is executing).

The documentation for this class was generated from the following file: