userver: userver/kafka/consumer_scope.hpp Source File
Loading...
Searching...
No Matches
consumer_scope.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/kafka/consumer_scope.hpp
4/// @brief @copybrief kafka::ConsumerScope
5
6#include <functional>
7
8#include <userver/kafka/message.hpp>
9#include <userver/kafka/offset_range.hpp>
10#include <userver/kafka/rebalance_types.hpp>
11#include <userver/utils/zstring_view.hpp>
12
13USERVER_NAMESPACE_BEGIN
14
15namespace kafka {
16
17namespace impl {
18
19class Consumer;  // forward declaration is sufficient: this header only stores a reference to it and befriends it
20
21} // namespace impl
22
23/// @ingroup userver_clients
24///
25/// @brief RAII class that is used as an interface for Apache Kafka Consumer interaction
26/// and proper lifetime management.
27///
28/// Its main purpose is to stop the message polling in the destructor of the user
29/// component (that holds ConsumerScope), because ConsumerScope::Callback
30/// often captures the `this` pointer to the user component.
31///
32/// Common usage:
33///
34/// @snippet samples/kafka_service/src/consumer_handler.cpp Kafka service sample - consumer usage
35///
36/// ## Important implementation details
37///
38/// ConsumerScope holds a reference to kafka::impl::Consumer that actually
39/// represents the Apache Kafka Balanced Consumer.
40///
41/// It exposes the API for asynchronous processing of message batches that are
42/// polled from the partitions of the subscribed topics.
43///
44/// Consumer periodically polls the message batches from the partitions of the
45/// subscribed topics and invokes the processing callback on each batch.
46///
47/// Also, Consumer maintains per-topic statistics including the broker
48/// connection errors.
49///
50/// @note Each ConsumerScope instance is not thread-safe. To speed up the topic
51/// messages processing, create more consumers with the same `group_id`.
52///
53/// @see https://docs.confluent.io/platform/current/clients/consumer.html for
54/// basic consumer concepts
55/// @see
56/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62
57/// for understanding of balanced consumer groups
58///
59/// @warning ConsumerScope::Start and ConsumerScope::Stop may be called multiple
60/// times, but only in "start-stop" order and **NOT** concurrently.
61///
62/// @note Must be placed as one of the last fields in the consumer component.
63/// Make sure to add a comment before the field:
64///
65/// @code
66/// // Subscription must be the last field! Add new fields above this comment.
67/// @endcode
68class ConsumerScope final {
69public:
70 /// @brief Callback that is invoked on each polled message batch.
71 /// @warning If the callback throws, it is called over and over again with a batch
72 /// of the same messages until an invocation succeeds.
73 /// Therefore, the user should consider an idempotent message processing mechanism.
74 using Callback = std::function<void(MessageBatchView)>;
75
76 /// @brief Stops the consumer (if not yet stopped).
78 // NOTE(review): the declaration documented by the comment above (presumably the destructor) is not visible in this listing -- confirm against the real header.
79 ConsumerScope(ConsumerScope&&) noexcept = delete;  // not movable; a user-declared move constructor also suppresses the implicit copy operations
80 ConsumerScope& operator=(ConsumerScope&&) noexcept = delete;  // not move-assignable
81
82 /// @brief List of topics the consumer is configured to subscribe to.
83 const std::vector<std::string>& GetTopics() const;
84
85 /// @brief Subscribes for configured topics and starts the consumer polling
86 /// process.
87 /// @note If `callback` throws an exception, the entire message batch (including
88 /// already successfully processed messages) is delivered again until the callback succeeds.
89 /// @warning Each callback duration must not exceed the
90 /// `max_callback_duration` time. Otherwise, the consumer may stop consuming
91 /// messages for an unpredictable amount of time.
92 void Start(Callback callback);
93
94 /// @brief Revokes all topic partitions the consumer was subscribed to. Also closes
95 /// the consumer, leaving the balanced consumer group.
96 ///
97 /// Called in the destructor of ConsumerScope automatically.
98 ///
99 /// Can be called in the beginning of your destructor if some other
100 /// actions in that destructor prevent the callback from functioning
101 /// correctly.
102 ///
103 /// After ConsumerScope::Stop call, subscribed topics partitions are
104 /// distributed between other consumers with the same `group_id`.
105 ///
106 /// @warning Blocks until all kafka::Message instances are destroyed (e.g. the consumer
107 /// cannot be stopped while the user callback is executing).
108 void Stop() noexcept;
109
110 /// @brief Schedules the current assignment offsets commitment task.
111 /// Intended to be called after each message batch processing cycle (but not
112 /// necessarily).
113 ///
114 /// @warning Commit does not ensure that messages do not come again --
115 /// they do not come again also without the commit within the same process.
116 /// Commit, indeed, restricts other consumers in the consumer group from reading
117 /// messages already processed (committed) by the current consumer if the current
118 /// consumer has stopped and left the group.
120 // NOTE(review): the declaration documented by the comment above (the offset-commit method) is not visible in this listing -- confirm against the real header.
121 /// @brief Retrieves the lowest and highest offsets for the specified topic and partition.
122 /// @throws OffsetRangeException if offsets could not be retrieved
123 /// @throws OffsetRangeTimeoutException if `timeout` is set and is reached
124 /// @warning This is a blocking call
125 /// @param topic The name of the topic.
126 /// @param partition The partition number of the topic.
127 /// @param timeout The optional timeout for the operation.
128 /// @returns Lowest and highest offsets for the given topic and partition.
129 /// @see OffsetRange for more explanation
130 OffsetRange GetOffsetRange(
131 utils::zstring_view topic,
132 std::uint32_t partition,
133 std::optional<std::chrono::milliseconds> timeout = std::nullopt
134 ) const;
135
136 /// @brief Retrieves the partition IDs for the specified topic.
137 /// @throws GetMetadataException if failed to fetch metadata
138 /// @throws TopicNotFoundException if topic not found
139 /// @throws OffsetRangeTimeoutException if `timeout` is set and is reached
140 /// @warning This is a blocking call
141 /// @param topic The name of the topic.
142 /// @param timeout The optional timeout for the operation.
143 /// @returns A vector of partition IDs for the given topic.
144 std::vector<std::uint32_t> GetPartitionIds(
145 utils::zstring_view topic,
146 std::optional<std::chrono::milliseconds> timeout = std::nullopt
147 ) const;
148
149 /// @brief Sets the rebalance callback for a consumer.
150 /// @warning The rebalance callback must be set before calling ConsumerScope::Start or after calling
151 /// ConsumerScope::Stop. The callback must not throw exceptions; any thrown exceptions will be caught and logged
152 /// by the consumer implementation.
153 /// @note The callback is invoked after the assign or revoke event has been successfully processed.
154 void SetRebalanceCallback(ConsumerRebalanceCallback rebalance_callback);
155
156 /// @brief Resets the rebalance callback for a consumer.
157 /// @warning The rebalance callback must be reset before calling ConsumerScope::Start or after calling
158 /// ConsumerScope::Stop. The callback must not throw exceptions; any thrown exceptions will be caught and logged
159 /// by the consumer implementation.
160 /// @note The callback is invoked after the assign or revoke event has been successfully processed.
162 // NOTE(review): the declaration documented by the comment above (the rebalance-callback reset method) is not visible in this listing -- confirm against the real header.
163 /// @brief Seeks the specified topic partition to the given \b offset.
164 /// @throws TimeoutException if the operation times out.
165 /// @throws SeekException if an error occurs during the seek operation.
166 /// @throws SeekInvalidArgumentException if invalid \b timeout or \b offset is passed.
167 /// @warning This is a blocking call and should only be invoked after ConsumerScope::Start call and before
168 /// ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws
169 /// SeekException.
170 /// @warning Currently, it is required to call this from within the ConsumerRebalanceCallback.
171 /// @ref ConsumerScope::SetRebalanceCallback
172 /// @param topic The name of the topic.
173 /// @param partition_id The partition ID of the given topic.
174 /// @param offset The offset to seek to, must be <= std::numeric_limits<std::int64_t>::max() or SeekInvalidArgumentException occurs.
175 /// @param timeout The timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.
176 void Seek(
177 utils::zstring_view topic,
178 std::uint32_t partition_id,
179 std::uint64_t offset,
180 std::chrono::milliseconds timeout
181 ) const;
182
183 /// @brief Seeks the specified topic partition to the beginning.
184 /// @throws SeekException if an error occurs during the seek operation.
185 /// @throws SeekInvalidArgumentException if invalid \b timeout is passed.
186 /// @warning This is a blocking call and should only be invoked after ConsumerScope::Start call and before
187 /// ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws
188 /// SeekException.
189 /// @warning Currently, it is required to call this from within the ConsumerRebalanceCallback.
190 /// @ref ConsumerScope::SetRebalanceCallback
191 /// @param topic The name of the topic.
192 /// @param partition_id The partition ID of the given topic.
193 /// @param timeout The timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.
194 void SeekToBeginning(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout)
195 const;
196
197 /// @brief Seeks the specified topic partition to the end.
198 /// @throws SeekException if an error occurs during the seek operation.
199 /// @throws SeekInvalidArgumentException if invalid \b timeout is passed.
200 /// @warning This is a blocking call and should only be invoked after ConsumerScope::Start call and before
201 /// ConsumerScope::Stop call. It works only when the consumer has assigned partitions; otherwise, it throws
202 /// SeekException.
203 /// @warning Currently, it is required to call this from within the ConsumerRebalanceCallback.
204 /// @ref ConsumerScope::SetRebalanceCallback
205 /// @param topic The name of the topic.
206 /// @param partition_id The partition ID of the given topic.
207 /// @param timeout The timeout duration for the operation, must be > 0 or SeekInvalidArgumentException occurs.
208 void SeekToEnd(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const;
209
210private:
211 friend class impl::Consumer;  // grants impl::Consumer access to the private constructor below
212
213 explicit ConsumerScope(impl::Consumer& consumer) noexcept;  // private: not constructible by arbitrary user code
214
215 impl::Consumer& consumer_;  // reference to the underlying consumer; this class does not own it
216};
217
218} // namespace kafka
219
220USERVER_NAMESPACE_END