userver: kafka::Producer Class Reference
kafka::Producer Class Reference final

#include <userver/kafka/producer.hpp>

Detailed Description

Apache Kafka Producer Client.

It exposes an API for sending an arbitrary message to a given topic on the Kafka Broker, by key and with an optional partition.

Important implementation details

Message-sending tasks handle other messages' delivery reports and suspend their execution while no events exist. This makes message production parallel and leads to high Producer scalability.

Producer maintains per-topic statistics, including broker connection errors.

Remarks
The destructor may wait for up to 2 x delivery_timeout to ensure that all sent messages are properly delivered.
See also
https://docs.confluent.io/platform/current/clients/producer.html
Examples
samples/kafka_service/src/produce.cpp, samples/kafka_service/src/produce.hpp, and samples/kafka_service/src/producer_handler.hpp.

Definition at line 60 of file producer.hpp.

Public Member Functions

 ~Producer ()
 Waits until all messages are sent, for at most 2 x delivery_timeout, and then destroys the producer.
 
 Producer (const Producer &)=delete
 
 Producer (Producer &&)=delete
 
Producer & operator= (const Producer &)=delete
 
Producer & operator= (Producer &&)=delete
 
void Send (utils::zstring_view topic_name, std::string_view key, std::string_view message, std::optional< std::uint32_t > partition=kUnassignedPartition, HeaderViews headers={}) const
 Sends the given message to topic topic_name with the given key and partition (if passed); the payload contains the message data. Asynchronously waits until the message is delivered or a delivery error occurs.
 
template<typename Messages>
std::enable_if_t< std::is_convertible_v< decltype(std::declval< const Messages & >()[0]), std::string_view > &&std::is_integral_v< decltype(std::declval< const Messages & >().size())> > Send (utils::zstring_view topic_name, std::string_view key, const Messages &messages, std::optional< std::uint32_t > partition=kUnassignedPartition, HeaderViews headers={}) const
 Sends the given messages to topic topic_name with the given key and partition (if passed); the payloads contain the messages' data. Asynchronously waits until the messages are delivered or a delivery error occurs.
 
engine::TaskWithResult< void > SendAsync (std::string topic_name, std::string key, std::string message, std::optional< std::uint32_t > partition=kUnassignedPartition, HeaderViews headers={}) const
 Same as Producer::Send, but returns a task that can be used to wait for the message delivery manually.
 
void DumpMetric (utils::statistics::Writer &writer) const
 Dumps per-topic message production statistics. Not expected to be called manually.
 

Constructor & Destructor Documentation

◆ ~Producer()

kafka::Producer::~Producer ( )

Waits until all messages are sent, for at most 2 x delivery_timeout, and then destroys the producer.

Remarks
In basic producer use cases, the destructor returns immediately.
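
As a worked example of the bound above, the following minimal sketch computes the longest time the destructor may block for a given delivery_timeout. The helper name MaxDestructorWait is hypothetical and is not part of userver; it only restates the "2 x delivery_timeout" remark as arithmetic.

```cpp
#include <chrono>

// Hypothetical helper, not part of userver: the upper bound on how long
// ~Producer() may block, per the "2 x delivery_timeout" remark.
constexpr std::chrono::milliseconds MaxDestructorWait(std::chrono::milliseconds delivery_timeout) {
    return 2 * delivery_timeout;
}

// With a 3000 ms delivery_timeout the destructor may block for up to 6000 ms.
static_assert(MaxDestructorWait(std::chrono::milliseconds{3000}) == std::chrono::milliseconds{6000});
```

A service that must shut down within a fixed deadline could use such a bound when choosing delivery_timeout.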

Member Function Documentation

◆ DumpMetric()

void kafka::Producer::DumpMetric ( utils::statistics::Writer & writer) const

Dumps per-topic message production statistics. Not expected to be called manually.

See also
kafka/impl/stats.hpp

◆ Send() [1/2]

template<typename Messages>
std::enable_if_t< std::is_convertible_v< decltype(std::declval< const Messages & >()[0]), std::string_view > &&std::is_integral_v< decltype(std::declval< const Messages & >().size())> > kafka::Producer::Send ( utils::zstring_view topic_name,
std::string_view key,
const Messages & messages,
std::optional< std::uint32_t > partition = kUnassignedPartition,
HeaderViews headers = {} ) const
inline

Sends the given messages to topic topic_name with the given key and partition (if passed); the payloads contain the messages' data. Asynchronously waits until the messages are delivered or a delivery error occurs.

No payload data is copied. The method holds the data until the messages are delivered.

Thread-safe and can be called from any number of threads concurrently.

If partition is not passed, the partition is chosen by the internal Kafka partitioner.

Warning
If the enable_idempotence option is enabled, do not mix explicit partitions with Kafka-chosen ones.
Exceptions
BulkSendException if some messages were not delivered and acked by the Kafka Broker within the configured timeout.
Note
Use the BulkSendException::GetExceptions method to get the list of nested exceptions that occurred.

Definition at line 130 of file producer.hpp.
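
To make the template constraint of this overload easier to read, here is a minimal sketch that restates the enable_if condition from the signature as a standalone trait and checks it against a few container types. The trait name IsBulkSendable is hypothetical and is not part of userver; only the condition itself is taken from the signature above.

```cpp
#include <array>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical trait (not part of userver) mirroring the enable_if condition
// of the bulk Send overload: elements reachable via operator[] must convert
// to std::string_view, and size() must return an integral type.
template <typename Messages, typename = void>
struct IsBulkSendable : std::false_type {};

template <typename Messages>
struct IsBulkSendable<
    Messages,
    std::enable_if_t<
        std::is_convertible_v<decltype(std::declval<const Messages&>()[0]), std::string_view> &&
        std::is_integral_v<decltype(std::declval<const Messages&>().size())>>> : std::true_type {};

static_assert(IsBulkSendable<std::vector<std::string>>::value);
static_assert(IsBulkSendable<std::array<std::string_view, 2>>::value);
static_assert(!IsBulkSendable<std::vector<int>>::value);  // elements are not string-like
static_assert(!IsBulkSendable<int>::value);               // no operator[] or size()
```

Note that a single std::string does not satisfy the condition (its elements are char), so passing one string resolves to the single-message overload instead.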

◆ Send() [2/2]

void kafka::Producer::Send ( utils::zstring_view topic_name,
std::string_view key,
std::string_view message,
std::optional< std::uint32_t > partition = kUnassignedPartition,
HeaderViews headers = {} ) const

Sends the given message to topic topic_name with the given key and partition (if passed); the payload contains the message data. Asynchronously waits until the message is delivered or a delivery error occurs.

No payload data is copied. The method holds the data until the message is delivered.

Thread-safe and can be called from any number of threads concurrently.

If partition is not passed, the partition is chosen by the internal Kafka partitioner.

Warning
If the enable_idempotence option is enabled, do not mix explicit partitions with Kafka-chosen ones.
Exceptions
SendException and its descendants if the message is not delivered and acked by the Kafka Broker within the configured timeout.
Note
Use the SendException::IsRetryable method to determine whether it makes sense to retry sending the message.
std::vector<engine::TaskWithResult<void>> results;
results.reserve(kMaxQueueMessages);
for (std::uint32_t send{0}; send < kMaxQueueMessages; ++send) {
    results.push_back(producer.SendAsync(topic, fmt::format("test-key-{}", send), std::to_string(send)));
}
std::vector<std::uint32_t> sends_to_retry;
for (std::uint32_t send{0}; send < kMaxQueueMessages; ++send) {
    try {
        results[send].Get();
    } catch (const kafka::SendException& e) {
        if (e.IsRetryable()) {
            // Probably issues with network and reached `delivery_timeout`, retry
            sends_to_retry.push_back(send);
        } else {
            // LOG ...
        }
    }
}
Examples
samples/kafka_service/src/produce.cpp.
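
The IsRetryable note can also be applied to the synchronous Send with a bounded retry loop. The following is a minimal sketch under stated assumptions, not userver code: DemoSendException is a hypothetical stand-in for kafka::SendException that models only IsRetryable(), and SendWithRetries is a hypothetical helper. It compiles without userver so the control flow can be checked in isolation.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for kafka::SendException so the sketch compiles
// without userver; only the documented IsRetryable() accessor is modelled.
class DemoSendException : public std::runtime_error {
public:
    DemoSendException(const char* what, bool retryable)
        : std::runtime_error(what), retryable_(retryable) {}

    bool IsRetryable() const noexcept { return retryable_; }

private:
    bool retryable_;
};

// Hypothetical helper: calls `send` until it succeeds and returns the number
// of attempts made. Rethrows if the error is not retryable or if
// `max_attempts` is exhausted. At least one attempt is always made.
template <typename SendFn>
std::size_t SendWithRetries(SendFn&& send, std::size_t max_attempts) {
    for (std::size_t attempt = 1;; ++attempt) {
        try {
            send();
            return attempt;
        } catch (const DemoSendException& e) {
            if (!e.IsRetryable() || attempt >= max_attempts) throw;
        }
    }
}
```

In a real service, `send` would presumably be a lambda wrapping producer.Send(topic, key, message) and the catch clause would name kafka::SendException. Any pause between attempts is left out here and would be a design choice of the caller.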

◆ SendAsync()

engine::TaskWithResult< void > kafka::Producer::SendAsync ( std::string topic_name,
std::string key,
std::string message,
std::optional< std::uint32_t > partition = kUnassignedPartition,
HeaderViews headers = {} ) const
nodiscard

Same as Producer::Send, but returns a task that can be used to wait for the message delivery manually.

Warning
If the user schedules a batch of send requests with Producer::SendAsync, some send requests may be retried by the library (for instance, in case of a network blink). As a result, the order in which messages are written to a partition may differ from the order in which they were initially sent.
std::vector<engine::TaskWithResult<void>> results;
results.reserve(kSendCount);
for (std::size_t send{0}; send < kSendCount; ++send) {
    results.emplace_back(
        producer.SendAsync(topic, fmt::format("test-key-{}", send), fmt::format("test-msg-{}", send))
    );
}

The documentation for this class was generated from the following file: