#include <userver/kafka/producer.hpp>
Apache Kafka Producer Client.
It exposes the API to send an arbitrary message to the Kafka Broker to certain topic by given key and optional partition.
Implementation does not block on any send to Kafka and asynchronously waits for each message to be delivered.
Producer
periodically polls the metadata about delivered messages from Kafka Broker, blocking for some time, in separate task processor.
Producer
maintains the per topic statistics including the broker connection errors.
Definition at line 44 of file producer.hpp.
Public Member Functions | |
Producer (std::unique_ptr< impl::Configuration > configuration, engine::TaskProcessor &producer_task_processor, std::chrono::milliseconds poll_timeout, std::size_t send_retries) | |
Creates the Kafka Producer. | |
~Producer () | |
Waits until all messages are sent for a certain timeout and destroy the inner producer. | |
Producer (const Producer &)=delete | |
Producer (Producer &&)=delete | |
Producer & | operator= (const Producer &)=delete |
Producer & | operator= (Producer &&)=delete |
void | Send (const std::string &topic_name, std::string_view key, std::string_view message, std::optional< std::uint32_t > partition=std::nullopt) const |
Sends given message to topic topic_name by given key and partition (if passed) with payload contains the message data. Asynchronously waits until the message is delivered or the delivery error occured. | |
engine::TaskWithResult< void > | SendAsync (std::string topic_name, std::string key, std::string message, std::optional< std::uint32_t > partition=std::nullopt) const |
Same as Producer::Send , but returns the task which can be used to wait the message delivery. | |
void | DumpMetric (utils::statistics::Writer &writer) const |
Dumps per topic messages produce statistics. | |
Static Public Attributes | |
static constexpr std::chrono::milliseconds | kDefaultPollTimeout {10} |
Time producer waits for new delivery events. | |
static constexpr std::size_t | kDefaultSendRetries = 5 |
How many times Produce::Send* retries when delivery failures. Retries take place only when errors are transient. | |
void kafka::Producer::DumpMetric | ( | utils::statistics::Writer & | writer | ) | const |
Dumps per topic messages produce statistics.
void kafka::Producer::Send | ( | const std::string & | topic_name, |
std::string_view | key, | ||
std::string_view | message, | ||
std::optional< std::uint32_t > | partition = std::nullopt ) const |
Sends given message to topic topic_name
by given key
and partition
(if passed) with payload contains the message
data. Asynchronously waits until the message is delivered or the delivery error occured.
No payload data is copied. Method holds the data until message is delivered.
thread-safe and can be called from any number of threads simultaneously.
Producer::Send
call may take at most delivery_timeout_ms
x send_retries_count
milliseconds.
If partition
not passed, partition is chosen by internal Kafka partitioner.
enable_idempotence
option is enabled, do not use both explicit partitions and Kafka-chosen ones std::runtime_error | if message is not delivery and acked by Kafka Broker |
engine::TaskWithResult< void > kafka::Producer::SendAsync | ( | std::string | topic_name, |
std::string | key, | ||
std::string | message, | ||
std::optional< std::uint32_t > | partition = std::nullopt ) const |
Same as Producer::Send
, but returns the task which can be used to wait the message delivery.
Producer::SendAsync
, some send requests may be retried by the library (for instance, in case of network blink). Though, the order messages are written to partition may differ from the order messages are initially sent
|
staticconstexpr |
Time producer waits for new delivery events.
Definition at line 47 of file producer.hpp.
|
staticconstexpr |
How many times Produce::Send*
retries when delivery failures. Retries take place only when errors are transient.
librdkafka
already has a retry mechanism. Moreover, user-retried requests may lead to messages reordering or duplication. Nevertheless, the library retries a small list of delivery errors (such as message guaranteed timeouts), including errors those are not retried by librdkafka
and errors that may occure when the Kafka cluster or topic have just been created (for instance, in tests) Definition at line 59 of file producer.hpp.