8#include <userver/kafka/headers.hpp>
9#include <userver/kafka/impl/broker_secrets.hpp>
10#include <userver/kafka/impl/configuration.hpp>
11#include <userver/kafka/impl/consumer.hpp>
12#include <userver/kafka/producer.hpp>
13#include <userver/utest/utest.hpp>
14#include <userver/utils/box.hpp>
15#include <userver/utils/span.hpp>
16USERVER_NAMESPACE_BEGIN
18namespace kafka::utest {
/// Optional partition number; value-initialized, so it holds no value by
/// default. NOTE(review): presumably "no value" means the partition is left
/// unspecified for the message -- confirm against the code that consumes it.
27 std::optional<std::uint32_t> partition{};
/// Owned headers; value-initialized to an empty vector by default.
28 std::vector<OwningHeader> headers{};
65 USERVER_NAMESPACE::utest::kMaxTestWaitTime / 2};
/// Destructor. Marked `override`, so it overrides a virtual destructor
/// declared in a base class. Only declared here.
69 ~KafkaCluster() override;
/// Generates a topic and returns a string for it.
/// NOTE(review): presumably the returned string is the topic name -- confirm
/// against the definition.
/// @param partition_cnt partition count for the topic; defaults to 1.
71 std::string GenerateTopic(std::uint32_t partition_cnt = 1);
/// Generates several topics and returns their strings as a vector.
/// NOTE(review): presumably the result holds exactly `count` topic names --
/// confirm against the definition.
/// @param count number of topics requested.
73 std::vector<std::string> GenerateTopics(std::size_t count);
/// Builds an `impl::Configuration` for a producer.
/// @param name producer name.
/// @param configuration producer settings; default-constructed when omitted.
/// @param secrets broker secrets; default-constructed when omitted.
/// NOTE(review): presumably the cluster's bootstrap servers are added to
/// `secrets` before use -- confirm against the definition.
75 impl::Configuration MakeProducerConfiguration(
76 const std::string& name,
77 impl::ProducerConfiguration configuration = {},
78 impl::Secret secrets = {}
/// Builds an `impl::Configuration` for a consumer.
/// @param name consumer name.
/// @param configuration consumer settings; default-constructed when omitted.
/// @param secrets broker secrets; default-constructed when omitted.
/// NOTE(review): presumably the cluster's bootstrap servers are added to
/// `secrets` before use -- confirm against the definition.
81 impl::Configuration MakeConsumerConfiguration(
82 const std::string& name,
83 impl::ConsumerConfiguration configuration = {},
84 impl::Secret secrets = {}
/// Creates a `Producer` and returns it by value.
/// @param name producer name.
/// @param configuration producer settings; default-constructed when omitted.
87 Producer MakeProducer(
const std::string& name, impl::ProducerConfiguration configuration = {});
/// Creates several producers and returns them in a `std::deque`.
/// @param nameGenerator callable that takes a `std::size_t` and returns a
///        `std::string`. NOTE(review): presumably invoked with each
///        producer's index to obtain its name -- confirm.
/// @param configuration producer settings; default-constructed when omitted.
89 std::deque<Producer> MakeProducers(
91 std::function<std::string(std::size_t)> nameGenerator,
92 impl::ProducerConfiguration configuration = {}
/// Creates an `impl::Consumer` and returns it by value.
/// @param name consumer name.
/// @param topics list of topic strings. NOTE(review): presumably the topics
///        the consumer subscribes to -- confirm against the definition.
/// @param configuration consumer settings; default-constructed when omitted.
/// @param params consumer execution parameters; default-constructed when
///        omitted.
98 impl::Consumer MakeConsumer(
99 const std::string& name,
100 const std::vector<std::string>& topics,
101 impl::ConsumerConfiguration configuration = {},
102 impl::ConsumerExecutionParams params = {}
108 impl::Consumer& consumer,
109 std::size_t expected_messages_count,
110 bool commit_after_receive =
true,
111 std::optional<std::function<
void(MessageBatchView)>> user_callback = {}
116 ConsumerScope& consumer,
117 std::size_t expected_messages_count,
118 bool commit_after_receive =
true,
119 std::optional<std::function<
void(MessageBatchView)>> user_callback = {}
/// Takes `secrets` by value and returns an `impl::Secret`; const member, so
/// it does not modify the object's non-mutable state.
/// NOTE(review): presumably returns `secrets` with this cluster's bootstrap
/// servers filled in -- confirm against the definition.
123 impl::Secret AddBootstrapServers(impl::Secret secrets)
const;
/// Returns a string; non-const member taking no arguments.
/// NOTE(review): presumably computes the value stored in
/// `bootstrap_servers_` during construction -- confirm.
124 std::string InitBootstrapServers();
/// Static atomic counter (one instance shared across all objects of the
/// class). NOTE(review): presumably counts generated topics so that topic
/// names stay unique -- confirm against GenerateTopic's definition.
127 static std::atomic<std::size_t> kTopicsCount;
/// The `MockCluster` instance, held through `utils::Box`.
129 utils::
Box<MockCluster> mock_;
/// Bootstrap servers string; `const`, so it is fixed once the object is
/// constructed.
130 const std::string bootstrap_servers_;