userver: userver/kafka/utest/kafka_fixture.hpp Source File
Loading...
Searching...
No Matches
kafka_fixture.hpp
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <deque>
6#include <vector>
7
8#include <userver/kafka/impl/broker_secrets.hpp>
9#include <userver/kafka/impl/configuration.hpp>
10#include <userver/kafka/impl/consumer.hpp>
11#include <userver/kafka/producer.hpp>
12#include <userver/utest/utest.hpp>
13#include <userver/utils/span.hpp>
14
15USERVER_NAMESPACE_BEGIN
16
17namespace kafka::utest {
18
19/// @ingroup userver_utest
20///
21/// @brief Message owning data wrapper for unit tests.
22struct Message {
23 std::string topic;
24 std::string key;
25 std::string payload;
26 std::optional<std::uint32_t> partition;
27};
28
29bool operator==(const Message& lhs, const Message& rhs);
30
31/// @ingroup userver_utest
32///
33/// @brief Helper for Kafka unit testing.
34///
35/// KafkaCluster is useful for
36/// - inline producer, consumer and their configuration creation;
37/// - generation of unique topic names;
38/// - sending and receiving batched of messages in sync form;
39///
40/// KafkaCluster expects that topic names used in test are already exist in
41/// Kafka broker or auto-created on produce to it.
42///
43/// It is recommended to create all topics before tests started, because
44/// topic creation leads to disk IO operation and maybe too slow in tests'
45/// runtime.
46///
47/// If one run tests with Kafka testsuite plugin, use
48/// `TESTSUITE_KAFKA_CUSTOM_TOPICS` environment variable to create topics before
49/// tests started.
50///
51/// @note KafkaCluster determines the broker's connection string
52/// based on `TESTSUITE_KAFKA_SERVER_HOST` (by default `localhost`) and
53/// `TESTSUITE_KAFKA_SERVER_PORT` (by default `9099`) environment variables.
54///
55/// @see https://yandex.github.io/yandex-taxi-testsuite/kafka
56class KafkaCluster : public ::testing::Test {
57public:
58 /// Kafka broker has some cold start issues on its. To stabilize tests
59 /// KafkaCluster patches producer's default delivery timeout.
60 /// To use custom delivery timeout in test, pass `configuration` argument to
61 /// MakeProducer.
62 static constexpr const std::chrono::milliseconds kDefaultTestProducerTimeout{
63 USERVER_NAMESPACE::utest::kMaxTestWaitTime / 2};
64
65 KafkaCluster();
66
67 ~KafkaCluster() override = default;
68
69 std::string GenerateTopic();
70
71 std::vector<std::string> GenerateTopics(std::size_t count);
72
73 impl::Configuration MakeProducerConfiguration(
74 const std::string& name,
75 impl::ProducerConfiguration configuration = {},
76 impl::Secret secrets = {}
77 );
78
79 impl::Configuration MakeConsumerConfiguration(
80 const std::string& name,
81 impl::ConsumerConfiguration configuration = {},
82 impl::Secret secrets = {}
83 );
84
85 Producer MakeProducer(const std::string& name, impl::ProducerConfiguration configuration = {});
86
87 std::deque<Producer> MakeProducers(
88 std::size_t count,
89 std::function<std::string(std::size_t)> nameGenerator,
90 impl::ProducerConfiguration configuration = {}
91 );
92
93 /// @brief Creates temporary producer and send messages span.
94 void SendMessages(utils::span<const Message> messages);
95
96 impl::Consumer MakeConsumer(
97 const std::string& name,
98 const std::vector<std::string>& topics,
99 impl::ConsumerConfiguration configuration = {},
100 impl::ConsumerExecutionParams params = {}
101 );
102
103 /// @brief Starts consumer, waits until `expected_message_count` messages are
104 /// consumed, calls `user_callback` if set, stops consumer.
108 bool commit_after_receive = true,
110 );
111
112private:
113 impl::Secret AddBootstrapServers(impl::Secret secrets) const;
114
115private:
116 std::atomic<std::size_t> topics_count_{0};
117
118 const std::string bootstrap_servers_;
119};
120
121} // namespace kafka::utest
122
123USERVER_NAMESPACE_END