userver: userver/ydb/topic.hpp Source File
Loading...
Searching...
No Matches
topic.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ydb/topic.hpp
4/// @brief YDB Topic client
5
6#include <chrono>
7#include <memory>
8#include <optional>
9#include <string>
10
11#include <ydb-cpp-sdk/client/topic/client.h>
12#include <ydb-cpp-sdk/client/types/executor/executor.h>
13
14#include <userver/compiler/impl/lifetime.hpp>
15
16USERVER_NAMESPACE_BEGIN
17
18namespace ydb {
19
20namespace impl {
21class Driver;
22struct TopicSettings;
23} // namespace impl
24
25/// @brief Read session used to connect to one or more topics for reading
26///
27/// @see https://ydb.tech/docs/en/reference/ydb-sdk/topic#reading
28///
29/// ## Example usage:
30///
31/// @ref samples/ydb_service/components/topic_reader.hpp
32/// @ref samples/ydb_service/components/topic_reader.cpp
33///
34/// @example samples/ydb_service/components/topic_reader.hpp
35/// @example samples/ydb_service/components/topic_reader.cpp
36class TopicReadSession final {
37public:
38 /// @cond
39 // For internal use only.
40 explicit TopicReadSession(std::shared_ptr<NYdb::NTopic::IReadSession> read_session);
41 /// @endcond
42
43 /// @brief Get read session events
44 ///
45 /// Waits until event occurs
46 /// @param max_events_count maximum events count in batch
47 /// @param max_size_bytes total size limit for data messages in batch
48 /// if not specified, read session chooses event batch size automatically
50 std::optional<std::size_t> max_events_count = {},
51 size_t max_size_bytes = std::numeric_limits<size_t>::max()
52 );
53
54 /// @brief Get read session events
55 ///
56 /// Waits until event occurs
57 /// @param settings ydb native read session settings
59 const NYdb::NTopic::TReadSessionGetEventSettings& settings
60 );
61
62 /// @brief Close read session
63 ///
64 /// Waits for all commit acknowledgments to arrive.
65 /// Force close after timeout
66 bool Close(std::chrono::milliseconds timeout);
67
68 /// @brief Get native read session
69 ///
70 /// @warning Use with care! Facilities from @ref userver/drivers/subscribable_futures.hpp can help
71 /// with non-blocking wait operations.
72 NYdb::NTopic::IReadSession& GetNativeTopicReadSession() USERVER_IMPL_LIFETIME_BOUND;
73
74private:
75 std::shared_ptr<NYdb::NTopic::IReadSession> read_session_;
76};
77
78/// @brief Write session used to connect to a topic for writting
79///
80/// @see https://ydb.tech/docs/en/reference/ydb-sdk/topic#write
81class TopicWriteSession final {
82public:
83 /// @cond
84 /// For internal use only.
85 explicit TopicWriteSession(std::shared_ptr<NYdb::NTopic::IWriteSession> write_session);
86 /// @endcond
87
88 /// @brief Wait for the next write session event
89 ///
90 /// Suspends the current coroutine until an event is available, then returns it without blocking the thread.
91 NYdb::NTopic::TWriteSessionEvent::TEvent GetEvent();
92
93 /// @brief Poll for a write session event without waiting
94 ///
95 /// Returns the next buffered event immediately if one is available, or `std::nullopt` if the event queue is empty.
96 /// Does not suspend the coroutine.
97 ///
98 /// @note Sometimes may return `std::nullopt` even if an event is available. Intended for use in loops.
99 std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> TryGetEvent();
100
101 /// @brief Write a messsage using a continuation token from TReadyToAcceptEvent
102 ///
103 /// Must be called only after receiving TReadyToAcceptEvent from GetEvent() or TryGetEvent().
104 void Write(NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& message);
105
106 /// @brief Close write session
107 ///
108 /// Waits for all in-flights messages to be acknowledged.
109 /// Force closes after timeout
110 bool Close(std::chrono::milliseconds timeout);
111
112 /// @brief Get native write session
113 ///
114 /// @warning Use with care! Facilities from @ref userver/drivers/subscribable_futures.hpp can help
115 /// with non-blocking wait operations.
116 NYdb::NTopic::IWriteSession& GetNativeTopicWriteSession() USERVER_IMPL_LIFETIME_BOUND;
117
118private:
119 std::shared_ptr<NYdb::NTopic::IWriteSession> write_session_;
120};
121
122/// @ingroup userver_clients
123///
124/// @brief YDB Topic Client
125///
126/// @see https://ydb.tech/docs/en/concepts/topic
127class TopicClient final {
128public:
129 /// @cond
130 // For internal use only.
131 TopicClient(std::shared_ptr<impl::Driver> driver, impl::TopicSettings settings);
132 /// @endcond
133
134 ~TopicClient();
135
136 /// Alter topic
137 void AlterTopic(const std::string& path, const NYdb::NTopic::TAlterTopicSettings& settings);
138
139 /// Describe topic
140 NYdb::NTopic::TDescribeTopicResult DescribeTopic(const std::string& path);
141
142 /// Create read session
143 TopicReadSession CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings);
144
145 /// Create write session
146 TopicWriteSession CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings);
147
148 /// Get native topic client
149 /// @warning Use with care! Facilities from
150 /// `<core/include/userver/drivers/subscribable_futures.hpp>` can help with
151 /// non-blocking wait operations.
152 NYdb::NTopic::TTopicClient& GetNativeTopicClient() USERVER_IMPL_LIFETIME_BOUND;
153
154private:
155 std::shared_ptr<impl::Driver> driver_;
156 // Owned executors: Stop() only after `topic_client_` is destroyed (see
157 // ~TopicClient). Joining these threads after the native client is gone
158 // avoids atexit use-after-destroy (e.g. SEGV in TCodecMap). Stopping them
159 // while TTopicClient is still alive would deadlock or stall writes.
160 NYdb::IExecutor::TPtr compression_executor_;
161 NYdb::IExecutor::TPtr handlers_executor_;
162 // `reset()` in ~TopicClient runs before Stop() on the executors above.
163 std::optional<NYdb::NTopic::TTopicClient> topic_client_;
164};
165
166} // namespace ydb
167
168USERVER_NAMESPACE_END