userver: userver/kafka/producer.hpp Source File
Loading...
Searching...
No Matches
producer.hpp
1#pragma once
2
3#include <cstdint>
4
5#include <userver/engine/task/task_processor_fwd.hpp>
6#include <userver/engine/task/task_with_result.hpp>
7#include <userver/kafka/exceptions.hpp>
8#include <userver/utils/fast_pimpl.hpp>
9#include <userver/utils/statistics/writer.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
namespace impl {

// Forward declarations of implementation-side types. Only their names are
// needed here: Producer takes ProducerConfiguration and Secret by const
// reference in its constructor and stores ProducerImpl through
// utils::FastPimpl, so the full definitions stay out of this public header.
class Configuration;
class ProducerImpl;

struct ProducerConfiguration;
struct Secret;

}  // namespace impl
24
25/// @ingroup userver_clients
26///
27/// @brief Apache Kafka Producer Client.
28///
29/// It exposes the API to send an arbitrary message to the Kafka Broker to
30/// certain topic by given key and optional partition.
31///
32/// ## Important implementation details
33///
34/// Message send tasks handling other messages' delivery reports, suspending
35/// their execution while no events exist.
36/// This makes message production parallel and leads to high Producer
37/// scalability.
38///
39/// Producer maintains per topic statistics including the broker
40/// connection errors.
41///
42/// @remark Destructor may wait for no more than a 2 x `delivery_timeout` to
43/// ensure all sent messages are properly delivered.
44///
45/// @see https://docs.confluent.io/platform/current/clients/producer.html
46class Producer final {
47public:
48 /// @brief Creates the Kafka Producer.
49 ///
50 /// @param producer_task_processor is task processor where producer creates
51 /// tasks for message delivery scheduling and waiting.
53 const std::string& name,
54 engine::TaskProcessor& producer_task_processor,
55 const impl::ProducerConfiguration& configuration,
56 const impl::Secret& secrets
57 );
58
59 /// @brief Waits until all messages are sent for at most 2 x
60 /// `delivery_timeout` and destroys the producer.
61 ///
62 /// @remark In a basic producer use cases, the destructor returns immediately.
63 ~Producer();
64
65 Producer(const Producer&) = delete;
66 Producer(Producer&&) = delete;
67
68 Producer& operator=(const Producer&) = delete;
69 Producer& operator=(Producer&&) = delete;
70
71 /// @brief Sends given message to topic `topic_name` by given `key`
72 /// and `partition` (if passed) with payload contains the `message`
73 /// data. Asynchronously waits until the message is delivered or the delivery
74 /// error occurred.
75 ///
76 /// No payload data is copied. Method holds the data until message is
77 /// delivered.
78 ///
79 /// Thread-safe and can be called from any number of threads
80 /// concurrently.
81 ///
82 /// If `partition` not passed, partition is chosen by internal
83 /// Kafka partitioner.
84 ///
85 /// @warning if `enable_idempotence` option is enabled, do not use both
86 /// explicit partitions and Kafka-chosen ones.
87 ///
88 /// @throws SendException and its descendants if message is not delivered
89 /// and acked by Kafka Broker in configured timeout.
90 ///
91 /// @note Use SendException::IsRetryable method to understand whether there is
92 /// a sense to retry the message sending.
93 /// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
94 void Send(
95 const std::string& topic_name,
96 std::string_view key,
97 std::string_view message,
98 std::optional<std::uint32_t> partition = std::nullopt
99 ) const;
100
101 /// @brief Same as Producer::Send, but returns the task which can be
102 /// used to wait the message delivery manually.
103 ///
104 /// @warning If user schedules a batch of send requests with
105 /// Producer::SendAsync, some send
106 /// requests may be retried by the library (for instance, in case of network
107 /// blink). Though, the order messages are written to partition may differ
108 /// from the order messages are initially sent
109 /// @snippet kafka/tests/producer_kafkatest.cpp Producer batch send async
110 [[nodiscard]] engine::TaskWithResult<void> SendAsync(
111 std::string topic_name,
112 std::string key,
113 std::string message,
114 std::optional<std::uint32_t> partition = std::nullopt
115 ) const;
116
117 /// @brief Dumps per topic messages produce statistics. No expected to be
118 /// called manually.
119 /// @see kafka/impl/stats.hpp
120 void DumpMetric(utils::statistics::Writer& writer) const;
121
122private:
123 void SendImpl(
124 const std::string& topic_name,
125 std::string_view key,
126 std::string_view message,
127 std::optional<std::uint32_t> partition
128 ) const;
129
130private:
131 const std::string name_;
132 engine::TaskProcessor& producer_task_processor_;
133
134 static constexpr std::size_t kImplSize{944};
135 static constexpr std::size_t kImplAlign{16};
136 utils::FastPimpl<impl::ProducerImpl, kImplSize, kImplAlign> producer_;
137};
138
139} // namespace kafka
140
141USERVER_NAMESPACE_END