userver: userver/kafka/producer.hpp Source File
Loading...
Searching...
No Matches
producer.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/kafka/producer.hpp
4/// @brief @copybrief kafka::Producer
5
6#include <cstdint>
7#include <type_traits>
8#include <utility>
9
10#include <userver/engine/task/task_processor_fwd.hpp>
11#include <userver/engine/task/task_with_result.hpp>
12#include <userver/kafka/exceptions.hpp>
13#include <userver/kafka/headers.hpp>
14#include <userver/kafka/impl/messages.hpp>
15#include <userver/utils/fast_pimpl.hpp>
16#include <userver/utils/statistics/writer.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace kafka {
21
/// @brief Unassigned partition.
///
/// The unassigned partition is used by the producer API for messages
/// that should be partitioned using the default partitioner.
///
/// It is the default value of the `partition` argument of kafka::Producer
/// send methods, i.e. what is used when the caller does not pass a partition.
///
/// @note By default partitions are distributed uniformly.
/// @note This is only a declaration (`extern`); the constant is defined elsewhere.
extern const std::optional<std::uint32_t> kUnassignedPartition;
29
namespace impl {

// Forward declarations only: the public producer header refers to these
// implementation types by reference, pointer or as pimpl payload, so their
// full definitions are intentionally not pulled in here.

// Configuration / credentials passed to the producer constructor.
struct ProducerConfiguration;
struct Secret;

// Internal machinery.
class Configuration;
class ProducerImpl;
class HeadersHolder;

}  // namespace impl
41
42/// @ingroup userver_clients
43///
44/// @brief Apache Kafka Producer Client.
45///
46/// It exposes the API to send an arbitrary message to the Kafka Broker to
47/// certain topic by given key and optional partition.
48///
49/// ## Important implementation details
50///
51/// Message send tasks handling other messages' delivery reports, suspending
52/// their execution while no events exist.
53/// This makes message production parallel and leads to high Producer
54/// scalability.
55///
56/// Producer maintains per topic statistics including the broker
57/// connection errors.
58///
59/// @remark Destructor may wait for no more than a 2 x `delivery_timeout` to
60/// ensure all sent messages are properly delivered.
61///
62/// @see https://docs.confluent.io/platform/current/clients/producer.html
63class Producer final {
64public:
65 /// @cond
66 // @param producer_task_processor is task processor where producer creates
67 // tasks for message delivery scheduling and waiting.
68 Producer(
69 const std::string& name,
70 engine::TaskProcessor& producer_task_processor,
71 const impl::ProducerConfiguration& configuration,
72 const impl::Secret& secrets
73 );
74 /// @endcond
75
76 /// @brief Waits until all messages are sent for at most 2 x
77 /// `delivery_timeout` and destroys the producer.
78 ///
79 /// @remark In a basic producer use cases, the destructor returns immediately.
81
82 Producer(const Producer&) = delete;
83 Producer(Producer&&) = delete;
84
85 Producer& operator=(const Producer&) = delete;
86 Producer& operator=(Producer&&) = delete;
87
88 /// @brief Sends given message to topic `topic_name` by given `key`
89 /// and `partition` (if passed) with payload contains the `message`
90 /// data. Asynchronously waits until the message is delivered or the delivery error occurred.
91 ///
92 /// No payload data is copied. Method holds the data until message is delivered.
93 ///
94 /// Thread-safe and can be called from any number of threads concurrently.
95 ///
96 /// If `partition` not passed, partition is chosen by internal Kafka partitioner.
97 ///
98 /// @warning if `enable_idempotence` option is enabled, do not use both explicit partitions and Kafka-chosen ones.
99 ///
100 /// @throws SendException and its descendants if message is not delivered
101 /// and acked by Kafka Broker in configured timeout.
102 ///
103 /// @note Use SendException::IsRetryable method to understand whether there is a sense to retry the message sending.
104 /// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
105 void Send(
106 utils::zstring_view topic_name,
107 std::string_view key,
108 std::string_view message,
109 std::optional<std::uint32_t> partition = kUnassignedPartition,
110 HeaderViews headers = {}
111 ) const;
112
113 /// @brief Sends given messages to topic `topic_name` by given `key`
114 /// and `partition` (if passed) with payload contains the `messages`
115 /// data. Asynchronously waits until the messages are delivered or the delivery error occurred.
116 ///
117 /// No payload data is copied. Method holds the data until messages are delivered.
118 ///
119 /// Thread-safe and can be called from any number of threads concurrently.
120 ///
121 /// If `partition` not passed, partition is chosen by internal Kafka partitioner.
122 ///
123 /// @warning if `enable_idempotence` option is enabled, do not use both explicit partitions and Kafka-chosen ones.
124 ///
125 /// @throws BulkSendException if some messages was not delivered
126 /// and acked by Kafka Broker in configured timeout.
127 ///
128 /// @note Use BulkSendException::GetExceptions method to get a list of occured nested exceptions.
129 template <typename Messages>
130 requires(
131 std::is_convertible_v<decltype(std::declval<const Messages&>()[0]), std::string_view> &&
132 std::is_integral_v<decltype(std::declval<const Messages&>().size())>
133 )
134 void Send(
135 utils::zstring_view topic_name,
136 std::string_view key,
137 const Messages& messages,
138 std::optional<std::uint32_t> partition = kUnassignedPartition,
139 HeaderViews headers = {}
140 ) const {
141 SendWrapper(topic_name, key, impl::MessagesAdapter{messages}, partition, std::move(headers));
142 }
143
144 /// @brief Same as Producer::Send, but returns the task which can be
145 /// used to wait the message delivery manually.
146 ///
147 /// @warning If user schedules a batch of send requests with Producer::SendAsync, some send
148 /// requests may be retried by the library (for instance, in case of network
149 /// blink). Though, the order messages are written to partition may differ
150 /// from the order messages are initially sent
151 /// @snippet kafka/tests/producer_kafkatest.cpp Producer batch send async
152 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
153 std::string topic_name,
154 std::string key,
155 std::string message,
156 std::optional<std::uint32_t> partition = kUnassignedPartition,
157 HeaderViews headers = {}
158 ) const;
159
160 /// @brief Dumps per topic messages produce statistics. No expected to be
161 /// called manually.
162 /// @see kafka/impl/stats.hpp
163 void DumpMetric(utils::statistics::Writer& writer) const;
164
165private:
166 void SendImpl(
167 utils::zstring_view topic_name,
168 std::string_view key,
169 std::string_view message,
170 std::optional<std::uint32_t> partition,
171 impl::HeadersHolder&& headers_holder
172 ) const;
173
174 void SendImpl(
175 utils::zstring_view topic_name,
176 std::string_view key,
177 const impl::Messages& messages,
178 std::optional<std::uint32_t> partition,
179 std::vector<impl::HeadersHolder>&& headers_holders
180 ) const;
181
182 void SendWrapper(
183 utils::zstring_view topic_name,
184 std::string_view key,
185 const impl::Messages& messages,
186 std::optional<std::uint32_t> partition,
187 HeaderViews headers
188 ) const;
189
190 const std::string name_;
191 engine::TaskProcessor& producer_task_processor_;
192
193 static constexpr std::size_t kImplSize{944};
194 static constexpr std::size_t kImplAlign{16};
195 utils::FastPimpl<impl::ProducerImpl, kImplSize, kImplAlign> producer_;
196};
197
198} // namespace kafka
199
200USERVER_NAMESPACE_END