userver: userver/kafka/producer.hpp Source File
Loading...
Searching...
No Matches
producer.hpp
1#pragma once
2
3#include <cstdint>
4
5#include <userver/engine/task/task_processor_fwd.hpp>
6#include <userver/engine/task/task_with_result.hpp>
7#include <userver/kafka/exceptions.hpp>
8#include <userver/utils/fast_pimpl.hpp>
9#include <userver/utils/statistics/writer.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
namespace impl {

// Forward declarations only: this header refers to these types by reference
// or as a template argument, so their definitions are not needed here.

// Taken by const reference in the Producer constructor.
struct ProducerConfiguration;
struct Secret;

// ProducerImpl is the type stored inside Producer through utils::FastPimpl.
// Configuration is not referenced by the declarations visible in this header.
class ProducerImpl;
class Configuration;

}  // namespace impl
24
25/// @ingroup userver_clients
26///
27/// @brief Apache Kafka Producer Client.
28///
29/// It exposes the API to send an arbitrary message to the Kafka Broker to
30/// certain topic by given key and optional partition.
31///
32/// ## Important implementation details
33///
34/// Message send tasks handling other messages' delivery reports, suspending
35/// their execution while no events exist.
36/// This makes message production parallel and leads to high Producer
37/// scalability.
38///
39/// Producer maintains per topic statistics including the broker
40/// connection errors.
41///
42/// @remark Destructor may wait for no more than a 2 x `delivery_timeout` to
43/// ensure all sent messages are properly delivered.
44///
45/// @see https://docs.confluent.io/platform/current/clients/producer.html
46class Producer final {
47 public:
48 /// @brief Creates the Kafka Producer.
49 ///
50 /// @param producer_task_processor is task processor where producer creates
51 /// tasks for message delivery scheduling and waiting.
52 Producer(const std::string& name,
53 engine::TaskProcessor& producer_task_processor,
54 const impl::ProducerConfiguration& configuration,
55 const impl::Secret& secrets);
56
57 /// @brief Waits until all messages are sent for at most 2 x
58 /// `delivery_timeout` and destroys the producer.
59 ///
60 /// @remark In a basic producer use cases, the destructor returns immediately.
61 ~Producer();
62
63 Producer(const Producer&) = delete;
64 Producer(Producer&&) = delete;
65
66 Producer& operator=(const Producer&) = delete;
67 Producer& operator=(Producer&&) = delete;
68
69 /// @brief Sends given message to topic `topic_name` by given `key`
70 /// and `partition` (if passed) with payload contains the `message`
71 /// data. Asynchronously waits until the message is delivered or the delivery
72 /// error occurred.
73 ///
74 /// No payload data is copied. Method holds the data until message is
75 /// delivered.
76 ///
77 /// Thread-safe and can be called from any number of threads
78 /// concurrently.
79 ///
80 /// If `partition` not passed, partition is chosen by internal
81 /// Kafka partitioner.
82 ///
83 /// @warning if `enable_idempotence` option is enabled, do not use both
84 /// explicit partitions and Kafka-chosen ones.
85 ///
86 /// @throws SendException and its descendants if message is not delivered
87 /// and acked by Kafka Broker in configured timeout.
88 ///
89 /// @note Use SendException::IsRetryable method to understand whether there is
90 /// a sense to retry the message sending.
91 /// @snippet kafka/tests/producer_test.cpp Producer retryable error
92 void Send(const std::string& topic_name, std::string_view key,
93 std::string_view message,
94 std::optional<std::uint32_t> partition = std::nullopt) const;
95
96 /// @brief Same as Producer::Send, but returns the task which can be
97 /// used to wait the message delivery manually.
98 ///
99 /// @warning If user schedules a batch of send requests with
100 /// Producer::SendAsync, some send
101 /// requests may be retried by the library (for instance, in case of network
102 /// blink). Though, the order messages are written to partition may differ
103 /// from the order messages are initially sent
104 /// @snippet kafka/tests/producer_test.cpp Producer batch send async
105 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
106 std::string topic_name, std::string key, std::string message,
107 std::optional<std::uint32_t> partition = std::nullopt) const;
108
109 /// @brief Dumps per topic messages produce statistics. No expected to be
110 /// called manually.
111 /// @see impl/stats.hpp
112 void DumpMetric(utils::statistics::Writer& writer) const;
113
114 private:
115 void SendImpl(const std::string& topic_name, std::string_view key,
116 std::string_view message,
117 std::optional<std::uint32_t> partition) const;
118
119 private:
120 const std::string name_;
121 engine::TaskProcessor& producer_task_processor_;
122
123 static constexpr std::size_t kImplSize{944};
124 static constexpr std::size_t kImplAlign{16};
125 utils::FastPimpl<impl::ProducerImpl, kImplSize, kImplAlign> producer_;
126};
127
128} // namespace kafka
129
130USERVER_NAMESPACE_END