userver: userver/kafka/components/consumer_component.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
consumer_component.hpp
1#pragma once
2
3#include <userver/kafka/consumer_scope.hpp>
4
5#include <userver/components/loggable_component_base.hpp>
6#include <userver/utils/fast_pimpl.hpp>
7#include <userver/utils/statistics/entry.hpp>
8
9USERVER_NAMESPACE_BEGIN
10
11namespace kafka {
12
13namespace impl {
14
// Forward declaration only: within this header impl::Consumer is used solely
// as the type argument of utils::FastPimpl in ConsumerComponent; its
// definition is not part of this header.
15class Consumer;
16
17} // namespace impl
18
19// clang-format off
20
21/// @ingroup userver_components
22///
23/// @brief Apache Kafka Consumer client component.
24///
25/// ## Static configuration example:
26///
27/// @snippet kafka/functional_tests/integrational_tests/static_config.yaml Kafka service sample - consumer static config
28///
29/// ## Secdist format
30///
31/// A Kafka alias in secdist is described as a JSON object
32/// `kafka_settings`, containing credentials of Kafka brokers.
33///
34/// @snippet kafka/functional_tests/integrational_tests/tests/conftest.py Kafka service sample - secdist
35///
36/// ## Static options:
37/// Name | Description | Default value
38/// ---------------------------------- | ------------------------------------------------ | ---------------
39/// group_id | consumer group id | --
40/// topics | list of topics the consumer subscribes to | --
41/// enable_auto_commit | whether to automatically and periodically commit offsets | false
42/// auto_offset_reset | action to take when there is no initial offset in offset store | --
43/// max_batch_size | maximum batch size for one callback call | --
44/// env_pod_name | environment variable to substitute `{pod_name}` substring in `group_id` | none
45/// security_protocol | protocol used to communicate with brokers | --
46/// sasl_mechanisms | SASL mechanism to use for authentication | none
47/// ssl_ca_location | File or directory path to CA certificate(s) for verifying the broker's key | none
48/// topic_metadata_refresh_interval_ms | period of time in milliseconds at which topic and broker metadata is refreshed | 300000
49/// metadata_max_age_ms | metadata cache max age | 900000
50
51// clang-format on
52
53class ConsumerComponent final : public components::LoggableComponentBase {
54 public:
 /// Name constant of the component ("kafka-consumer").
 /// NOTE(review): presumably the default name under which the component is
 /// looked up in the static config -- confirm against the component system.
55 static constexpr std::string_view kName = "kafka-consumer";
56
 /// Constructs the component from its static configuration and the
 /// component context.
57 ConsumerComponent(const components::ComponentConfig& config,
58 const components::ComponentContext& context);
 /// Declared here and defined out of line; impl::Consumer is only
 /// forward-declared in this header.
59 ~ConsumerComponent() override;
60
 /// Returns a ConsumerScope (declared in userver/kafka/consumer_scope.hpp).
 /// NOTE(review): presumably the handle through which user code drives the
 /// underlying consumer -- see ConsumerScope for the actual contract.
61 ConsumerScope GetConsumer();
62
 /// Returns the static configuration schema of the component; the supported
 /// options are listed in the "Static options" table of the class
 /// documentation.
63 static yaml_config::Schema GetStaticConfigSchema();
64
65 private:
 /// Size and alignment arguments passed to utils::FastPimpl for
 /// impl::Consumer.
 /// NOTE(review): these presumably have to be kept in sync with the real
 /// size/alignment of impl::Consumer -- confirm when changing that class.
66 static constexpr std::size_t kImplSize = 2480;
67 static constexpr std::size_t kImplAlign = 16;
 /// Storage for the consumer implementation, held via FastPimpl.
68 utils::FastPimpl<impl::Consumer, kImplSize, kImplAlign> consumer_;
69
70 /// @note Subscriptions must be the last fields! Add new fields above this
71 /// comment.
72 utils::statistics::Entry statistics_holder_;
73};
74
75} // namespace kafka
76
77USERVER_NAMESPACE_END