userver: kafka::ConsumerScope Class Reference
Loading...
Searching...
No Matches
kafka::ConsumerScope Class Referencefinal

#include <userver/kafka/consumer_scope.hpp>

Detailed Description

RAII class that used as interface for Apache Kafka Consumer interaction and proper lifetime management.

Its main purpose is to stop the message polling in user component (that holds ConsumerScope) destructor, because ConsumerScope::Callback often captures this pointer on user component.

Common usage:

ConsumerHandler::ConsumerHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
: components::ComponentBase{config, context},
consumer_{context.FindComponent<kafka::ConsumerComponent>().GetConsumer()} {
consumer_.Start([this](kafka::MessageBatchView messages) {
Consume(messages);
consumer_.AsyncCommit();
});
}

Important implementation details

ConsumerScope holds reference to kafka::impl::Consumer that actually represents the Apache Kafka Balanced Consumer.

It exposes the API for asynchronous message batches processing that is polled from the subscribed topics partitions.

Consumer periodically polls the message batches from the subscribed topics partitions and invokes processing callback on each batch.

Also, Consumer maintains per topic statistics including the broker connection errors.

Note
Each ConsumerScope instance is not thread-safe. To speed up the topic messages processing, create more consumers with the same group_id.
See also
https://docs.confluent.io/platform/current/clients/consumer.html for basic consumer concepts
https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62 for understanding of balanced consumer groups
Warning
ConsumerScope::Start and ConsumerScope::Stop maybe called multiple times, but only in "start-stop" order and NOT concurrently.
Note
Must be placed as one of the last fields in the consumer component. Make sure to add a comment before the field:
// Subscription must be the last field! Add new fields above this comment.
Examples
samples/kafka_service/src/consumer_handler.hpp.

Definition at line 67 of file consumer_scope.hpp.

Public Types

using Callback = std::function<void(MessageBatchView)>
 Callback that is invoked on each polled message batch.
 

Public Member Functions

 ~ConsumerScope ()
 Stops the consumer (if not yet stopped).
 
 ConsumerScope (ConsumerScope &&) noexcept=delete
 
ConsumerScopeoperator= (ConsumerScope &&) noexcept=delete
 
void Start (Callback callback)
 Subscribes for configured topics and starts the consumer polling process.
 
void Stop () noexcept
 Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.
 
void AsyncCommit ()
 Schedules the current assignment offsets commitment task. Intended to be called after each message batch processing cycle (but not necessarily).
 

Member Typedef Documentation

◆ Callback

using kafka::ConsumerScope::Callback = std::function<void(MessageBatchView)>

Callback that is invoked on each polled message batch.

Warning
If callback throws, it called over and over again with the batch with the same messages, until successful invocation. Though, user should consider idempotent message processing mechanism.

Definition at line 73 of file consumer_scope.hpp.

Member Function Documentation

◆ AsyncCommit()

void kafka::ConsumerScope::AsyncCommit ( )

Schedules the current assignment offsets commitment task. Intended to be called after each message batch processing cycle (but not necessarily).

Warning
Commit does not ensure that messages do not come again – they do not come again also without the commit within the same process. Commit, indeed, restricts other consumers in consumers group from reading messages already processed (committed) by the current consumer if current has stopped and leaved the group

◆ Start()

void kafka::ConsumerScope::Start ( Callback callback)

Subscribes for configured topics and starts the consumer polling process.

Note
If callback throws an exception, entire message batch (also with successfully processed messages) come again, until callback succeeds
Warning
Each callback duration must not exceed the max_callback_duration time. Otherwise, consumer may stop consuming the message for unpredictable amount of time.

◆ Stop()

void kafka::ConsumerScope::Stop ( )
noexcept

Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.

Called in the destructor of ConsumerScope automatically.

Can be called in the beginning of your destructor if some other actions in that destructor prevent the callback from functioning correctly.

After ConsumerScope::Stop call, subscribed topics partitions are distributed between other consumers with the same group_id.

Warning
Blocks until all kafka::Message destroyed (e.g. consumer cannot be stopped until user-callback is executing).

The documentation for this class was generated from the following file: