userver
C++ Async Framework
Loading...
Searching...
No Matches
topic_writer.hpp
Go to the documentation of this file.
1
#
pragma
once
2
3
/// @file userver/ydb/topic_writer.hpp
4
/// @brief YDB Topic writer interface and implementation
5
6
#include <atomic>
#include <cstddef>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include <userver/compiler/impl/lifetime.hpp>
#include <userver/utils/impl/transparent_hash.hpp>
#include <userver/utils/statistics/fwd.hpp>

#include <userver/ydb/topic.hpp>
#include <userver/ydb/topic_writer_types.hpp>
18
19
USERVER_NAMESPACE_BEGIN
20
21
// Forward declaration: only the name of the worker type is introduced here
namespace ydb::impl {
class TopicWriterWorker;
}  // namespace ydb::impl
25
26
namespace
ydb {
27
28
/// @brief Settings for TopicWriter
29
struct
TopicWriterSettings
{
30
/// Maximum number of workers allowed per topic writer
31
static
constexpr
std::size_t
kMaxWorkers
= 100;
32
33
/// @brief Level of the statistics provided to monitoring
34
enum
class
StatisticLevel
{
35
/// Consolidated stats for whole topic writer (sum of stats for all workers
36
/// within this writer)
37
kWriter
,
38
/// Stats for all workers individually
39
kWorker
40
};
41
42
/// Topic client for communication with YDB. Note that TopicClient
43
/// is a client to the topic service, not a client to a specific topic.
44
std::shared_ptr<ydb::TopicClient>
topic_client
;
45
/// Topic name to write messages into
46
std::string
topic_name
;
47
/// Number of workers used to write messages into the topic. Messages are
48
/// distributed across workers in round-robin fashion. Increasing this
49
/// value improves throughput but does not preserve message ordering.
50
std::size_t
workers_num
{1};
51
/// Maximum number of messages that can be buffered in the incoming queue
52
/// before WriteMessage starts returning kResourceExhausted
53
std::size_t
max_incoming_msg_queue_size
{100};
54
/// Maximum number of control events from YDB that can be buffered
55
/// before the worker stops reading new events from the session
56
std::size_t
max_ydb_control_event_queue_size
{10};
57
58
/// Desired level for collected statistics
59
StatisticLevel
stats_level
{
StatisticLevel
::
kWriter
};
60
};
61
62
/// @brief Base interface for writing messages to a YDB topic
63
class
TopicWriterBase
{
64
public
:
65
/// @brief Write a message to YDB
66
///
67
/// @param[in] message message data to be sent to the topic
68
/// @param[in] meta key-value metadata to attach to the message
69
///
70
/// @return result of the message handling
71
[[
nodiscard
]]
virtual
TopicWriteResult
WriteMessage
(std::string message,
const
TopicWriterMetadata& meta) = 0;
72
73
TopicWriterBase() =
default
;
74
TopicWriterBase(
const
TopicWriterBase
&) =
delete
;
75
TopicWriterBase(
TopicWriterBase
&&) =
delete
;
76
TopicWriterBase
& operator=(
const
TopicWriterBase
&) =
delete
;
77
TopicWriterBase
& operator=(
TopicWriterBase
&&) =
delete
;
78
virtual
~TopicWriterBase() =
default
;
79
};
80
81
/// @brief Takes messages provided by the user and sends them to a YDB topic
82
///
83
/// This class can be created directly, or you can use TopicWriterManager. If
84
/// created via TopicWriterManager, it will take care of collecting and writing
85
/// statistics for you. Otherwise, behaviour is unchanged.
86
class
TopicWriter
final
:
public
TopicWriterBase
{
87
public
:
88
/// @brief Creates a new topic writer
89
///
90
/// @param[in] name identifier used for statistics labeling; should be
91
/// unique across the service to avoid metric collisions
92
/// @param[in] settings controls which topic to write into and tuning
93
/// parameters such as worker count and queue sizes
94
TopicWriter
(std::string name,
TopicWriterSettings
settings);
95
96
/// @brief Writes a message to YDB
97
///
98
/// Internally a worker is selected for the message sending in round-robin
99
/// fashion across all configured workers.
100
[[nodiscard]]
TopicWriteResult
WriteMessage
(std::string message,
const
TopicWriterMetadata& meta)
override
;
101
102
private
:
103
friend
void
DumpMetric
(
utils
::
statistics
::Writer& writer,
const
TopicWriter& topic_writer);
104
105
// Alias for topic. This is configuration name from component
106
// config and it will also be used for statistics.
107
const
std::string name_;
108
109
const
TopicWriterSettings
settings_;
110
std::shared_ptr<ydb::TopicClient> topic_client_;
111
// for round robin algorithm
112
std::atomic<std::size_t> current_worker_{0};
113
std::vector<std::shared_ptr<impl::TopicWriterWorker>> workers_;
114
};
115
116
/// @brief Manages a collection of named YDB topic writers
117
///
118
/// Provides access to individual TopicWriter instances by name,
119
/// and aggregates their statistics.
120
class
TopicWriterManager
final
{
121
public
:
122
/// @brief Constructs a TopicWriterManager with the given per-topic settings
123
/// @param settings map from topic writer name to its settings; the keys
124
/// become the names used to look up writers via GetTopicWriter
125
explicit
TopicWriterManager
(std::unordered_map<std::string, TopicWriterSettings> settings);
126
127
/// @brief Returns the topic writer for the provided name
128
/// @param name key that was used in the settings map passed to the
129
/// constructor; determines which topic writer is returned
130
/// @return reference to the particular topic writer, or throws an exception
131
/// no topic writer with the specified name
132
TopicWriter&
GetTopicWriter
(std::string_view name) USERVER_IMPL_LIFETIME_BOUND;
133
134
private
:
135
friend
void
DumpMetric
(
utils
::
statistics
::Writer& writer,
const
TopicWriterManager& manager);
136
137
utils::impl::TransparentMap<std::string, std::shared_ptr<TopicWriter>> topic_writers_;
138
};
139
140
/// @brief Dumps aggregated metrics for all topic writers into the statistics writer
141
/// @param writer statistics writer to dump into
142
/// @param manager the topic writer manager whose metrics to dump
143
void
DumpMetric
(
utils
::
statistics
::Writer& writer,
const
TopicWriterManager& manager);
144
145
/// @brief Dumps metrics for a TopicWriter into the statistics writer
146
/// @param writer statistics writer to dump into
147
/// @param topic_writer the topic writer whose metrics to dump
148
void
DumpMetric
(
utils
::
statistics
::Writer& writer,
const
TopicWriter& topic_writer);
149
150
}
// namespace ydb
151
152
USERVER_NAMESPACE_END
userver
ydb
topic_writer.hpp
Generated on Tue Jun 30 2026 13:18:49 for userver by
Doxygen
1.13.2