#include <userver/kafka/consumer_scope.hpp>
RAII class used as an interface for interacting with an Apache Kafka consumer and for managing its lifetime properly.
Its main purpose is to stop message polling in the destructor of the user component that holds the ConsumerScope, because ConsumerScope::Callback often captures the this pointer of that component.
Common usage:
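A minimal, self-contained sketch of the pattern. `FakeConsumerScope` is a hypothetical, synchronous stand-in that only mirrors the `Start`/`Stop`/`AsyncCommit` shape documented on this page; it is not the userver implementation. `MessageBatch`, `Deliver`, `Commits` and `MessageCounter` are names invented for the illustration. The point is the ownership: the callback captures `this`, so polling has to stop before the component's other members are destroyed.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a polled batch (the real callback receives a MessageBatchView).
using MessageBatch = std::vector<std::string>;

// Hypothetical, synchronous stand-in mirroring the Start/Stop/AsyncCommit shape.
class FakeConsumerScope {
public:
    using Callback = std::function<void(const MessageBatch&)>;

    FakeConsumerScope() = default;
    ~FakeConsumerScope() { Stop(); }  // stops the consumer if not yet stopped

    FakeConsumerScope(FakeConsumerScope&&) = delete;
    FakeConsumerScope& operator=(FakeConsumerScope&&) = delete;

    void Start(Callback callback) { callback_ = std::move(callback); }
    void Stop() noexcept { callback_ = nullptr; }
    void AsyncCommit() { ++commits_; }  // the sketch only counts commit requests

    // Illustration-only helper: simulates one polled batch.
    // Returns false when the scope is stopped and the batch is not processed.
    bool Deliver(const MessageBatch& batch) {
        if (!callback_) return false;
        callback_(batch);
        return true;
    }

    std::size_t Commits() const { return commits_; }

private:
    Callback callback_;
    std::size_t commits_{0};
};

// Hypothetical user component that owns the scope.
class MessageCounter {
public:
    MessageCounter() {
        // The callback captures `this`, so it must not run once destruction has begun.
        consumer_.Start([this](const MessageBatch& batch) {
            processed_ += batch.size();
            consumer_.AsyncCommit();  // commit after each processed batch
        });
    }

    std::size_t Processed() const { return processed_; }
    FakeConsumerScope& Consumer() { return consumer_; }  // exposed only for the simulation

private:
    std::size_t processed_{0};
    // Declared last, hence destroyed first: polling stops before processed_ goes away.
    FakeConsumerScope consumer_;
};
```

In this sketch the scope is the last data member, which is one way to let its destructor run before the state the callback touches is destroyed.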
ConsumerScope holds a reference to impl::Consumer, which actually represents the Apache Kafka Balanced Consumer.
It exposes an API for asynchronous processing of message batches polled from the subscribed topic partitions.
The consumer periodically polls message batches from the subscribed topic partitions and invokes the processing callback on each batch.
The consumer also maintains per-topic statistics, including broker connection errors.
A ConsumerScope instance is not thread-safe. To speed up topic message processing, create more consumers with the same group_id.
ConsumerScope::Start and ConsumerScope::Stop may be called multiple times, but only in "start-stop" order and NOT concurrently.
Definition at line 66 of file consumer_scope.hpp.
Public Types | |
using | Callback = std::function<void(MessageBatchView)> |
Callback invoked on each polled message batch. | |
Public Member Functions | |
~ConsumerScope () | |
Stops the consumer (if not yet stopped). | |
ConsumerScope (ConsumerScope &&) noexcept=delete | |
ConsumerScope & | operator= (ConsumerScope &&) noexcept=delete |
void | Start (Callback callback) |
Subscribes for configured topics and starts the consumer polling process. | |
void | Stop () noexcept |
Revokes all topic partitions the consumer was subscribed to. Also closes the consumer, leaving the balanced consumer group. | |
void | AsyncCommit () |
Schedules a task that commits the offsets of the current assignment. Intended to be called after each message batch processing cycle. | |
using kafka::ConsumerScope::Callback = std::function<void(MessageBatchView)> |
Callback invoked on each polled message batch.
Definition at line 72 of file consumer_scope.hpp.
void kafka::ConsumerScope::AsyncCommit | ( | ) |
Schedules a task that commits the offsets of the current assignment. Intended to be called after each message batch processing cycle.
Instead of calling AsyncCommit manually, consider setting enable_auto_commit: true in the static config. Read the Kafka documentation carefully first to understand what the auto-commit mechanism actually means.
void kafka::ConsumerScope::Start | ( | Callback | callback | ) |
Subscribes for configured topics and starts the consumer polling process.
If callback throws an exception, the entire message batch (including the messages that were processed successfully) is delivered again until callback succeeds.
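The redelivery rule can be illustrated with a self-contained sketch. `DeliverUntilSuccess` is a hypothetical helper that imitates the documented behaviour; it is not part of userver, and the real consumer's retry timing is not modelled. The same batch is passed to the callback again until the callback returns without throwing. `DedupingHandler` is likewise invented for the example and shows a callback that tolerates seeing already-handled messages again.

```cpp
#include <cstddef>
#include <functional>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for the batch and callback types.
using MessageBatch = std::vector<std::string>;
using Callback = std::function<void(const MessageBatch&)>;

// Hypothetical helper: re-invokes callback with the same batch until it succeeds.
// Returns the attempt number that succeeded, or 0 if none did.
// max_attempts exists only so the sketch cannot spin forever.
std::size_t DeliverUntilSuccess(const MessageBatch& batch, const Callback& callback,
                                std::size_t max_attempts) {
    for (std::size_t attempt = 1; attempt <= max_attempts; ++attempt) {
        try {
            callback(batch);
            return attempt;
        } catch (const std::exception&) {
            // The whole batch comes again, including messages already handled.
        }
    }
    return 0;
}

// Example handler: fails once on payload "b" and stores handled payloads in a set,
// so a redelivered message does not create a duplicate entry.
struct DedupingHandler {
    std::set<std::string> seen;
    std::size_t invocations{0};
    bool fail_once{true};

    void operator()(const MessageBatch& batch) {
        for (const auto& payload : batch) {
            ++invocations;
            if (payload == "b" && fail_once) {
                fail_once = false;
                throw std::runtime_error("transient failure");
            }
            seen.insert(payload);
        }
    }
};
```

With a batch of `a`, `b`, `c`, the handler sees `a` twice: once in the failed attempt and once in the retry. This is why the sketch keys its state by payload rather than counting messages.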
void kafka::ConsumerScope::Stop | ( | ) | noexcept |
Revokes all topic partitions the consumer was subscribed to. Also closes the consumer, leaving the balanced consumer group.
Called in the destructor of ConsumerScope automatically.
Can be called at the beginning of your destructor if some other actions in that destructor prevent the callback from functioning correctly.
After the Stop call, the subscribed topic partitions are distributed among the other consumers with the same group_id.
kafka::Message
destroyed.
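The advice about calling Stop at the beginning of a destructor can be sketched as follows. `FakeConsumerScope` is a hypothetical, synchronous stand-in that only mirrors the `Start`/`Stop` shape from this page. `Archiver`, `Deliver`, `IsRunning` and the `events` log are invented for the illustration. The destructor releases a resource that the callback uses, so it stops the consumer first.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, synchronous stand-in; only the Start/Stop shape mirrors this page.
class FakeConsumerScope {
public:
    using Callback = std::function<void(const std::string&)>;

    ~FakeConsumerScope() { Stop(); }

    void Start(Callback callback) { callback_ = std::move(callback); }
    void Stop() noexcept { callback_ = nullptr; }
    bool IsRunning() const noexcept { return static_cast<bool>(callback_); }

    // Illustration-only helper: simulates one polled message.
    void Deliver(const std::string& payload) {
        if (callback_) callback_(payload);
    }

private:
    Callback callback_;
};

// Hypothetical component whose destructor tears down a resource used by the callback.
class Archiver {
public:
    explicit Archiver(std::vector<std::string>& events)
        : events_(events), sink_(std::make_unique<std::vector<std::string>>()) {
        consumer_.Start([this](const std::string& payload) { sink_->push_back(payload); });
    }

    ~Archiver() {
        consumer_.Stop();  // first: no callback may run past this point
        events_.push_back(consumer_.IsRunning() ? "released while polling"
                                                : "released after stop");
        sink_.reset();     // safe now: the callback can no longer dereference sink_
    }

    FakeConsumerScope& Consumer() { return consumer_; }  // exposed only for the simulation
    std::size_t Archived() const { return sink_->size(); }

private:
    std::vector<std::string>& events_;
    std::unique_ptr<std::vector<std::string>> sink_;
    FakeConsumerScope consumer_;
};
```

Without the explicit `Stop()` call, the scope's own destructor would still stop the consumer, but only after the destructor body had already released `sink_`.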