#include <userver/kafka/consumer_scope.hpp>
RAII class that serves as the interface for interacting with an Apache Kafka consumer and for managing its lifetime.
Its main purpose is to stop message polling in the destructor of the user component that holds the ConsumerScope, because ConsumerScope::Callback often captures the this pointer of that component.
Common usage:
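A minimal sketch of the lifetime pattern described above. It uses hypothetical stand-in types (`FakeConsumerScope`, `Storage`, `UserComponent`, `MessageBatch`) rather than the real userver classes, and it assumes the scope is declared as the last field of the component so that it is destroyed, and therefore stopped, before the fields its callback touches.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; NOT the real userver types.
using MessageBatch = std::vector<std::string>;

class FakeConsumerScope {
public:
    using Callback = std::function<void(const MessageBatch&)>;

    explicit FakeConsumerScope(std::vector<std::string>& events) : events_(events) {}

    // Mirrors the documented behaviour: the destructor stops the consumer.
    ~FakeConsumerScope() { Stop(); }

    FakeConsumerScope(const FakeConsumerScope&) = delete;
    FakeConsumerScope& operator=(const FakeConsumerScope&) = delete;

    void Start(Callback callback) { callback_ = std::move(callback); }

    void Stop() noexcept {
        if (!callback_) return;  // already stopped
        callback_ = nullptr;
        events_.push_back("consumer stopped");
    }

    // Simulates one poll cycle; returns false when the consumer is stopped.
    bool Deliver(const MessageBatch& batch) {
        if (!callback_) return false;
        callback_(batch);
        return true;
    }

private:
    std::vector<std::string>& events_;
    Callback callback_;
};

struct Storage {
    explicit Storage(std::vector<std::string>& events) : events(events) {}
    ~Storage() { events.push_back("storage destroyed"); }

    std::vector<std::string>& events;
    std::size_t processed = 0;
};

class UserComponent {
public:
    explicit UserComponent(std::vector<std::string>& events)
        : storage_(events), consumer_(events) {
        // The callback captures `this`, so it must not outlive storage_.
        consumer_.Start([this](const MessageBatch& batch) {
            storage_.processed += batch.size();
        });
    }

    bool Poll(const MessageBatch& batch) { return consumer_.Deliver(batch); }
    std::size_t Processed() const { return storage_.processed; }

private:
    Storage storage_;
    // Declared last: members are destroyed in reverse declaration order,
    // so the consumer stops before storage_ goes away.
    FakeConsumerScope consumer_;
};
```

Because C++ destroys members in reverse declaration order, the stand-in scope records its stop before `Storage` is destroyed, which is the ordering the callback relies on.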
ConsumerScope holds a reference to kafka::impl::Consumer, which represents the Apache Kafka Balanced Consumer.
It exposes an API for asynchronous processing of message batches polled from the partitions of the subscribed topics.
The consumer periodically polls message batches from those partitions and invokes the processing callback on each batch.
The consumer also maintains per-topic statistics, including broker connection errors.
Definition at line 70 of file consumer_scope.hpp.
Public Types | |
using | Callback = std::function<void(MessageBatchView)> |
Callback that is invoked on each polled message batch. | |
Public Member Functions | |
~ConsumerScope () | |
Stops the consumer (if not yet stopped). | |
ConsumerScope (ConsumerScope &&) noexcept=delete | |
ConsumerScope & | operator= (ConsumerScope &&) noexcept=delete |
const std::vector< std::string > & | GetTopics () const |
List of topics the consumer is configured to subscribe to. | |
void | Start (Callback callback) |
Subscribes to the configured topics and starts the consumer polling process. | |
void | Stop () noexcept |
Revokes all topic partitions the consumer was subscribed to. Also closes the consumer, which leaves the balanced consumer group. | |
void | AsyncCommit () |
Schedules a task that commits the offsets of the current assignment. Intended to be called after each message batch processing cycle, although this is not required. | |
OffsetRange | GetOffsetRange (utils::zstring_view topic, std::uint32_t partition, std::optional< std::chrono::milliseconds > timeout=std::nullopt) const |
Retrieves the lowest and highest offsets for the specified topic and partition. | |
std::vector< std::uint32_t > | GetPartitionIds (utils::zstring_view topic, std::optional< std::chrono::milliseconds > timeout=std::nullopt) const |
Retrieves the partition IDs for the specified topic. | |
void | SetRebalanceCallback (ConsumerRebalanceCallback rebalance_callback) |
Sets the rebalance callback for a consumer. | |
void | ResetRebalanceCallback () |
Resets the rebalance callback for a consumer. | |
void | Seek (utils::zstring_view topic, std::uint32_t partition_id, std::uint64_t offset, std::chrono::milliseconds timeout) const |
Seeks the specified topic partition to the given offset. | |
void | SeekToBeginning (utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const |
Seeks the specified topic partition to the beginning. | |
void | SeekToEnd (utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const |
Seeks the specified topic partition to the end. | |
using kafka::ConsumerScope::Callback = std::function<void(MessageBatchView)>
Callback that is invoked on each polled message batch.
Definition at line 76 of file consumer_scope.hpp.
void kafka::ConsumerScope::AsyncCommit()
Schedules a task that commits the offsets of the current assignment. Intended to be called after each message batch processing cycle, although this is not required.
OffsetRange kafka::ConsumerScope::GetOffsetRange(utils::zstring_view topic, std::uint32_t partition, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const
Retrieves the lowest and highest offsets for the specified topic and partition.
Exceptions
OffsetRangeException | if offsets could not be retrieved |
OffsetRangeTimeoutException | if timeout is set and is reached |
Parameters
topic | The name of the topic. |
partition | The partition number of the topic. |
timeout | The optional timeout for the operation. |
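As an illustration of what the returned range can be used for, the sketch below estimates how many messages a partition retains and how many remain ahead of a given position. `OffsetRangeSketch` and its `low`/`high` fields are hypothetical stand-ins, not the real `OffsetRange` definition, and the arithmetic assumes that `high` is the offset the next produced message would receive (the usual Kafka high-watermark convention).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the real OffsetRange; the field names are assumed.
struct OffsetRangeSketch {
    std::uint64_t low;   // lowest offset still available in the partition
    std::uint64_t high;  // assumed: offset the next produced message will get
};

// Number of messages currently retained in the partition.
std::uint64_t RetainedMessages(const OffsetRangeSketch& range) {
    assert(range.low <= range.high);
    return range.high - range.low;
}

// Number of messages a reader at `position` still has to consume.
// Positions below `low` are clamped to `low`, positions at or past `high` yield 0.
std::uint64_t RemainingMessages(const OffsetRangeSketch& range, std::uint64_t position) {
    assert(range.low <= range.high);
    if (position >= range.high) return 0;
    const std::uint64_t start = position < range.low ? range.low : position;
    return range.high - start;
}
```

With a real consumer, the range would come from a `GetOffsetRange` call on a started scope; the helpers themselves do not depend on userver.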
std::vector<std::uint32_t> kafka::ConsumerScope::GetPartitionIds(utils::zstring_view topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const
Retrieves the partition IDs for the specified topic.
Exceptions
GetMetadataException | if failed to fetch metadata |
TopicNotFoundException | if topic not found |
OffsetRangeTimeoutException | if timeout is set and is reached |
Parameters
topic | The name of the topic. |
timeout | The optional timeout for the operation. |
void kafka::ConsumerScope::ResetRebalanceCallback()
Resets the rebalance callback for a consumer.
void kafka::ConsumerScope::Seek(utils::zstring_view topic, std::uint32_t partition_id, std::uint64_t offset, std::chrono::milliseconds timeout) const
Seeks the specified topic partition to the given offset.
Exceptions
TimeoutException | if the operation times out. |
SeekException | if an error occurs during the seek operation. |
SeekInvalidArgumentException | if invalid timeout or offset is passed. |
Parameters
topic | The name of the topic. |
partition_id | The partition ID of the given topic. |
offset | The offset to seek to; must be <= std::numeric_limits<std::int64_t>::max(), otherwise SeekInvalidArgumentException is thrown. |
timeout | The timeout duration for the operation; must be > 0, otherwise SeekInvalidArgumentException is thrown. |
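The argument constraints above can be checked before calling Seek. The helper below is a hypothetical pre-validation sketch (`AreSeekArgumentsValid` is not part of userver); it only encodes the two documented conditions and does not reproduce the library's own checks.

```cpp
#include <chrono>
#include <cstdint>
#include <limits>

// Hypothetical helper: returns true when the arguments satisfy the documented
// constraints, i.e. offset fits into std::int64_t and timeout is positive.
bool AreSeekArgumentsValid(std::uint64_t offset, std::chrono::milliseconds timeout) {
    constexpr auto kMaxOffset =
        static_cast<std::uint64_t>(std::numeric_limits<std::int64_t>::max());
    return offset <= kMaxOffset && timeout > std::chrono::milliseconds::zero();
}
```

A caller that receives offsets from untrusted input could use such a check to report a clear error early instead of handling SeekInvalidArgumentException.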
void kafka::ConsumerScope::SeekToBeginning(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const
Seeks the specified topic partition to the beginning.
Exceptions
SeekException | if an error occurs during the seek operation. |
SeekInvalidArgumentException | if invalid timeout is passed. |
Parameters
topic | The name of the topic. |
partition_id | The partition ID of the given topic. |
timeout | The timeout duration for the operation; must be > 0, otherwise SeekInvalidArgumentException is thrown. |
void kafka::ConsumerScope::SeekToEnd(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const
Seeks the specified topic partition to the end.
Exceptions
SeekException | if an error occurs during the seek operation. |
SeekInvalidArgumentException | if invalid timeout is passed. |
Parameters
topic | The name of the topic. |
partition_id | The partition ID of the given topic. |
timeout | The timeout duration for the operation; must be > 0, otherwise SeekInvalidArgumentException is thrown. |
void kafka::ConsumerScope::SetRebalanceCallback(ConsumerRebalanceCallback rebalance_callback)
Sets the rebalance callback for a consumer.
void kafka::ConsumerScope::Start(Callback callback)
Subscribes to the configured topics and starts the consumer polling process.
If callback throws an exception, the entire message batch (including messages that were already processed successfully) is delivered again until callback succeeds.
Each callback invocation must complete within the max_callback_duration time. Otherwise, the consumer may stop consuming messages for an unpredictable amount of time.
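Because a failed callback causes the whole batch to be delivered again, handlers usually need to tolerate seeing the same message twice. The sketch below simulates that redelivery with a hypothetical `DeliverUntilSuccess` loop and an `IdempotentHandler` that skips messages it has already applied. Neither name exists in userver, and treating the message text as a unique key is an assumption made only to keep the example short.

```cpp
#include <cstddef>
#include <exception>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical handler: applies each message at most once, even if the same
// batch is delivered several times.
struct IdempotentHandler {
    std::set<std::string> seen;        // keys of messages already applied
    std::vector<std::string> applied;  // side effects, in order of application
    int failures_left = 0;             // injected transient failures (for the demo)

    void operator()(const std::vector<std::string>& batch) {
        for (const auto& message : batch) {
            if (!seen.insert(message).second) continue;  // applied in an earlier attempt
            applied.push_back(message);
            if (failures_left > 0) {
                --failures_left;
                throw std::runtime_error("transient failure");
            }
        }
    }
};

// Hypothetical simulation of the documented behaviour: the same batch is
// handed to the callback again until it returns without throwing.
// Returns the number of attempts used, or 0 if max_attempts was exhausted.
template <typename Callback>
std::size_t DeliverUntilSuccess(const std::vector<std::string>& batch,
                                Callback&& callback,
                                std::size_t max_attempts) {
    for (std::size_t attempt = 1; attempt <= max_attempts; ++attempt) {
        try {
            callback(batch);
            return attempt;
        } catch (const std::exception&) {
            // Swallow the error and redeliver the whole batch.
        }
    }
    return 0;
}
```

In this simulation a batch of two messages with one injected failure needs two attempts, yet each message is applied exactly once.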
void kafka::ConsumerScope::Stop() noexcept
Revokes all topic partitions the consumer was subscribed to. Also closes the consumer, which leaves the balanced consumer group.
Called automatically in the destructor of ConsumerScope.
Can be called at the beginning of your destructor if other actions in that destructor would prevent the callback from functioning correctly.
After ConsumerScope::Stop is called, the partitions of the subscribed topics are redistributed among the other consumers with the same group_id.