userver: userver/kafka/consumer_scope.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
consumer_scope.hpp
1#pragma once
2
3#include <functional>
4
5#include <userver/kafka/message.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace kafka {
10
11namespace impl {
12
13class Consumer;
14
15} // namespace impl
16
17// clang-format off
18
19/// @ingroup userver_clients
20///
21/// @brief RAII class that used as interface for Apache Kafka Consumer interaction
22/// and proper lifetime management.
23///
24/// Its main purpose is to stop the message polling in user
25/// component (that holds `ConsumerScope`) destructor, because `ConsumerScope::Callback`
26/// often captures `this` pointer on user component.
27///
28/// Common usage:
29///
30/// @snippet kafka/functional_tests/integrational_tests/kafka_service.cpp Kafka service sample - consumer usage
31///
32/// ## Important implementation details
33///
34/// `ConsumerScope` holds reference to `impl::Consumer` that actually
35/// represents the Apache Kafka Balanced Consumer.
36///
37/// It exposes the API for asynchronous message batches processing that is
38/// polled from the subscribed topics partitions.
39///
40/// Consumer periodically polls the message batches from the
41/// subscribed topics partitions and invokes processing callback on each batch.
42///
43/// Also, Consumer maintains the per topic statistics including the broker
44/// connection errors.
45///
46/// @note Each `ConsumerScope` instance is not thread-safe. To speed up the topic
47/// messages processing, create more consumers with the same `group_id`.
48///
49/// @see https://docs.confluent.io/platform/current/clients/consumer.html for
50/// basic consumer concepts
51/// @see
52/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62
53/// for understanding of balanced consumer groups
54/// @warning `ConsumerScope::Start` and `ConsumerScope::Stop` maybe called multiple
55/// times, but only in "start-stop" order and **NOT** concurrently
56///
57/// @note Must be placed as one of the last fields in the consumer component.
58/// Make sure to add a comment before the field:
59///
60/// @code
61/// // Subscription must be the last field! Add new fields above this comment.
62/// @endcode
63
64// clang-format on
65
66class ConsumerScope final {
67 public:
68 /// @brief Callback that invokes on each polled message batch.
69 /// @warning If callback throws, it called over and over again with the batch
70 /// with the same messages, until successfull invokation.
71 /// Though, user should consider idempotent message processing mechanism
72 using Callback = std::function<void(MessageBatchView)>;
73
74 /// @brief Stops the consumer (if not yet stopped).
76
77 ConsumerScope(ConsumerScope&&) noexcept = delete;
78 ConsumerScope& operator=(ConsumerScope&&) noexcept = delete;
79
80 /// @brief Subscribes for configured topics and starts the consumer polling
81 /// process.
82 /// @note If `callback` throws an exception, entire message batch (also
83 /// with successfully processed messages) come again, until callback succeedes
84 void Start(Callback callback);
85
86 /// @brief Revokes all topic partition consumer was subscribed on. Also closes
87 /// the consumer, leaving the consumer balanced group.
88 ///
89 /// Called in the destructor of ConsumerScope automatically.
90 ///
91 /// Can be called in the beginning of your destructor if some other
92 /// actions in that destructor prevent the callback from functioning
93 /// correctly.
94 ///
95 /// After `Stop` call, subscribed topics parititions are destributed
96 /// between other consumers with the same `group_id`.
97 ///
98 /// @warning Blocks until all `kafka::Message` destroyed.
99 void Stop() noexcept;
100
101 /// @brief Schedules the current assignment offsets committment task.
102 /// Intended to be called after each message batch processing cycle.
103 ///
104 /// @warning Commit does not ensure that messages do not come again --
105 /// they do not come again also without the commit within the same process.
106 /// Commit, indeed, restricts other consumers in consumers group from reading
107 /// messages already processed (committed) by the current consumer if current
108 /// has stopped and leaved the group
109 ///
110 /// @note Instead of calling `AsyncCommit` manually, consider setting
111 /// `enable_auto_commit: true` in the static config. But read Kafka
112 /// documentation carefully before to understand what auto committment
113 /// mechanism actually mean
114 void AsyncCommit();
115
116 private:
117 friend class impl::Consumer;
118
119 explicit ConsumerScope(impl::Consumer& consumer) noexcept;
120
121 impl::Consumer& consumer_;
122};
123
124} // namespace kafka
125
126USERVER_NAMESPACE_END