userver: userver/kafka/producer_component.hpp Source File
Loading...
Searching...
No Matches
producer_component.hpp
1#pragma once
2
3#include <string_view>
4
5#include <userver/kafka/producer.hpp>
6
7#include <userver/components/component_base.hpp>
8#include <userver/utils/statistics/entry.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace kafka {
13
14// clang-format off
15
16/// @ingroup userver_components
17///
18/// @brief Apache Kafka Producer client component.
19///
20/// ## Static configuration example:
21///
22/// @snippet samples/kafka_service/static_config.yaml Kafka service sample - producer static config
23///
24/// ## Secdist format
25///
26/// A Kafka alias in secdist is described as a JSON object
27/// `kafka_settings`, containing credentials of Kafka brokers.
28///
29/// @snippet samples/kafka_service/testsuite/conftest.py Kafka service sample - secdist
30///
31/// ## Static options:
32/// Name | Description | Default value
33/// ---------------------------- | ------------------------------------------------ | ---------------
34/// client_id | Client identifier. May be an arbitrary string | userver
35/// delivery_timeout | time a produced message waits for successful delivery | --
36/// queue_buffering_max | delay to wait for messages to be transmitted to broker | --
37/// enable_idempotence | whether to make producer idempotent | false
38/// queue_buffering_max_messages | maximum number of messages waiting for delivery | 100000
39/// queue_buffering_max_kbytes | maximum size of messages waiting for delivery | 1048576
40/// message_max_bytes | maximum size of message | 1000000
41/// message_send_max_retries | maximum number of send request retries until `delivery_timeout` reached | 2147483647
42/// retry_backoff | backoff time before retrying send request, exponentially increases after each retry | 100
43/// retry_backoff_max | backoff upper bound | 1000
44/// security_protocol | protocol used to communicate with brokers | --
45/// sasl_mechanisms | SASL mechanism to use for authentication | none
46/// ssl_ca_location | file or directory path to CA certificate(s) for verifying the broker's key | none
47/// rd_kafka_custom_options | a map of librdkafka library additional options | '{}'
48
49// clang-format on
50
51class ProducerComponent final : public components::ComponentBase {
52public:
53 /// @ingroup userver_component_names
54 /// @brief The default name of kafka::ProducerComponent component
55 static constexpr std::string_view kName{"kafka-producer"};
56
   /// @brief Constructs the component from its static configuration and the
   /// component context.
   /// @param config static configuration of this component
   /// @param context component context this component is created in
   /// @note NOTE(review): the definition lives outside this header; presumably
   /// it builds `producer_` and registers `statistics_holder_` -- confirm
   /// against the implementation file.
57 ProducerComponent(const components::ComponentConfig& config, const components::ComponentContext& context);
   /// @brief Overrides the base class destructor; only declared here, the
   /// definition is out of line.
58 ~ProducerComponent() override;
59
60 /// @brief Returns a producer instance reference.
61 /// @see kafka::Producer
   /// @note The method itself is not `const`-qualified although it returns a
   /// `const` reference, so it cannot be called on a
   /// `const ProducerComponent&`.
62 const Producer& GetProducer();
63
   /// @brief Returns the static configuration schema of this component.
   /// @note NOTE(review): presumably describes the static options listed in
   /// the class documentation above -- confirm against the definition.
64 static yaml_config::Schema GetStaticConfigSchema();
65
66private:
   /// Producer instance stored by value inside the component (presumably the
   /// one exposed via GetProducer(); the definition is not visible here).
67 Producer producer_;
68
69 /// @note Subscriptions must be the last fields! Add new fields above this
70 /// comment.
   /// Why: non-static data members are destroyed in reverse declaration
   /// order, so keeping this entry last means it is destroyed first, before
   /// `producer_`.
71 utils::statistics::Entry statistics_holder_;
72};
73
74} // namespace kafka
75
76USERVER_NAMESPACE_END