userver: userver/kafka/components/producer_component.hpp Source File
Loading...
Searching...
No Matches
producer_component.hpp
1#pragma once
2
3#include <string_view>
4
5#include <userver/kafka/producer.hpp>
6
7#include <userver/components/component_base.hpp>
8#include <userver/utils/statistics/entry.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace kafka {
13
14// clang-format off
15
16/// @ingroup userver_components
17///
18/// @brief Apache Kafka Producer client component.
19///
20/// ## Static configuration example:
21///
22/// @snippet kafka/functional_tests/integrational_tests/static_config.yaml Kafka service sample - producer static config
23///
24/// ## Secdist format
25///
26/// A Kafka alias in secdist is described as a JSON object
27/// `kafka_settings`, containing credentials of Kafka brokers.
28///
29/// @snippet kafka/functional_tests/integrational_tests/tests/conftest.py Kafka service sample - secdist
30///
31/// ## Static options:
32/// Name | Description | Default value
33/// ------------------------ | ------------------------------------------------ | ---------------
34/// delivery_timeout | time a produced message waits for successful delivery | --
35/// queue_buffering_max | delay to wait for messages to be transmitted to broker | --
36/// enable_idempotence | whether to make producer idempotent | false
37/// poll_timeout | time producer waits for new delivery events | 10ms
38/// send_retries_count | how many times producer retries transient delivery errors | 5
39/// security_protocol | protocol used to communicate with brokers | --
40/// sasl_mechanisms | SASL mechanism to use for authentication | none
41/// ssl_ca_location | file or directory path to CA certificate(s) for verifying the broker's key | none
42/// rd_kafka_custom_options | a map of librdkafka library additional options | '{}'
43
44// clang-format on
45
46class ProducerComponent final : public components::ComponentBase {
47 public:
48 static constexpr std::string_view kName = "kafka-producer";
49
50 ProducerComponent(const components::ComponentConfig& config,
51 const components::ComponentContext& context);
52 ~ProducerComponent() override;
53
54 Producer& GetProducer();
55
56 static yaml_config::Schema GetStaticConfigSchema();
57
58 private:
59 Producer producer_;
60
61 /// @note Subscriptions must be the last fields! Add new fields above this
62 /// comment.
63 utils::statistics::Entry statistics_holder_;
64};
65
66} // namespace kafka
67
68USERVER_NAMESPACE_END