userver: userver/kafka/consumer_scope.hpp Source File
Loading...
Searching...
No Matches
consumer_scope.hpp
1#pragma once
2
3#include <functional>
4
5#include <userver/kafka/message.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace kafka {
10
11namespace impl {
12
13class Consumer;  // Forward declaration only: this header names impl::Consumer just as a friend and by reference.
14
15} // namespace impl
16
17// clang-format off
18
19/// @ingroup userver_clients
20///
21/// @brief RAII class that is used as an interface for Apache Kafka Consumer
22/// interaction and proper lifetime management.
23///
24/// Its main purpose is to stop the message polling in the destructor of the
25/// user component (that holds ConsumerScope), because ConsumerScope::Callback
26/// often captures the `this` pointer of the user component.
27///
28/// Common usage:
29///
30/// @snippet samples/kafka_service/src/consumer_handler.cpp Kafka service sample - consumer usage
31///
32/// ## Important implementation details
33///
34/// ConsumerScope holds a reference to kafka::impl::Consumer that actually
35/// represents the Apache Kafka Balanced Consumer.
36///
37/// It exposes the API for asynchronous processing of message batches that are
38/// polled from the subscribed topics partitions.
39///
40/// Consumer periodically polls the message batches from the
41/// subscribed topics partitions and invokes processing callback on each batch.
42///
43/// Also, Consumer maintains per topic statistics including the broker
44/// connection errors.
45///
46/// @note Each ConsumerScope instance is not thread-safe. To speed up the topic
47/// messages processing, create more consumers with the same `group_id`.
48///
49/// @see https://docs.confluent.io/platform/current/clients/consumer.html for
50/// basic consumer concepts
51/// @see
52/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62
53/// for understanding of balanced consumer groups
54///
55/// @warning ConsumerScope::Start and ConsumerScope::Stop may be called multiple
56/// times, but only in "start-stop" order and **NOT** concurrently.
57///
58/// @note Must be placed as one of the last fields in the consumer component.
59/// Make sure to add a comment before the field:
60///
61/// @code
62/// // Subscription must be the last field! Add new fields above this comment.
63/// @endcode
64
65// clang-format on
66
67class ConsumerScope final {
68public:
69 /// @brief Callback that is invoked on each polled message batch.
70 /// @warning If callback throws, it called over and over again with the batch
71 /// with the same messages, until successful invocation.
72 /// Though, user should consider idempotent message processing mechanism.
73 using Callback = std::function<void(MessageBatchView)>;
74
75 /// @brief Stops the consumer (if not yet stopped).
77
78 ConsumerScope(ConsumerScope&&) noexcept = delete;
79 ConsumerScope& operator=(ConsumerScope&&) noexcept = delete;
80
81 /// @brief Subscribes for configured topics and starts the consumer polling
82 /// process.
83 /// @note If `callback` throws an exception, entire message batch (also
84 /// with successfully processed messages) come again, until callback succeeds
85 /// @warning Each callback duration must not exceed the
86 /// `max_callback_duration` time. Otherwise, consumer may stop consuming the
87 /// message for unpredictable amount of time.
88 void Start(Callback callback);
89
90 /// @brief Revokes all topic partition consumer was subscribed on. Also closes
91 /// the consumer, leaving the consumer balanced group.
92 ///
93 /// Called in the destructor of ConsumerScope automatically.
94 ///
95 /// Can be called in the beginning of your destructor if some other
96 /// actions in that destructor prevent the callback from functioning
97 /// correctly.
98 ///
99 /// After ConsumerScope::Stop call, subscribed topics partitions are
100 /// distributed between other consumers with the same `group_id`.
101 ///
102 /// @warning Blocks until all kafka::Message destroyed (e.g. consumer cannot
103 /// be stopped until user-callback is executing).
104 void Stop() noexcept;
105
106 /// @brief Schedules the current assignment offsets commitment task.
107 /// Intended to be called after each message batch processing cycle (but not
108 /// necessarily).
109 ///
110 /// @warning Commit does not ensure that messages do not come again --
111 /// they do not come again also without the commit within the same process.
112 /// Commit, indeed, restricts other consumers in consumers group from reading
113 /// messages already processed (committed) by the current consumer if current
114 /// has stopped and leaved the group
115 void AsyncCommit();
116
117private:
118 friend class impl::Consumer;
119
120 explicit ConsumerScope(impl::Consumer& consumer) noexcept;
121
122 impl::Consumer& consumer_;
123};
124
125} // namespace kafka
126
127USERVER_NAMESPACE_END