userver: userver/kafka/components/producer_component.hpp Source File
Loading...
Searching...
No Matches
producer_component.hpp
1#pragma once
2
3#include <string_view>
4
5#include <userver/kafka/producer.hpp>
6
7#include <userver/components/loggable_component_base.hpp>
8#include <userver/utils/statistics/entry.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace kafka {
13
14// clang-format off
15
16/// @ingroup userver_components
17///
18/// @brief Apache Kafka Producer client component.
19///
20/// ## Static configuration example:
21///
22/// @snippet kafka/functional_tests/integrational_tests/static_config.yaml Kafka service sample - producer static config
23///
24/// ## Secdist format
25///
26/// A Kafka alias in secdist is described as a JSON object
27/// `kafka_settings`, containing credentials of Kafka brokers.
28///
29/// @snippet kafka/functional_tests/integrational_tests/tests/conftest.py Kafka service sample - secdist
30///
31/// ## Static options:
32/// Name | Description | Default value
33/// ------------------------ | ------------------------------------------------ | ---------------
34/// delivery_timeout_ms | time a produced message waits for successful delivery | --
35/// queue_buffering_max_ms | delay to wait for messages to be transmitted to broker | --
36/// enable_idempotence | whether to make producer idempotent | false
37/// poll_timeout_ms | time in milliseconds producer waits for new delivery events | 10
38/// send_retries_count | how many times producer retries transient delivery errors | 5
39/// security_protocol | protocol used to communicate with brokers | --
40/// sasl_mechanisms | SASL mechanism to use for authentication | none
41/// ssl_ca_location | file or directory path to CA certificate(s) for verifying the broker's key | none
42
43// clang-format on
44
45class ProducerComponent final : public components::LoggableComponentBase {
46 public:
47 static constexpr std::string_view kName = "kafka-producer";  ///< Component name constant; presumably the key used in the static config -- confirm against the sample config.
48
49 ProducerComponent(const components::ComponentConfig& config,
50 const components::ComponentContext& context);  ///< Builds the component from its config and the component context; defined out of line.
51 ~ProducerComponent() override;  ///< Declared here, defined out of line.
52
53 Producer& GetProducer();  ///< Returns a mutable reference to a Producer; presumably the owned `producer_` member -- confirm in the .cpp.
54
55 static yaml_config::Schema GetStaticConfigSchema();  ///< Returns the static config schema for this component; defined out of line.
56
57 private:
58 Producer producer_;  ///< Producer instance owned by the component.
59
60 /// @note Subscriptions must be the last fields! Add new fields above this
61 /// comment.
62 utils::statistics::Entry statistics_holder_;  ///< Declared last, so it is destroyed first (members are destroyed in reverse declaration order), i.e. before `producer_`.
63};
64
65} // namespace kafka
66
67USERVER_NAMESPACE_END