userver: userver/kafka/utest/kafka_fixture.hpp Source File
Loading...
Searching...
No Matches
kafka_fixture.hpp
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <deque>
6#include <vector>
7
8#include <userver/kafka/headers.hpp>
9#include <userver/kafka/impl/broker_secrets.hpp>
10#include <userver/kafka/impl/configuration.hpp>
11#include <userver/kafka/impl/consumer.hpp>
12#include <userver/kafka/producer.hpp>
13#include <userver/utest/utest.hpp>
14#include <userver/utils/box.hpp>
15#include <userver/utils/span.hpp>
16USERVER_NAMESPACE_BEGIN
17
18namespace kafka::utest {
19
20/// @ingroup userver_utest
21///
22/// @brief Data-owning message wrapper for unit tests.
23struct Message {
24 std::string topic;  ///< Topic name of the message.
25 std::string key;  ///< Message key.
26 std::string payload;  ///< Message payload.
27 std::optional<std::uint32_t> partition{};  ///< Optional partition number; empty unless set explicitly.
28 std::vector<OwningHeader> headers{};  ///< Message headers (OwningHeader elements); empty by default.
29};
30
/// @brief Equality comparison for two Message objects.
///
/// Only declared in this header; the definition (and therefore which fields
/// take part in the comparison) is not visible here.
31bool operator==(const Message& lhs, const Message& rhs);
32
33/// @ingroup userver_utest
34///
35/// @brief Helper for Kafka unit testing.
36///
37/// KafkaCluster is useful for
38/// - inline producer, consumer and their configuration creation;
39/// - generation of unique topic names;
40/// - sending and receiving batches of messages in sync form;
41///
42/// KafkaCluster expects that the topics used in tests already exist in the
43/// Kafka broker or are auto-created when messages are produced to them.
44///
45/// It is recommended to create all topics before the tests start, because
46/// topic creation leads to disk IO operations and may be too slow at tests'
47/// runtime.
48///
49/// If tests are run with the Kafka testsuite plugin, use the
50/// `TESTSUITE_KAFKA_CUSTOM_TOPICS` environment variable to create topics before
51/// the tests start.
52///
53/// @note KafkaCluster determines the broker's connection string
54/// based on `TESTSUITE_KAFKA_SERVER_HOST` (by default `localhost`) and
55/// `TESTSUITE_KAFKA_SERVER_PORT` (by default `9099`) environment variables.
56///
57/// @see https://yandex.github.io/yandex-taxi-testsuite/kafka
58class KafkaCluster : public ::testing::Test {
59public:
60 /// The Kafka broker has some cold start issues of its own. To stabilize tests,
61 /// KafkaCluster patches the producer's default delivery timeout.
62 /// To use a custom delivery timeout in a test, pass the `configuration`
63 /// argument to MakeProducer. The value is half of `utest::kMaxTestWaitTime`.
64 static constexpr const std::chrono::milliseconds kDefaultTestProducerTimeout{
65 USERVER_NAMESPACE::utest::kMaxTestWaitTime / 2
66 };
67
68 KafkaCluster();
69
70 ~KafkaCluster() override;
71
 /// @brief Generates a topic name for use in a test.
 /// @param partition_cnt presumably the number of partitions of the topic
 /// (defaults to 1) -- TODO confirm against the implementation.
72 std::string GenerateTopic(std::uint32_t partition_cnt = 1);
73
 /// @brief Generates `count` topic names.
74 std::vector<std::string> GenerateTopics(std::size_t count);
75
 /// @brief Builds an impl::Configuration for a producer called `name`.
 /// @param configuration producer settings; default-constructed if omitted.
 /// @param secrets broker secrets; default-constructed if omitted.
 /// NOTE(review): presumably the bootstrap servers are injected into `secrets`
 /// via AddBootstrapServers -- confirm in the implementation.
76 impl::Configuration MakeProducerConfiguration(
77 const std::string& name,
78 impl::ProducerConfiguration configuration = {},
79 impl::Secret secrets = {}
80 );
81
 /// @brief Builds an impl::Configuration for a consumer called `name`.
 /// @param configuration consumer settings; default-constructed if omitted.
 /// @param secrets broker secrets; default-constructed if omitted.
82 impl::Configuration MakeConsumerConfiguration(
83 const std::string& name,
84 impl::ConsumerConfiguration configuration = {},
85 impl::Secret secrets = {}
86 );
87
 /// @brief Creates a Producer called `name`.
 /// @param configuration producer settings; pass it to override defaults such
 /// as the delivery timeout (see kDefaultTestProducerTimeout).
88 Producer MakeProducer(const std::string& name, impl::ProducerConfiguration configuration = {});
89
 /// @brief Creates `count` producers.
 /// @param name_generator callable mapping a `std::size_t` to a producer name;
 /// presumably invoked with the producer's index -- TODO confirm.
 /// @param configuration producer settings used for the created producers.
90 std::deque<Producer> MakeProducers(
91 std::size_t count,
92 std::function<std::string(std::size_t)> name_generator,
93 impl::ProducerConfiguration configuration = {}
94 );
95
96 /// @brief Creates a temporary producer and sends the given span of messages.
97 void SendMessages(utils::span<const Message> messages);
98
 /// @brief Creates an impl::Consumer called `name` for the given `topics`.
 /// @param configuration consumer settings; default-constructed if omitted.
 /// @param params consumer execution parameters; default-constructed if omitted.
99 impl::Consumer MakeConsumer(
100 const std::string& name,
101 const std::vector<std::string>& topics,
102 impl::ConsumerConfiguration configuration = {},
103 impl::ConsumerExecutionParams params = {}
104 );
105
106 /// @brief Starts consumer, waits until `expected_messages_count` messages are
107 /// consumed, calls `user_callback` if set, stops consumer.
 /// NOTE(review): the line holding the return type and function name of this
 /// declaration (listing line 108) is absent from this copy of the file; only
 /// the parameter list remains. Restore it from the original header.
109 impl::Consumer& consumer,
110 std::size_t expected_messages_count,
111 bool commit_after_receive = true,
112 std::optional<std::function<void(MessageBatchView)>> user_callback = {}
113 );
114
115 /// @brief Same as the previous overload, but works with the ConsumerScope passed to the function.
 /// NOTE(review): the declarator line of this overload (listing line 116) is
 /// likewise absent from this copy of the file. Restore it from the original header.
117 ConsumerScope& consumer,
118 std::size_t expected_messages_count,
119 bool commit_after_receive = true,
120 std::optional<std::function<void(MessageBatchView)>> user_callback = {}
121 );
122
123private:
 /// Takes `secrets` by value and returns an impl::Secret; presumably the
 /// result carries `bootstrap_servers_` -- confirm in the implementation.
124 impl::Secret AddBootstrapServers(impl::Secret secrets) const;
 /// Returns a string; presumably the broker connection string used to
 /// initialize `bootstrap_servers_` -- confirm in the implementation.
125 std::string InitBootstrapServers();
126
127private:
 /// Counter shared by all fixture instances (static, atomic); presumably used
 /// to make generated topic names unique -- confirm in the implementation.
128 static std::atomic<std::size_t> kTopicsCount;
 /// Forward-declared here and held through utils::Box, so its definition is
 /// not needed in this header.
129 class MockCluster;
130 utils::Box<MockCluster> mock_;
 /// Immutable after construction (const member).
131 const std::string bootstrap_servers_;
132};
133} // namespace kafka::utest
134
135USERVER_NAMESPACE_END