userver: userver/kafka/producer.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
producer.hpp
1#pragma once
2
3#include <chrono>
4#include <cstdint>
5
6#include <userver/engine/task/task.hpp>
7#include <userver/engine/task/task_processor_fwd.hpp>
8#include <userver/engine/task/task_with_result.hpp>
9#include <userver/utils/statistics/writer.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
15namespace impl {
16
17class Configuration;  // forward declaration; `Producer` holds it via `std::unique_ptr`
18class ProducerImpl;  // forward declaration; `Producer` holds it via `std::unique_ptr`
19
20} // namespace impl
21
22/// @ingroup userver_clients
23///
24/// @brief Apache Kafka Producer Client.
25///
26/// It exposes the API to send an arbitrary message to the Kafka Broker, to a
27/// certain topic, by a given key and an optional partition.
28///
29/// ## Important implementation details
30///
31/// The implementation does not block on any send to Kafka and asynchronously
32/// waits for each message to be delivered.
33///
34/// `Producer` periodically polls the metadata about delivered messages
35/// from the Kafka Broker, blocking for some time, in a separate task processor.
36///
37/// `Producer` maintains the per-topic statistics, including the broker
38/// connection errors.
39///
40/// @remark The destructor may block for no more than a couple of seconds to
41/// ensure all sent messages are properly delivered.
42///
43/// @see https://docs.confluent.io/platform/current/clients/producer.html
44class Producer final {
45 public:
46 /// @brief Default time the producer waits for new delivery events.
47 static constexpr std::chrono::milliseconds kDefaultPollTimeout{10};
48
49 /// @brief Default number of times `Producer::Send*` retries on delivery
50 /// failures. Retries take place only when errors are transient.
51 ///
52 /// @remark `librdkafka` already has a retry mechanism. Moreover, user-retried
53 /// requests may lead to message reordering or duplication. Nevertheless, the
54 /// library retries a small list of delivery errors (such as message
55 /// guaranteed timeouts), including errors that are not retried by
56 /// `librdkafka` and errors that may occur when the Kafka cluster or topic
57 /// has just been created (for instance, in tests).
58 /// @see impl/producer_impl.cpp for the list of errors retryable by library
59 static constexpr std::size_t kDefaultSendRetries = 5;
60
61 /// @brief Creates the Kafka Producer.
 /// @param configuration producer configuration; ownership is transferred
 /// (it is passed as `std::unique_ptr`)
 /// @param producer_task_processor task processor used by the producer
 /// (presumably the one where delivery polling runs — TODO confirm)
 /// @param poll_timeout time the producer waits for new delivery events,
 /// see `kDefaultPollTimeout`
 /// @param send_retries how many times a send is retried on transient
 /// delivery failures, see `kDefaultSendRetries`
62 Producer(std::unique_ptr<impl::Configuration> configuration,
63 engine::TaskProcessor& producer_task_processor,
64 std::chrono::milliseconds poll_timeout, std::size_t send_retries);
65
66 /// @brief Waits, for a certain timeout, until all messages are sent and
67 /// destroys the inner producer.
68 ~Producer();
69
70 Producer(const Producer&) = delete;
71 Producer(Producer&&) = delete;
72
73 Producer& operator=(const Producer&) = delete;
74 Producer& operator=(Producer&&) = delete;
75
76 /// @brief Sends the given message to topic `topic_name` by the given `key`
77 /// and `partition` (if passed), with a payload containing the `message`
78 /// data. Asynchronously waits until the message is delivered or a delivery
79 /// error occurs.
80 ///
81 /// No payload data is copied. The method holds the data until the message
82 /// is delivered.
83 ///
84 /// Thread-safe: can be called from any number of threads
85 /// simultaneously.
86 ///
87 /// `Producer::Send` call may take at most
88 /// `delivery_timeout_ms` x `send_retries_count` milliseconds.
89 ///
90 /// If `partition` is not passed, the partition is chosen by the internal
91 /// Kafka partitioner.
92 ///
93 /// @warning If the `enable_idempotence` option is enabled, do not use both
94 /// explicit partitions and Kafka-chosen ones.
95 /// @throws std::runtime_error if the message is not delivered and acked by
96 /// the Kafka Broker
97 void Send(const std::string& topic_name, std::string_view key,
98 std::string_view message,
99 std::optional<std::uint32_t> partition = std::nullopt) const;
100
101 /// @brief Same as `Producer::Send`, but returns the task which can be
102 /// used to wait for the message delivery.
103 ///
104 /// @warning If the user schedules a batch of send requests with
105 /// `Producer::SendAsync`, some send
106 /// requests may be retried by the library (for instance, in case of a network
107 /// blink). As a result, the order in which messages are written to the
108 /// partition may differ from the order in which they were initially sent.
109 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
110 std::string topic_name, std::string key, std::string message,
111 std::optional<std::uint32_t> partition = std::nullopt) const;
112
113 /// @brief Dumps per-topic message production statistics.
114 /// @see impl/stats.hpp
115 void DumpMetric(utils::statistics::Writer& writer) const;
116
117 private:
118 void InitProducerAndStartPollingIfFirstSend() const;
119
120 void VerifyNotFinished() const;
121
122 /// @note for testsuite
123 void SendToTestPoint(std::string_view topic_name, std::string_view key,
124 std::string_view message) const;
125
126 /// @brief Adds producer name to current span.
 /// NOTE(review): this comment previously said "consumer name"; presumably a
 /// copy-paste slip, since this is the producer — confirm against the
 /// implementation.
127 void ExtendCurrentSpan() const;
128
129 private:
130 const std::string component_name_;
131 engine::TaskProcessor& producer_task_processor_;
132
133 const std::chrono::milliseconds poll_timeout_{};
134 const std::size_t send_retries_{};
135
136 mutable std::atomic<bool> first_send_{true};
137 mutable std::unique_ptr<impl::Configuration> configuration_;
138 mutable std::unique_ptr<impl::ProducerImpl>
139 producer_; // mutable to be created on first send
140 mutable engine::Task poll_task_; // mutable to be created on first send
141};
142
143} // namespace kafka
144
145USERVER_NAMESPACE_END