userver: userver/kafka/producer.hpp Source File
Loading...
Searching...
No Matches
producer.hpp
1#pragma once
2
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include <userver/engine/task/task.hpp>
#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/engine/task/task_with_result.hpp>
#include <userver/utils/statistics/writer.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
namespace impl {

// Implementation types used by `Producer`. This header refers to them only
// through references (constructor parameters) or `std::unique_ptr` members,
// so forward declarations are sufficient and keep the header lightweight.
struct ProducerConfiguration;
struct Secret;

class Configuration;
class ProducerImpl;

}  // namespace impl
24
/// @brief Parameters `Producer` uses at runtime.
///
/// `Producer` is normally created through `ProducerComponent`, so this struct
/// mostly serves to document those runtime parameters.
struct ProducerExecutionParams final {
  /// @brief Time the producer waits for new delivery events.
  std::chrono::milliseconds poll_timeout{10};

  /// @brief How many times `Producer::Send*` retries when delivery fails.
  /// Retries take place only when errors are transient.
  ///
  /// @remark `librdkafka` already has a retry mechanism. Moreover,
  /// user-retried requests may lead to message reordering or duplication.
  /// Nevertheless, the library retries a small list of delivery errors (such
  /// as message delivery timeouts), including errors that are not retried by
  /// `librdkafka` and errors that may occur when the Kafka cluster or topic
  /// has just been created (for instance, in tests).
  /// @see impl/producer_impl.cpp for the list of errors retryable by the
  /// library
  std::size_t send_retries{5};
};
44
45/// @ingroup userver_clients
46///
47/// @brief Apache Kafka Producer Client.
48///
49/// It exposes the API to send an arbitrary message to the Kafka Broker to
50/// certain topic by given key and optional partition.
51///
52/// ## Important implementation details
53///
54/// Implementation does not block on any send to Kafka and asynchronously
55/// waits for each message to be delivered.
56///
57/// `Producer` periodically polls the metadata about delivered messages
58/// from Kafka Broker, blocking for some time, in separate task processor.
59///
60/// `Producer` maintains the per topic statistics including the broker
61/// connection errors.
62///
63/// @remark Destructor may block for no more than a couple of seconds to ensure
64/// all sent messages are properly delivered
65///
66/// @see https://docs.confluent.io/platform/current/clients/producer.html
67class Producer final {
68 public:
69 /// @brief Creates the Kafka Producer.
70 ///
71 /// @param producer_task_processor where producer polls for delivery reports
72 /// and creates tasks for message delivery scheduling.
73 /// Currently, producer_task_processor **must contain at least 2
74 /// threads for each producer**
75 Producer(const std::string& name,
76 engine::TaskProcessor& producer_task_processor,
77 const impl::ProducerConfiguration& configuration,
78 const impl::Secret& secrets, ProducerExecutionParams params);
79
80 /// @brief Waits until all messages are sent for a certain timeout and destroy
81 /// the inner producer.
82 ~Producer();
83
84 Producer(const Producer&) = delete;
85 Producer(Producer&&) = delete;
86
87 Producer& operator=(const Producer&) = delete;
88 Producer& operator=(Producer&&) = delete;
89
90 /// @brief Sends given message to topic `topic_name` by given `key`
91 /// and `partition` (if passed) with payload contains the `message`
92 /// data. Asynchronously waits until the message is delivered or the delivery
93 /// error occurred.
94 ///
95 /// No payload data is copied. Method holds the data until message is
96 /// delivered.
97 ///
98 /// thread-safe and can be called from any number of threads
99 /// simultaneously.
100 ///
101 /// `Producer::Send` call may take at most
102 /// `delivery_timeout` x `send_retries_count` milliseconds.
103 ///
104 /// If `partition` not passed, partition is chosen by internal
105 /// Kafka partitioner.
106 ///
107 /// @warning if `enable_idempotence` option is enabled, do not use both
108 /// explicit partitions and Kafka-chosen ones
109 /// @throws std::runtime_error if message is not delivery and acked by Kafka
110 /// Broker
111 void Send(const std::string& topic_name, std::string_view key,
112 std::string_view message,
113 std::optional<std::uint32_t> partition = std::nullopt) const;
114
115 /// @brief Same as `Producer::Send`, but returns the task which can be
116 /// used to wait the message delivery.
117 ///
118 /// @warning If user schedules a batch of send requests with
119 /// `Producer::SendAsync`, some send
120 /// requests may be retried by the library (for instance, in case of network
121 /// blink). Though, the order messages are written to partition may differ
122 /// from the order messages are initially sent
123 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
124 std::string topic_name, std::string key, std::string message,
125 std::optional<std::uint32_t> partition = std::nullopt) const;
126
127 /// @brief Dumps per topic messages produce statistics.
128 /// @see impl/stats.hpp
129 void DumpMetric(utils::statistics::Writer& writer) const;
130
131 private:
132 void InitProducerAndStartPollingIfFirstSend() const;
133
134 void VerifyNotFinished() const;
135
136 /// @note for testsuite
137 void SendToTestPoint(std::string_view topic_name, std::string_view key,
138 std::string_view message) const;
139
140 /// @brief Adds consumer name to current span.
141 void ExtendCurrentSpan() const;
142
143 private:
144 const std::string component_name_;
145 engine::TaskProcessor& producer_task_processor_;
146
147 const std::chrono::milliseconds poll_timeout_{};
148 const std::size_t send_retries_{};
149
150 mutable std::atomic<bool> first_send_{true};
151 mutable std::unique_ptr<impl::Configuration> configuration_;
152 mutable std::unique_ptr<impl::ProducerImpl>
153 producer_; // mutable to be created on first send
154 mutable engine::Task poll_task_; // mutable to be created on first send
155};
156
157} // namespace kafka
158
159USERVER_NAMESPACE_END