userver: userver/kafka/producer.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
producer.hpp
1#pragma once
2
3#include <cstdint>
4
5#include <userver/engine/task/task_processor_fwd.hpp>
6#include <userver/engine/task/task_with_result.hpp>
7#include <userver/kafka/exceptions.hpp>
8#include <userver/kafka/headers.hpp>
9#include <userver/utils/fast_pimpl.hpp>
10#include <userver/utils/statistics/writer.hpp>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace kafka {
15
/// @brief Unassigned partition.
///
/// The unassigned partition is used by the producer API for messages
/// that should be partitioned using the default partitioner.
///
/// It is the default value of the `partition` argument of Producer::Send
/// and Producer::SendAsync. The constant is only declared here (`extern`);
/// its definition lives outside this header.
///
/// @note By default partitions are distributed uniformly.
extern const std::optional<std::uint32_t> kUnassignedPartition;
23
namespace impl {

// Forward declarations only: this header refers to these types by
// reference or as a template argument, so their definitions are not needed
// here.

// Class types.
class Configuration;
class HeadersHolder;
class ProducerImpl;

// Struct types.
struct ProducerConfiguration;
struct Secret;

}  // namespace impl
35
36/// @ingroup userver_clients
37///
38/// @brief Apache Kafka Producer Client.
39///
40/// It exposes the API to send an arbitrary message to the Kafka Broker to
41/// certain topic by given key and optional partition.
42///
43/// ## Important implementation details
44///
45/// Message send tasks handling other messages' delivery reports, suspending
46/// their execution while no events exist.
47/// This makes message production parallel and leads to high Producer
48/// scalability.
49///
50/// Producer maintains per topic statistics including the broker
51/// connection errors.
52///
53/// @remark Destructor may wait for no more than a 2 x `delivery_timeout` to
54/// ensure all sent messages are properly delivered.
55///
56/// @see https://docs.confluent.io/platform/current/clients/producer.html
57class Producer final {
58public:
59 /// @cond
60 // @param producer_task_processor is task processor where producer creates
61 // tasks for message delivery scheduling and waiting.
62 Producer(
63 const std::string& name,
64 engine::TaskProcessor& producer_task_processor,
65 const impl::ProducerConfiguration& configuration,
66 const impl::Secret& secrets
67 );
68 /// @endcond
69
70 /// @brief Waits until all messages are sent for at most 2 x
71 /// `delivery_timeout` and destroys the producer.
72 ///
73 /// @remark In a basic producer use cases, the destructor returns immediately.
75
76 Producer(const Producer&) = delete;
77 Producer(Producer&&) = delete;
78
79 Producer& operator=(const Producer&) = delete;
80 Producer& operator=(Producer&&) = delete;
81
82 /// @brief Sends given message to topic `topic_name` by given `key`
83 /// and `partition` (if passed) with payload contains the `message`
84 /// data. Asynchronously waits until the message is delivered or the delivery
85 /// error occurred.
86 ///
87 /// No payload data is copied. Method holds the data until message is
88 /// delivered.
89 ///
90 /// Thread-safe and can be called from any number of threads
91 /// concurrently.
92 ///
93 /// If `partition` not passed, partition is chosen by internal
94 /// Kafka partitioner.
95 ///
96 /// @warning if `enable_idempotence` option is enabled, do not use both
97 /// explicit partitions and Kafka-chosen ones.
98 ///
99 /// @throws SendException and its descendants if message is not delivered
100 /// and acked by Kafka Broker in configured timeout.
101 ///
102 /// @note Use SendException::IsRetryable method to understand whether there is
103 /// a sense to retry the message sending.
104 /// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
105 void Send(
106 const std::string& topic_name,
107 std::string_view key,
108 std::string_view message,
109 std::optional<std::uint32_t> partition = kUnassignedPartition,
110 HeaderViews headers = {}
111 ) const;
112
113 /// @brief Same as Producer::Send, but returns the task which can be
114 /// used to wait the message delivery manually.
115 ///
116 /// @warning If user schedules a batch of send requests with
117 /// Producer::SendAsync, some send
118 /// requests may be retried by the library (for instance, in case of network
119 /// blink). Though, the order messages are written to partition may differ
120 /// from the order messages are initially sent
121 /// @snippet kafka/tests/producer_kafkatest.cpp Producer batch send async
122 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
123 std::string topic_name,
124 std::string key,
125 std::string message,
126 std::optional<std::uint32_t> partition = kUnassignedPartition,
127 HeaderViews headers = {}
128 ) const;
129
130 /// @brief Dumps per topic messages produce statistics. No expected to be
131 /// called manually.
132 /// @see kafka/impl/stats.hpp
133 void DumpMetric(utils::statistics::Writer& writer) const;
134
135private:
136 void SendImpl(
137 const std::string& topic_name,
138 std::string_view key,
139 std::string_view message,
140 std::optional<std::uint32_t> partition,
141 impl::HeadersHolder&& headers_holder
142 ) const;
143
144private:
145 const std::string name_;
146 engine::TaskProcessor& producer_task_processor_;
147
148 static constexpr std::size_t kImplSize{944};
149 static constexpr std::size_t kImplAlign{16};
150 utils::FastPimpl<impl::ProducerImpl, kImplSize, kImplAlign> producer_;
151};
152
153} // namespace kafka
154
155USERVER_NAMESPACE_END