userver: userver/kafka/consumer_scope.hpp Source File
Loading...
Searching...
No Matches
consumer_scope.hpp
1#pragma once
2
3#include <functional>
4
5#include <userver/kafka/message.hpp>
6#include <userver/kafka/offset_range.hpp>
7
8USERVER_NAMESPACE_BEGIN
9
10namespace kafka {
11
12namespace impl {
13
14class Consumer;
15
16} // namespace impl
17
18// clang-format off
19
20/// @ingroup userver_clients
21///
22/// @brief RAII class that used as interface for Apache Kafka Consumer interaction
23/// and proper lifetime management.
24///
25/// Its main purpose is to stop the message polling in user
26/// component (that holds ConsumerScope) destructor, because ConsumerScope::Callback
27/// often captures `this` pointer on user component.
28///
29/// Common usage:
30///
31/// @snippet samples/kafka_service/src/consumer_handler.cpp Kafka service sample - consumer usage
32///
33/// ## Important implementation details
34///
35/// ConsumerScope holds reference to kafka::impl::Consumer that actually
36/// represents the Apache Kafka Balanced Consumer.
37///
38/// It exposes the API for asynchronous message batches processing that is
39/// polled from the subscribed topics partitions.
40///
41/// Consumer periodically polls the message batches from the
42/// subscribed topics partitions and invokes processing callback on each batch.
43///
44/// Also, Consumer maintains per topic statistics including the broker
45/// connection errors.
46///
47/// @note Each ConsumerScope instance is not thread-safe. To speed up the topic
48/// messages processing, create more consumers with the same `group_id`.
49///
50/// @see https://docs.confluent.io/platform/current/clients/consumer.html for
51/// basic consumer concepts
52/// @see
53/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62
54/// for understanding of balanced consumer groups
55///
56/// @warning ConsumerScope::Start and ConsumerScope::Stop maybe called multiple
57/// times, but only in "start-stop" order and **NOT** concurrently.
58///
59/// @note Must be placed as one of the last fields in the consumer component.
60/// Make sure to add a comment before the field:
61///
62/// @code
63/// // Subscription must be the last field! Add new fields above this comment.
64/// @endcode
65
66// clang-format on
67
68class ConsumerScope final {
69public:
70 /// @brief Callback that is invoked on each polled message batch.
71 /// @warning If callback throws, it called over and over again with the batch
72 /// with the same messages, until successful invocation.
73 /// Though, user should consider idempotent message processing mechanism.
74 using Callback = std::function<void(MessageBatchView)>;
75
76 /// @brief Stops the consumer (if not yet stopped).
78
79 ConsumerScope(ConsumerScope&&) noexcept = delete;
80 ConsumerScope& operator=(ConsumerScope&&) noexcept = delete;
81
82 /// @brief Subscribes for configured topics and starts the consumer polling
83 /// process.
84 /// @note If `callback` throws an exception, entire message batch (also
85 /// with successfully processed messages) come again, until callback succeeds
86 /// @warning Each callback duration must not exceed the
87 /// `max_callback_duration` time. Otherwise, consumer may stop consuming the
88 /// message for unpredictable amount of time.
89 void Start(Callback callback);
90
91 /// @brief Revokes all topic partition consumer was subscribed on. Also closes
92 /// the consumer, leaving the consumer balanced group.
93 ///
94 /// Called in the destructor of ConsumerScope automatically.
95 ///
96 /// Can be called in the beginning of your destructor if some other
97 /// actions in that destructor prevent the callback from functioning
98 /// correctly.
99 ///
100 /// After ConsumerScope::Stop call, subscribed topics partitions are
101 /// distributed between other consumers with the same `group_id`.
102 ///
103 /// @warning Blocks until all kafka::Message destroyed (e.g. consumer cannot
104 /// be stopped until user-callback is executing).
105 void Stop() noexcept;
106
107 /// @brief Schedules the current assignment offsets commitment task.
108 /// Intended to be called after each message batch processing cycle (but not
109 /// necessarily).
110 ///
111 /// @warning Commit does not ensure that messages do not come again --
112 /// they do not come again also without the commit within the same process.
113 /// Commit, indeed, restricts other consumers in consumers group from reading
114 /// messages already processed (committed) by the current consumer if current
115 /// has stopped and leaved the group
116 void AsyncCommit();
117
118 /// @brief Retrieves the lowest and highest offsets for the specified topic and partition.
119 /// @throws OffsetRangeException if offsets could not be retrieved
120 /// @warning This is a blocking call
121 /// @param topic The name of the topic.
122 /// @param partition The partition number of the topic.
123 /// @returns Lowest and highest offsets for the given topic and partition.
124 /// @see OffsetRange for more explanation
125 OffsetRange GetOffsetRange(
126 const std::string& topic,
127 std::uint32_t partition,
128 std::optional<std::chrono::milliseconds> timeout = std::nullopt
129 ) const;
130
131 /// @brief Retrieves the partition IDs for the specified topic.
132 /// @throws GetMetadataException if failed to fetch metadata
133 /// @throws TopicNotFoundException if topic not found
134 /// @warning This is a blocking call
135 /// @param topic The name of the topic.
136 /// @returns A vector of partition IDs for the given topic.
139
140private:
141 friend class impl::Consumer;
142
143 explicit ConsumerScope(impl::Consumer& consumer) noexcept;
144
145 impl::Consumer& consumer_;
146};
147
148} // namespace kafka
149
150USERVER_NAMESPACE_END