userver: kafka::ConsumerScope Class Reference
Loading...
Searching...
No Matches
kafka::ConsumerScope Class Referencefinal

#include <userver/kafka/consumer_scope.hpp>

Detailed Description

RAII class that used as interface for Apache Kafka Consumer interaction and proper lifetime management.

Its main purpose is to stop the message polling in user component (that holds ConsumerScope) destructor, because ConsumerScope::Callback often captures this pointer on user component.

Common usage:

HandlerKafkaConsumer::HandlerKafkaConsumer(
: server::handlers::HttpHandlerJsonBase(config, context),
consumer_(
context.FindComponent<kafka::ConsumerComponent>("kafka-consumer")
.GetConsumer()) {
consumer_.Start([this](kafka::MessageBatchView messages) {
Consume(messages);
consumer_.AsyncCommit();
});
}

Important implementation details

ConsumerScope holds reference to impl::Consumer that actually represents the Apache Kafka Balanced Consumer.

It exposes the API for asynchronous message batches processing that is polled from the subscribed topics partitions.

Consumer periodically polls the message batches from the subscribed topics partitions and invokes processing callback on each batch.

Also, Consumer maintains the per topic statistics including the broker connection errors.

Note
Each ConsumerScope instance is not thread-safe. To speed up the topic messages processing, create more consumers with the same group_id.
See also
https://docs.confluent.io/platform/current/clients/consumer.html for basic consumer concepts
https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62 for understanding of balanced consumer groups
Warning
ConsumerScope::Start and ConsumerScope::Stop maybe called multiple times, but only in "start-stop" order and NOT concurrently
Note
Must be placed as one of the last fields in the consumer component. Make sure to add a comment before the field:
// Subscription must be the last field! Add new fields above this comment.

Definition at line 66 of file consumer_scope.hpp.

Public Types

using Callback = std::function<void(MessageBatchView)>
 Callback that invokes on each polled message batch.
 

Public Member Functions

 ~ConsumerScope ()
 Stops the consumer (if not yet stopped).
 
 ConsumerScope (ConsumerScope &&) noexcept=delete
 
ConsumerScopeoperator= (ConsumerScope &&) noexcept=delete
 
void Start (Callback callback)
 Subscribes for configured topics and starts the consumer polling process.
 
void Stop () noexcept
 Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.
 
void AsyncCommit ()
 Schedules the current assignment offsets committment task. Intended to be called after each message batch processing cycle.
 

Member Typedef Documentation

◆ Callback

using kafka::ConsumerScope::Callback = std::function<void(MessageBatchView)>

Callback that invokes on each polled message batch.

Warning
If callback throws, it called over and over again with the batch with the same messages, until successfull invokation. Though, user should consider idempotent message processing mechanism

Definition at line 72 of file consumer_scope.hpp.

Member Function Documentation

◆ AsyncCommit()

void kafka::ConsumerScope::AsyncCommit ( )

Schedules the current assignment offsets committment task. Intended to be called after each message batch processing cycle.

Warning
Commit does not ensure that messages do not come again – they do not come again also without the commit within the same process. Commit, indeed, restricts other consumers in consumers group from reading messages already processed (committed) by the current consumer if current has stopped and leaved the group
Note
Instead of calling AsyncCommit manually, consider setting enable_auto_commit: true in the static config. But read Kafka documentation carefully before to understand what auto committment mechanism actually mean

◆ Start()

void kafka::ConsumerScope::Start ( Callback callback)

Subscribes for configured topics and starts the consumer polling process.

Note
If callback throws an exception, entire message batch (also with successfully processed messages) come again, until callback succeedes

◆ Stop()

void kafka::ConsumerScope::Stop ( )
noexcept

Revokes all topic partition consumer was subscribed on. Also closes the consumer, leaving the consumer balanced group.

Called in the destructor of ConsumerScope automatically.

Can be called in the beginning of your destructor if some other actions in that destructor prevent the callback from functioning correctly.

After Stop call, subscribed topics parititions are destributed between other consumers with the same group_id.

Warning
Blocks until all kafka::Message destroyed.

The documentation for this class was generated from the following file: