userver: userver/kafka/consumer_scope.hpp Source File
Loading...
Searching...
No Matches
consumer_scope.hpp
1#pragma once
2
3#include <functional>
4
5#include <userver/kafka/message.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace kafka {
10
11namespace impl {
12
13class Consumer;
14
15} // namespace impl
16
17// clang-format off
18
19/// @ingroup userver_clients
20///
21/// @brief RAII class that used as interface for Apache Kafka Consumer interaction
22/// and proper lifetime management.
23///
24/// Its main purpose is to stop the message polling in user
25/// component (that holds `ConsumerScope`) destructor, because `ConsumerScope::Callback`
26/// often captures `this` pointer on user component.
27///
28/// Common usage:
29///
30/// @snippet kafka/functional_tests/integrational_tests/kafka_service.cpp Kafka service sample - consumer usage
31///
32/// ## Important implementation details
33///
34/// `ConsumerScope` holds reference to `impl::Consumer` that actually
35/// represents the Apache Kafka Balanced Consumer.
36///
37/// It exposes the API for asynchronous message batches processing that is
38/// polled from the subscribed topics partitions.
39///
40/// Consumer periodically polls the message batches from the
41/// subscribed topics partitions and invokes processing callback on each batch.
42///
43/// Also, Consumer maintains the per topic statistics including the broker
44/// connection errors.
45///
46/// @note Each `ConsumerScope` instance is not thread-safe. To speed up the topic
47/// messages processing, create more consumers with the same `group_id`.
48///
49/// @see https://docs.confluent.io/platform/current/clients/consumer.html for
50/// basic consumer concepts
51/// @see
52/// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md62
53/// for understanding of balanced consumer groups
54/// @warning `ConsumerScope::Start` and `ConsumerScope::Stop` maybe called multiple
55/// times, but only in "start-stop" order and **NOT** concurrently
56///
57/// @note Must be placed as one of the last fields in the consumer component.
58/// Make sure to add a comment before the field:
59///
60/// @code
61/// // Subscription must be the last field! Add new fields above this comment.
62/// @endcode
63
64// clang-format on
65
66class ConsumerScope final {
67 public:
68 /// @brief Callback that invokes on each polled message batch.
69 /// @warning If callback throws, it called over and over again with the batch
70 /// with the same messages, until successfull invokation.
71 /// Though, user should consider idempotent message processing mechanism
72 using Callback = std::function<void(MessageBatchView)>;
73
74 /// @brief Stops the consumer (if not yet stopped).
76
77 ConsumerScope(ConsumerScope&&) noexcept = delete;
78 ConsumerScope& operator=(ConsumerScope&&) noexcept = delete;
79
80 /// @brief Subscribes for configured topics and starts the consumer polling
81 /// process.
82 /// @note If `callback` throws an exception, entire message batch (also
83 /// with successfully processed messages) come again, until callback succeedes
84 void Start(Callback callback);
85
86 /// @brief Revokes all topic partition consumer was subscribed on. Also closes
87 /// the consumer, leaving the consumer balanced group.
88 ///
89 /// Called in the destructor of ConsumerScope automatically.
90 ///
91 /// Can be called in the beginning of your destructor if some other
92 /// actions in that destructor prevent the callback from functioning
93 /// correctly.
94 ///
95 /// After `Stop` call, subscribed topics parititions are destributed
96 /// between other consumers with the same `group_id`.
97 ///
98 /// @warning Blocks until all `kafka::Message` destroyed.
99 void Stop() noexcept;
100
101 /// @brief Schedules the current assignment offsets committment task.
102 /// Intended to be called after each message batch processing cycle.
103 ///
104 /// @warning Commit does not ensure that messages do not come again --
105 /// they do not come again also without the commit within the same process.
106 /// Commit, indeed, restricts other consumers in consumers group from reading
107 /// messages already processed (committed) by the current consumer if current
108 /// has stopped and leaved the group
109 ///
110 /// @note Instead of calling `AsyncCommit` manually, consider setting
111 /// `enable_auto_commit: true` in the static config. But read Kafka
112 /// documentation carefully before to understand what auto committment
113 /// mechanism actually mean
114 void AsyncCommit();
115
116 private:
117 friend class impl::Consumer;
118
119 explicit ConsumerScope(impl::Consumer& consumer) noexcept;
120
121 impl::Consumer& consumer_;
122};
123
124} // namespace kafka
125
126USERVER_NAMESPACE_END