userver: userver/kafka/components/consumer_component.hpp Source File
Loading...
Searching...
No Matches
consumer_component.hpp
1#pragma once
2
3#include <userver/kafka/consumer_scope.hpp>
4
5#include <userver/components/loggable_component_base.hpp>
6#include <userver/utils/fast_pimpl.hpp>
7#include <userver/utils/statistics/entry.hpp>
8
9USERVER_NAMESPACE_BEGIN
10
11namespace kafka {
12
namespace impl {

// Only forward-declared in this header: ConsumerComponent stores the consumer
// through utils::FastPimpl, so the full definition is not needed here.
class Consumer;

}  // namespace impl
18
19// clang-format off
20
21/// @ingroup userver_components
22///
23/// @brief Apache Kafka Consumer client component.
24///
25/// ## Static configuration example:
26///
27/// @snippet kafka/functional_tests/integrational_tests/static_config.yaml Kafka service sample - consumer static config
28///
29/// ## Secdist format
30///
31/// A Kafka alias in secdist is described as a JSON object
32/// `kafka_settings`, containing credentials of Kafka brokers.
33///
34/// @snippet kafka/functional_tests/integrational_tests/tests/conftest.py Kafka service sample - secdist
35///
36/// ## Static options:
37/// Name | Description | Default value
38/// ---------------------------------- | ------------------------------------------------ | ---------------
39/// group_id | consumer group id | --
40/// topics | list of topics the consumer subscribes to | --
41/// enable_auto_commit | whether to automatically and periodically commit offsets | false
42/// auto_offset_reset | action to take when there is no initial offset in offset store | --
43/// max_batch_size | maximum batch size for one callback call | --
44/// env_pod_name | environment variable to substitute `{pod_name}` substring in `group_id` | none
45/// security_protocol | protocol used to communicate with brokers | --
46/// sasl_mechanisms | SASL mechanism to use for authentication | none
47/// ssl_ca_location | File or directory path to CA certificate(s) for verifying the broker's key | none
48/// topic_metadata_refresh_interval_ms | period of time in milliseconds at which topic and broker metadata is refreshed | 300000
49/// metadata_max_age_ms | metadata cache max age in milliseconds | 900000
50
51// clang-format on
52
53class ConsumerComponent final : public components::LoggableComponentBase {
54 public:
55 static constexpr std::string_view kName = "kafka-consumer";
56
57 ConsumerComponent(const components::ComponentConfig& config,
58 const components::ComponentContext& context);
59 ~ConsumerComponent() override;
60
61 ConsumerScope GetConsumer();
62
63 static yaml_config::Schema GetStaticConfigSchema();
64
65 private:
66 static constexpr std::size_t kImplSize = 2480;
67 static constexpr std::size_t kImplAlign = 16;
68 utils::FastPimpl<impl::Consumer, kImplSize, kImplAlign> consumer_;
69
70 /// @note Subscriptions must be the last fields! Add new fields above this
71 /// comment.
72 utils::statistics::Entry statistics_holder_;
73};
74
75} // namespace kafka
76
77USERVER_NAMESPACE_END