userver: userver/ydb/topic_writer.hpp Source File
Loading...
Searching...
No Matches
topic_writer.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ydb/topic_writer.hpp
4/// @brief YDB Topic writer interface and implementation
5
#include <atomic>
#include <cstddef>
6#include <memory>
7#include <optional>
8#include <string>
#include <string_view>
9#include <unordered_map>
10#include <vector>
11
12#include <userver/compiler/impl/lifetime.hpp>
13#include <userver/utils/impl/transparent_hash.hpp>
14#include <userver/utils/statistics/fwd.hpp>
15
16#include <userver/ydb/topic.hpp>
17#include <userver/ydb/topic_writer_types.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21// Forward declaration
22namespace ydb::impl {
23class TopicWriterWorker;
24}
25
26namespace ydb {
27
28/// @brief Settings for TopicWriter
30 /// Maximum number of workers allowed per topic writer
31 static constexpr std::size_t kMaxWorkers = 100;
32
33 /// @brief Level of the statistics provided to monitoring
34 enum class StatisticLevel {
35 /// Consolidated stats for the whole topic writer (the sum of the stats of
36 /// all workers within this writer)
38 /// Separate stats for each individual worker
40 };
41
42 /// Topic client for communication with YDB. Note that TopicClient
43 /// is a client to the topic service, not a client to a specific topic.
44 std::shared_ptr<ydb::TopicClient> topic_client;
45 /// Topic name to write messages into
46 std::string topic_name;
47 /// Number of workers used to write messages into the topic. Messages are
48 /// distributed across workers in round-robin fashion. Increasing this
49 /// value improves throughput but does not preserve message ordering.
50 std::size_t workers_num{1};
51 /// Maximum number of messages that can be buffered in the incoming queue
52 /// before WriteMessage starts returning kResourceExhausted
54 /// Maximum number of control events from YDB that can be buffered
55 /// before the worker stops reading new events from the session
57
58 /// Desired level for collected statistics
60};
61
62/// @brief Base interface for writing messages to a YDB topic; instances are neither copyable nor movable
64public:
65 /// @brief Writes a message to YDB
66 ///
67 /// @param[in] message message data to be sent to the topic
68 /// @param[in] meta key-value metadata to attach to the message
69 ///
70 /// @return result of the message handling; marked [[nodiscard]], so it must not be silently ignored
71 [[nodiscard]] virtual TopicWriteResult WriteMessage(std::string message, const TopicWriterMetadata& meta) = 0;
72
73 TopicWriterBase() = default;
74 TopicWriterBase(const TopicWriterBase&) = delete;  // not copy-constructible
75 TopicWriterBase(TopicWriterBase&&) = delete;  // not move-constructible
76 TopicWriterBase& operator=(const TopicWriterBase&) = delete;  // not copy-assignable
77 TopicWriterBase& operator=(TopicWriterBase&&) = delete;  // not move-assignable
78 virtual ~TopicWriterBase() = default;  // virtual, so implementations can be destroyed through a base pointer
79};
80
81/// @brief Takes messages provided by the user and sends them to a YDB topic
82///
83/// This class can be created directly or obtained from TopicWriterManager.
84/// When created via TopicWriterManager, the manager takes care of collecting
85/// and writing statistics for you; in all other respects behaviour is the same.
86class TopicWriter final : public TopicWriterBase {
87public:
88 /// @brief Creates a new topic writer
89 ///
90 /// @param[in] name identifier used for statistics labeling; should be
91 /// unique across the service to avoid metric collisions
92 /// @param[in] settings controls which topic to write into and tuning
93 /// parameters such as worker count and queue sizes
94 TopicWriter(std::string name, TopicWriterSettings settings);
95
96 /// @brief Writes a message to YDB
97 ///
98 /// Internally, the worker that sends the message is selected in round-robin
99 /// fashion across all configured workers.
100 [[nodiscard]] TopicWriteResult WriteMessage(std::string message, const TopicWriterMetadata& meta) override;
101
102private:
103 friend void DumpMetric(utils::statistics::Writer& writer, const TopicWriter& topic_writer);
104
105 // Alias for the topic. This is the configuration name from the component
106 // config; it is also used for statistics.
107 const std::string name_;
108
109 const TopicWriterSettings settings_;
110 std::shared_ptr<ydb::TopicClient> topic_client_;
111 // Counter used by the round-robin worker selection
112 std::atomic<std::size_t> current_worker_{0};
113 std::vector<std::shared_ptr<impl::TopicWriterWorker>> workers_;
114};
115
116/// @brief Manages a collection of named YDB topic writers
117///
118/// Provides access to individual TopicWriter instances by name,
119/// and aggregates their statistics.
120class TopicWriterManager final {
121public:
122 /// @brief Constructs a TopicWriterManager with the given per-topic settings
123 /// @param settings map from topic writer name to its settings; the keys
124 /// become the names used to look up writers via GetTopicWriter
125 explicit TopicWriterManager(std::unordered_map<std::string, TopicWriterSettings> settings);
126
127 /// @brief Returns the topic writer for the provided name
128 /// @param name key that was used in the settings map passed to the
129 /// constructor; determines which topic writer is returned
130 /// @return reference to the particular topic writer, or throws an exception if
131 /// there is no topic writer with the specified name
132 TopicWriter& GetTopicWriter(std::string_view name) USERVER_IMPL_LIFETIME_BOUND;
133
134private:
135 friend void DumpMetric(utils::statistics::Writer& writer, const TopicWriterManager& manager);
136
137 utils::impl::TransparentMap<std::string, std::shared_ptr<TopicWriter>> topic_writers_;
138};
139
140/// @brief Dumps aggregated metrics for all topic writers of the manager into the statistics writer
141/// @param writer statistics writer to dump into
142/// @param manager the topic writer manager whose metrics to dump
143void DumpMetric(utils::statistics::Writer& writer, const TopicWriterManager& manager);
144
145/// @brief Dumps metrics for a single TopicWriter into the statistics writer
146/// @param writer statistics writer to dump into
147/// @param topic_writer the topic writer whose metrics to dump
148void DumpMetric(utils::statistics::Writer& writer, const TopicWriter& topic_writer);
149
150} // namespace ydb
151
152USERVER_NAMESPACE_END