userver: userver/kafka/components/consumer_component.hpp Source File
Loading...
Searching...
No Matches
consumer_component.hpp
1#pragma once
2
3#include <string_view>
4
5#include <userver/kafka/consumer_scope.hpp>
6
7#include <userver/components/component_base.hpp>
8#include <userver/utils/fast_pimpl.hpp>
9#include <userver/utils/statistics/entry.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
15namespace impl {
16
17class Consumer;
18
19} // namespace impl
20
21// clang-format off
22
23/// @ingroup userver_components
24///
25/// @brief Apache Kafka Consumer client component.
26///
27/// ## Static configuration example:
28///
29/// @snippet kafka/functional_tests/integrational_tests/static_config.yaml Kafka service sample - consumer static config
30///
31/// ## Secdist format
32///
33/// A Kafka alias in secdist is described as a JSON object
34/// `kafka_settings`, containing credentials of Kafka brokers.
35///
36/// @snippet kafka/functional_tests/integrational_tests/tests/conftest.py Kafka service sample - secdist
37///
38/// ## Static options:
39/// Name | Description | Default value
40/// ---------------------------------- | ------------------------------------------------ | ---------------
41/// group_id | consumer group id | --
42/// topics | list of topics consumer subscribes | --
43/// enable_auto_commit | whether to automatically and periodically commit offsets | false
44/// auto_offset_reset | action to take when there is no initial offset in offset store | smallest
45/// max_batch_size | maximum batch size for one callback call | --
46/// env_pod_name | environment variable to substitute `{pod_name}` substring in `group_id` | none
47/// security_protocol | protocol used to communicate with brokers | --
48/// sasl_mechanisms | SASL mechanism to use for authentication | none
49/// ssl_ca_location | File or directory path to CA certificate(s) for verifying the broker's key | 5m
50/// topic_metadata_refresh_interval | period of time at which topic and broker metadata is refreshed | 15m
51/// metadata_max_age | metadata cache max age | none
52/// rd_kafka_custom_options | a map of librdkafka library additional options | '{}'
53
54// clang-format on
55
56class ConsumerComponent final : public components::ComponentBase {
57 public:
58 static constexpr std::string_view kName = "kafka-consumer";
59
60 ConsumerComponent(const components::ComponentConfig& config,
61 const components::ComponentContext& context);
62 ~ConsumerComponent() override;
63
64 ConsumerScope GetConsumer();
65
66 static yaml_config::Schema GetStaticConfigSchema();
67
68 private:
69 static constexpr std::size_t kImplSize = 2480;
70 static constexpr std::size_t kImplAlign = 16;
71 utils::FastPimpl<impl::Consumer, kImplSize, kImplAlign> consumer_;
72
73 /// @note Subscriptions must be the last fields! Add new fields above this
74 /// comment.
75 utils::statistics::Entry statistics_holder_;
76};
77
78} // namespace kafka
79
80USERVER_NAMESPACE_END