userver: userver/kafka/consumer_scope.hpp Source File
Loading...
Searching...
No Matches
consumer_scope.hpp
1#pragma once
2
#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <vector>

#include <userver/kafka/message.hpp>
#include <userver/kafka/offset_range.hpp>
#include <userver/kafka/rebalance_types.hpp>
#include <userver/utils/zstring_view.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace kafka {
13
namespace impl {
// Forward declaration only: ConsumerScope keeps an `impl::Consumer&` member,
// so the complete type is not required in this header.
class Consumer;
}  // namespace impl
19
20// clang-format off
21
22/// @ingroup userver_clients
23///
24/// @brief RAII class that used as interface for Apache Kafka Consumer interaction
25/// and proper lifetime management.
26///
27/// Its main purpose is to stop the message polling in user
28/// component (that holds ConsumerScope) destructor, because ConsumerScope::Callback
29/// often captures `this` pointer on user component.
30///
31/// Common usage:
32///
33/// @snippet samples/kafka_service/src/consumer_handler.cpp Kafka service sample - consumer usage
34///
35/// ## Important implementation details
36///
37/// ConsumerScope holds reference to kafka::impl::Consumer that actually
38/// represents the Apache Kafka Balanced Consumer.
39///
40/// It exposes the API for asynchronous message batches processing that is
41/// polled from the subscribed topics partitions.
42///
43/// Consumer periodically polls the message batches from the
44/// subscribed topics partitions and invokes processing callback on each batch.
45///
46/// Also, Consumer maintains per topic statistics including the broker
47/// connection errors.
48///
49/// @note Each ConsumerScope instance is not thread-safe. To speed up the topic
50/// messages processing, create more consumers with the same `group_id`.
51///
52/// @see https://docs.confluent.io/platform/current/clients/consumer.html for
53/// basic consumer concepts
54/// @see
55/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62
56/// for understanding of balanced consumer groups
57///
58/// @warning ConsumerScope::Start and ConsumerScope::Stop maybe called multiple
59/// times, but only in "start-stop" order and **NOT** concurrently.
60///
61/// @note Must be placed as one of the last fields in the consumer component.
62/// Make sure to add a comment before the field:
63///
64/// @code
65/// // Subscription must be the last field! Add new fields above this comment.
66/// @endcode
67
68// clang-format on
69
70class ConsumerScope final {
71public:
72 /// @brief Callback that is invoked on each polled message batch.
73 /// @warning If callback throws, it called over and over again with the batch
74 /// with the same messages, until successful invocation.
75 /// Though, user should consider idempotent message processing mechanism.
76 using Callback = std::function<void(MessageBatchView)>;
77
78 /// @brief Stops the consumer (if not yet stopped).
80
81 ConsumerScope(ConsumerScope&&) noexcept = delete;
82 ConsumerScope& operator=(ConsumerScope&&) noexcept = delete;
83
84 /// @brief Topics list consumer configured to subscribe.
85 const std::vector<std::string>& GetTopics() const;
86
87 /// @brief Subscribes for configured topics and starts the consumer polling
88 /// process.
89 /// @note If `callback` throws an exception, entire message batch (also
90 /// with successfully processed messages) come again, until callback succeeds
91 /// @warning Each callback duration must not exceed the
92 /// `max_callback_duration` time. Otherwise, consumer may stop consuming the
93 /// message for unpredictable amount of time.
94 void Start(Callback callback);
95
96 /// @brief Revokes all topic partition consumer was subscribed on. Also closes
97 /// the consumer, leaving the consumer balanced group.
98 ///
99 /// Called in the destructor of ConsumerScope automatically.
100 ///
101 /// Can be called in the beginning of your destructor if some other
102 /// actions in that destructor prevent the callback from functioning
103 /// correctly.
104 ///
105 /// After ConsumerScope::Stop call, subscribed topics partitions are
106 /// distributed between other consumers with the same `group_id`.
107 ///
108 /// @warning Blocks until all kafka::Message destroyed (e.g. consumer cannot
109 /// be stopped until user-callback is executing).
110 void Stop() noexcept;
111
112 /// @brief Schedules the current assignment offsets commitment task.
113 /// Intended to be called after each message batch processing cycle (but not
114 /// necessarily).
115 ///
116 /// @warning Commit does not ensure that messages do not come again --
117 /// they do not come again also without the commit within the same process.
118 /// Commit, indeed, restricts other consumers in consumers group from reading
119 /// messages already processed (committed) by the current consumer if current
120 /// has stopped and leaved the group
122
123 /// @brief Retrieves the lowest and highest offsets for the specified topic and partition.
124 /// @throws OffsetRangeException if offsets could not be retrieved
125 /// @throws OffsetRangeTimeoutException if `timeout` is set and is reached
126 /// @warning This is a blocking call
127 /// @param topic The name of the topic.
128 /// @param partition The partition number of the topic.
129 /// @param timeout The optional timeout for the operation.
130 /// @returns Lowest and highest offsets for the given topic and partition.
131 /// @see OffsetRange for more explanation
132 OffsetRange GetOffsetRange(
133 utils::zstring_view topic,
134 std::uint32_t partition,
135 std::optional<std::chrono::milliseconds> timeout = std::nullopt
136 ) const;
137
138 /// @brief Retrieves the partition IDs for the specified topic.
139 /// @throws GetMetadataException if failed to fetch metadata
140 /// @throws TopicNotFoundException if topic not found
141 /// @throws OffsetRangeTimeoutException if `timeout` is set and is reached
142 /// @warning This is a blocking call
143 /// @param topic The name of the topic.
144 /// @param timeout The optional timeout for the operation.
145 /// @returns A vector of partition IDs for the given topic.
146 std::vector<std::uint32_t>
147 GetPartitionIds(utils::zstring_view topic, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const;
148
149 /// @brief Sets the rebalance callback for a consumer.
150 /// @warning The rebalance callback must be set before calling ConsumerScope::Start or after calling
151 /// ConsumerScope::Stop. The callback must not throw exceptions; any thrown exceptions will be caught and logged
152 /// by the consumer implementation.
153 /// @note The callback is invoked after the assign or revoke event has been successfully processed.
154 void SetRebalanceCallback(ConsumerRebalanceCallback rebalance_callback);
155
156 /// @brief Resets the rebalance callback for a consumer.
157 /// @warning The rebalance callback must be set before calling ConsumerScope::Start or after calling
158 /// ConsumerScope::Stop. The callback must not throw exceptions; any thrown exceptions will be caught and logged
159 /// by the consumer implementation.
160 /// @note The callback is invoked after the assign or revoke event has been successfully processed.
162
163 /// @brief Seeks the specified topic partition to the given \b offset.
164 /// @throws TimeoutException if the operation times out.
165 /// @throws SeekException if an error occurs during the seek operation.
166 /// @throws SeekInvalidArgumentException if invalid \b timeout or \b offset is passed.
167 /// @warning This is a blocking call and should only be invoked after ConsumerScope::Start call and before
168 /// ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws
169 /// SeekException.
170 /// @warning Currently, it is required to call this from within the ConsumerRebalanceCallback.
171 /// @ref ConsumerScope::SetRebalanceCallback
172 /// @param topic The name of the topic.
173 /// @param partition_id The partition ID of the given topic.
174 /// @param offset The offset to seek to, must be <= std::int64_t::max() or SeekInvalidArgumentException occurs.
175 /// @param timeout The timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.
176 void Seek(
177 utils::zstring_view topic,
178 std::uint32_t partition_id,
179 std::uint64_t offset,
180 std::chrono::milliseconds timeout
181 ) const;
182
183 /// @brief Seeks the specified topic partition to the beginning.
184 /// @throws SeekException if an error occurs during the seek operation.
185 /// @throws SeekInvalidArgumentException if invalid \b timeout is passed.
186 /// @warning This is a blocking call and should only be invoked after ConsumerScope::Start call and before
187 /// ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws
188 /// SeekException.
189 /// @warning Currently, it is required to call this from within the ConsumerRebalanceCallback.
190 /// @ref ConsumerScope::SetRebalanceCallback
191 /// @param topic The name of the topic.
192 /// @param partition_id The partition ID of the given topic.
193 /// @param timeout The timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.
194 void SeekToBeginning(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout)
195 const;
196
197 /// @brief Seeks the specified topic partition to the end.
198 /// @throws SeekException if an error occurs during the seek operation.
199 /// @throws SeekInvalidArgumentException if invalid \b timeout is passed.
200 /// @warning This is a blocking call and should only be invoked after ConsumerScope::Start call and before
201 /// ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws
202 /// SeekException.
203 /// @warning Currently, it is required to call this from within the ConsumerRebalanceCallback.
204 /// @ref ConsumerScope::SetRebalanceCallback
205 /// @param topic The name of the topic.
206 /// @param partition_id The partition ID of the given topic.
207 /// @param timeout The timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.
208 void SeekToEnd(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const;
209
210private:
211 friend class impl::Consumer;
212
213 explicit ConsumerScope(impl::Consumer& consumer) noexcept;
214
215 impl::Consumer& consumer_;
216};
217
218} // namespace kafka
219
220USERVER_NAMESPACE_END