userver: userver/kafka/consumer_component.hpp Source File
Loading...
Searching...
No Matches
consumer_component.hpp
1#pragma once
2
3#include <string_view>
4
5#include <userver/kafka/consumer_scope.hpp>
6
7#include <userver/components/component_base.hpp>
8#include <userver/utils/fast_pimpl.hpp>
9#include <userver/utils/statistics/entry.hpp>
10
11USERVER_NAMESPACE_BEGIN
12
13namespace kafka {
14
15namespace impl {
16
17class Consumer;
18
19} // namespace impl
20
21// clang-format off
22
23/// @ingroup userver_components
24///
25/// @brief Apache Kafka Consumer client component.
26///
27/// ## Static configuration example:
28///
29/// @snippet samples/kafka_service/static_config.yaml Kafka service sample - consumer static config
30///
31/// ## Secdist format
32///
33/// A Kafka alias in secdist is described as a JSON object
34/// `kafka_settings`, containing credentials of Kafka brokers.
35///
36/// @snippet samples/kafka_service/testsuite/conftest.py Kafka service sample - secdist
37///
38/// ## Static options:
39/// Name | Description | Default value
40/// ---------------------------------- | ------------------------------------------------ | ---------------
41/// client_id | Client identifier. May be an arbitrary string | userver
42/// group_id | consumer group id (name) | --
43/// topics | list of topics consumer subscribes | --
44/// max_batch_size | maximum number of messages consumer waits for new message before calling a callback | 1
45/// poll_timeout | maximum amount of time consumer waits for messages for new messages before calling a callback | 1s
46/// max_callback_duration | duration user callback must fit not to be kicked from the consumer group | 5m
47/// restart_after_failure_delay | time consumer suspends execution if user-callback fails | 10s
48/// auto_offset_reset | action to take when there is no initial offset in offset store | smallest
49/// env_pod_name | environment variable to substitute `{pod_name}` substring in `group_id` | none
50/// security_protocol | protocol used to communicate with brokers | --
51/// sasl_mechanisms | SASL mechanism to use for authentication | none
52/// ssl_ca_location | File or directory path to CA certificate(s) for verifying the broker's key | none
53/// topic_metadata_refresh_interval | period of time at which topic and broker metadata is refreshed | 5m
54/// metadata_max_age | metadata cache max age | 15
55/// rd_kafka_custom_options | a map of librdkafka library additional options | '{}'
56
57// clang-format on
58
59class ConsumerComponent final : public components::ComponentBase {
60public:
61 /// @ingroup userver_component_names
62 /// @brief The default name of kafka::ConsumerComponent component
63 static constexpr std::string_view kName{"kafka-consumer"};
64
65 ConsumerComponent(const components::ComponentConfig& config, const components::ComponentContext& context);
66 ~ConsumerComponent() override;
67
68 /// @brief Returns consumer instance.
69 /// @see kafka::ConsumerScope
70 ConsumerScope GetConsumer();
71
72 static yaml_config::Schema GetStaticConfigSchema();
73
74private:
75 static constexpr std::size_t kImplSize = 2480;
76 static constexpr std::size_t kImplAlign = 16;
77 utils::FastPimpl<impl::Consumer, kImplSize, kImplAlign> consumer_;
78
79 /// @note Subscriptions must be the last fields! Add new fields above this
80 /// comment.
81 utils::statistics::Entry statistics_holder_;
82};
83
84} // namespace kafka
85
86USERVER_NAMESPACE_END