userver: userver/kafka/utest/kafka_fixture.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
kafka_fixture.hpp
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <deque>
6#include <vector>
7
8#include <userver/kafka/headers.hpp>
9#include <userver/kafka/impl/broker_secrets.hpp>
10#include <userver/kafka/impl/configuration.hpp>
11#include <userver/kafka/impl/consumer.hpp>
12#include <userver/kafka/producer.hpp>
13#include <userver/utest/utest.hpp>
14#include <userver/utils/span.hpp>
15
16USERVER_NAMESPACE_BEGIN
17
18namespace kafka::utest {
19
20/// @ingroup userver_utest
21///
22/// @brief Message owning data wrapper for unit tests.
23struct Message {
24 std::string topic;
25 std::string key;
26 std::string payload;
27 std::optional<std::uint32_t> partition{};
28 std::vector<OwningHeader> headers{};
29};
30
31bool operator==(const Message& lhs, const Message& rhs);
32
33/// @ingroup userver_utest
34///
35/// @brief Helper for Kafka unit testing.
36///
37/// KafkaCluster is useful for
38/// - inline producer, consumer and their configuration creation;
39/// - generation of unique topic names;
40/// - sending and receiving batched of messages in sync form;
41///
42/// KafkaCluster expects that topic names used in test are already exist in
43/// Kafka broker or auto-created on produce to it.
44///
45/// It is recommended to create all topics before tests started, because
46/// topic creation leads to disk IO operation and maybe too slow in tests'
47/// runtime.
48///
49/// If one run tests with Kafka testsuite plugin, use
50/// `TESTSUITE_KAFKA_CUSTOM_TOPICS` environment variable to create topics before
51/// tests started.
52///
53/// @note KafkaCluster determines the broker's connection string
54/// based on `TESTSUITE_KAFKA_SERVER_HOST` (by default `localhost`) and
55/// `TESTSUITE_KAFKA_SERVER_PORT` (by default `9099`) environment variables.
56///
57/// @see https://yandex.github.io/yandex-taxi-testsuite/kafka
58class KafkaCluster : public ::testing::Test {
59public:
60 /// Kafka broker has some cold start issues on its. To stabilize tests
61 /// KafkaCluster patches producer's default delivery timeout.
62 /// To use custom delivery timeout in test, pass `configuration` argument to
63 /// MakeProducer.
64 static constexpr const std::chrono::milliseconds kDefaultTestProducerTimeout{
65 USERVER_NAMESPACE::utest::kMaxTestWaitTime / 2};
66
67 KafkaCluster();
68
69 ~KafkaCluster() override = default;
70
71 std::string GenerateTopic();
72
73 std::vector<std::string> GenerateTopics(std::size_t count);
74
75 impl::Configuration MakeProducerConfiguration(
76 const std::string& name,
77 impl::ProducerConfiguration configuration = {},
78 impl::Secret secrets = {}
79 );
80
81 impl::Configuration MakeConsumerConfiguration(
82 const std::string& name,
83 impl::ConsumerConfiguration configuration = {},
84 impl::Secret secrets = {}
85 );
86
87 Producer MakeProducer(const std::string& name, impl::ProducerConfiguration configuration = {});
88
89 std::deque<Producer> MakeProducers(
90 std::size_t count,
91 std::function<std::string(std::size_t)> nameGenerator,
92 impl::ProducerConfiguration configuration = {}
93 );
94
95 /// @brief Creates temporary producer and send messages span.
96 void SendMessages(utils::span<const Message> messages);
97
98 impl::Consumer MakeConsumer(
99 const std::string& name,
100 const std::vector<std::string>& topics,
101 impl::ConsumerConfiguration configuration = {},
102 impl::ConsumerExecutionParams params = {}
103 );
104
105 /// @brief Starts consumer, waits until `expected_message_count` messages are
106 /// consumed, calls `user_callback` if set, stops consumer.
108 impl::Consumer& consumer,
109 std::size_t expected_messages_count,
110 bool commit_after_receive = true,
111 std::optional<std::function<void(MessageBatchView)>> user_callback = {}
112 );
113
114private:
115 impl::Secret AddBootstrapServers(impl::Secret secrets) const;
116
117private:
118 static std::atomic<std::size_t> kTopicsCount;
119
120 const std::string bootstrap_servers_;
121};
122
123} // namespace kafka::utest
124
125USERVER_NAMESPACE_END