userver: userver/kafka/producer.hpp Source File
Loading...
Searching...
No Matches
producer.hpp
1#pragma once
2
3#include <cstdint>
4#include <type_traits>
5#include <utility>
6
7#include <userver/engine/task/task_processor_fwd.hpp>
8#include <userver/engine/task/task_with_result.hpp>
9#include <userver/kafka/exceptions.hpp>
10#include <userver/kafka/headers.hpp>
11#include <userver/kafka/impl/messages.hpp>
12#include <userver/utils/fast_pimpl.hpp>
13#include <userver/utils/statistics/writer.hpp>
14
15USERVER_NAMESPACE_BEGIN
16
17namespace kafka {
18
19/// @brief Unassigned partition.
20///
21/// The unassigned partition is used by the producer API for messages
22/// that should be partitioned using the default partitioner.
/// It is the default value of the `partition` argument of the Producer send
/// methods declared below.
23///
24/// @note By default, partitions are distributed uniformly.
// NOTE(review): `<optional>` is not among this header's direct includes;
// presumably it arrives transitively through a userver header — confirm.
25extern const std::optional<std::uint32_t> kUnassignedPartition;
26
// Forward declarations of implementation-detail types; none of them is
// defined in the visible part of this header.
27namespace impl {
28
29class Configuration;
30class ProducerImpl;
31
32struct ProducerConfiguration;
33struct Secret;
34
35class HeadersHolder;
36
37} // namespace impl
38
39/// @ingroup userver_clients
40///
41/// @brief Apache Kafka Producer Client.
42///
43/// It exposes the API to send an arbitrary message to the Kafka Broker, to a
44/// certain topic, by the given key and optional partition.
45///
46/// ## Important implementation details
47///
48/// Message send tasks handle other messages' delivery reports, suspending
49/// their execution while no events exist.
50/// This makes message production parallel and leads to high Producer
51/// scalability.
52///
53/// The Producer maintains per-topic statistics, including broker
54/// connection errors.
55///
56/// @remark The destructor may wait for no more than 2 x `delivery_timeout` to
57/// ensure all sent messages are properly delivered.
58///
59/// @see https://docs.confluent.io/platform/current/clients/producer.html
60class Producer final {
61public:
62 /// @cond
63 // @param producer_task_processor is the task processor where the producer
64 // creates tasks for message delivery scheduling and waiting.
65 Producer(
66 const std::string& name,
67 engine::TaskProcessor& producer_task_processor,
68 const impl::ProducerConfiguration& configuration,
69 const impl::Secret& secrets
70 );
71 /// @endcond
72
73 /// @brief Waits until all messages are sent for at most 2 x
74 /// `delivery_timeout` and destroys the producer.
75 ///
76 /// @remark In basic producer use cases, the destructor returns immediately.
 // NOTE(review): listing line 77 is absent from this extract; the destructor
 // declaration documented above presumably belongs there — confirm against
 // the original header.
78
79 Producer(const Producer&) = delete;
80 Producer(Producer&&) = delete;
81
82 Producer& operator=(const Producer&) = delete;
83 Producer& operator=(Producer&&) = delete;
84
85 /// @brief Sends the given message to topic `topic_name` by the given `key`
86 /// and `partition` (if passed), with a payload containing the `message`
87 /// data. Asynchronously waits until the message is delivered or a delivery error occurs.
88 ///
89 /// No payload data is copied. The method holds the data until the message is delivered.
90 ///
91 /// Thread-safe and can be called from any number of threads concurrently.
92 ///
93 /// If `partition` is not passed, the partition is chosen by the internal Kafka partitioner.
94 ///
95 /// @warning If the `enable_idempotence` option is enabled, do not use both explicit partitions and Kafka-chosen ones.
96 ///
97 /// @throws SendException and its descendants if the message is not delivered
98 /// and acked by the Kafka Broker within the configured timeout.
99 ///
100 /// @note Use the SendException::IsRetryable method to find out whether it makes sense to retry sending the message.
101 /// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
102 void Send(
103 utils::zstring_view topic_name,
104 std::string_view key,
105 std::string_view message,
106 std::optional<std::uint32_t> partition = kUnassignedPartition,
107 HeaderViews headers = {}
108 ) const;
109
110 /// @brief Sends the given messages to topic `topic_name` by the given `key`
111 /// and `partition` (if passed), with payloads containing the `messages`
112 /// data. Asynchronously waits until the messages are delivered or a delivery error occurs.
113 ///
114 /// No payload data is copied. The method holds the data until the messages are delivered.
115 ///
116 /// Thread-safe and can be called from any number of threads concurrently.
117 ///
118 /// If `partition` is not passed, the partition is chosen by the internal Kafka partitioner.
119 ///
120 /// @warning If the `enable_idempotence` option is enabled, do not use both explicit partitions and Kafka-chosen ones.
121 ///
122 /// @throws BulkSendException if some messages were not delivered
123 /// and acked by the Kafka Broker within the configured timeout.
124 ///
125 /// @note Use the BulkSendException::GetExceptions method to get a list of the nested exceptions that occurred.
126 template <typename Messages>
127 std::enable_if_t<
128 std::is_convertible_v<decltype(std::declval<const Messages&>()[0]), std::string_view> &&
129 std::is_integral_v<decltype(std::declval<const Messages&>().size())> >
 // NOTE(review): listing line 130 is absent from this extract; the name of
 // this function template (presumably `Send`) belongs there — confirm
 // against the original header.
131 utils::zstring_view topic_name,
132 std::string_view key,
133 const Messages& messages,
134 std::optional<std::uint32_t> partition = kUnassignedPartition,
135 HeaderViews headers = {}
136 ) const {
137 SendWrapper(topic_name, key, impl::MessagesAdapter{messages}, partition, std::move(headers));
138 }
139
140 /// @brief Same as Producer::Send, but returns the task which can be
141 /// used to wait for the message delivery manually.
142 ///
143 /// @warning If the user schedules a batch of send requests with Producer::SendAsync, some send
144 /// requests may be retried by the library (for instance, in case of a network
145 /// blip). Therefore, the order in which messages are written to a partition may differ
146 /// from the order in which they were initially sent.
147 /// @snippet kafka/tests/producer_kafkatest.cpp Producer batch send async
148 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
149 std::string topic_name,
150 std::string key,
151 std::string message,
152 std::optional<std::uint32_t> partition = kUnassignedPartition,
153 HeaderViews headers = {}
154 ) const;
155
156 /// @brief Dumps per-topic message production statistics. Not expected to be
157 /// called manually.
158 /// @see kafka/impl/stats.hpp
159 void DumpMetric(utils::statistics::Writer& writer) const;
160
161private:
162 void SendImpl(
163 utils::zstring_view topic_name,
164 std::string_view key,
165 std::string_view message,
166 std::optional<std::uint32_t> partition,
167 impl::HeadersHolder&& headers_holder
168 ) const;
169
170 void SendImpl(
171 utils::zstring_view topic_name,
172 std::string_view key,
173 const impl::Messages& messages,
174 std::optional<std::uint32_t> partition,
175 std::vector<impl::HeadersHolder>&& headers_holders
176 ) const;
177
 // Called by the inline multi-message template above, which passes the
 // messages wrapped in impl::MessagesAdapter.
178 void SendWrapper(
179 utils::zstring_view topic_name,
180 std::string_view key,
181 const impl::Messages& messages,
182 std::optional<std::uint32_t> partition,
183 HeaderViews headers
184 ) const;
185
186 const std::string name_;
187 engine::TaskProcessor& producer_task_processor_;
188
 // Size and alignment template arguments for the FastPimpl member below,
 // which holds impl::ProducerImpl.
189 static constexpr std::size_t kImplSize{944};
190 static constexpr std::size_t kImplAlign{16};
191 utils::FastPimpl<impl::ProducerImpl, kImplSize, kImplAlign> producer_;
192};
193
194} // namespace kafka
195
196USERVER_NAMESPACE_END