userver: userver/kafka/producer.hpp Source File
Loading...
Searching...
No Matches
producer.hpp
1#pragma once
2
3#include <chrono>
4#include <cstdint>
5
6#include <userver/engine/task/task.hpp>
7#include <userver/engine/task/task_processor_fwd.hpp>
8#include <userver/engine/task/task_with_result.hpp>
9#include <userver/utils/statistics/writer.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
15namespace impl {
16
17class Configuration;  // forward declaration; `Producer` takes and stores it via std::unique_ptr
18class ProducerImpl;  // forward declaration; `Producer` stores it via std::unique_ptr
19
20} // namespace impl
21
22/// @ingroup userver_clients
23///
24/// @brief Apache Kafka Producer Client.
25///
26/// It exposes the API to send an arbitrary message to the Kafka Broker to
27/// a certain topic by the given key and optional partition.
28///
29/// ## Important implementation details
30///
31/// Implementation does not block on any send to Kafka and asynchronously
32/// waits for each message to be delivered.
33///
34/// `Producer` periodically polls the metadata about delivered messages
35/// from Kafka Broker, blocking for some time, in a separate task processor.
36///
37/// `Producer` maintains the per-topic statistics including the broker
38/// connection errors.
39///
40/// @remark Destructor may block for no more than a couple of seconds to ensure
41/// all sent messages are properly delivered.
42///
43/// @see https://docs.confluent.io/platform/current/clients/producer.html
44class Producer final {
45 public:
46 /// @brief Time producer waits for new delivery events.
47 static constexpr std::chrono::milliseconds kDefaultPollTimeout{10};
48
49 /// @brief How many times `Producer::Send*` retries on delivery
50 /// failures. Retries take place only when errors are transient.
51 ///
52 /// @remark `librdkafka` already has a retry mechanism. Moreover, user-retried
53 /// requests may lead to messages reordering or duplication. Nevertheless, the
54 /// library retries a small list of delivery errors (such as message
55 /// guaranteed timeouts), including errors that are not retried by
56 /// `librdkafka` and errors that may occur when the Kafka cluster or topic
57 /// has just been created (for instance, in tests).
58 /// @see impl/producer_impl.cpp for the list of errors retryable by library
59 static constexpr std::size_t kDefaultSendRetries = 5;
60
61 /// @brief Creates the Kafka Producer.
62 Producer(std::unique_ptr<impl::Configuration> configuration,
63 engine::TaskProcessor& producer_task_processor,
64 std::chrono::milliseconds poll_timeout, std::size_t send_retries);
65
66 /// @brief Waits, for a certain timeout, until all messages are sent and
67 /// destroys the inner producer.
68 ~Producer();
69
70 Producer(const Producer&) = delete;
71 Producer(Producer&&) = delete;
72
73 Producer& operator=(const Producer&) = delete;
74 Producer& operator=(Producer&&) = delete;
75
76 /// @brief Sends given message to topic `topic_name` by given `key`
77 /// and `partition` (if passed) with payload containing the `message`
78 /// data. Asynchronously waits until the message is delivered or a delivery
79 /// error occurs.
80 ///
81 /// No payload data is copied. Method holds the data until message is
82 /// delivered.
83 ///
84 /// Thread-safe and can be called from any number of threads
85 /// simultaneously.
86 ///
87 /// `Producer::Send` call may take at most
88 /// `delivery_timeout_ms` x `send_retries_count` milliseconds.
89 ///
90 /// If `partition` is not passed, partition is chosen by the internal
91 /// Kafka partitioner.
92 ///
93 /// @warning if `enable_idempotence` option is enabled, do not use both
94 /// explicit partitions and Kafka-chosen ones
95 /// @throws std::runtime_error if message is not delivered and acked by Kafka
96 /// Broker
97 void Send(const std::string& topic_name, std::string_view key,
98 std::string_view message,
99 std::optional<std::uint32_t> partition = std::nullopt) const;
100
101 /// @brief Same as `Producer::Send`, but returns the task which can be
102 /// used to wait for the message delivery.
103 ///
104 /// @warning If user schedules a batch of send requests with
105 /// `Producer::SendAsync`, some send
106 /// requests may be retried by the library (for instance, in case of network
107 /// blink). Therefore, the order in which messages are written to the
108 /// partition may differ from the order in which they were initially sent.
109 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
110 std::string topic_name, std::string key, std::string message,
111 std::optional<std::uint32_t> partition = std::nullopt) const;
112
113 /// @brief Dumps per-topic message produce statistics.
114 /// @see impl/stats.hpp
115 void DumpMetric(utils::statistics::Writer& writer) const;
116
117 private:
118 void InitProducerAndStartPollingIfFirstSend() const;
119
120 void VerifyNotFinished() const;
121
122 /// @note for testsuite
123 void SendToTestPoint(std::string_view topic_name, std::string_view key,
124 std::string_view message) const;
125
126 /// @brief Adds consumer name to current span. NOTE(review): "consumer" looks copy-pasted; presumably the producer name is meant — confirm.
127 void ExtendCurrentSpan() const;
128
129 private:  // NOTE(review): members below use std::string/unique_ptr/atomic, and the API uses std::optional/string_view, but <string>, <memory>, <atomic>, <optional>, <string_view> are not in the visible include list — presumably transitive; confirm
130 const std::string component_name_;
131 engine::TaskProcessor& producer_task_processor_;
132
133 const std::chrono::milliseconds poll_timeout_{};
134 const std::size_t send_retries_{};
135
136 mutable std::atomic<bool> first_send_{true};  // starts as true; presumably cleared once the first send initializes `producer_` — verify in the implementation
137 mutable std::unique_ptr<impl::Configuration> configuration_;
138 mutable std::unique_ptr<impl::ProducerImpl>
139 producer_; // mutable to be created on first send
140 mutable engine::Task poll_task_; // mutable to be created on first send
141};
142
143} // namespace kafka
144
145USERVER_NAMESPACE_END