userver: userver/ydb/federated_topic.hpp Source File
Loading...
Searching...
No Matches
federated_topic.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ydb/federated_topic.hpp
4/// @brief YDB Federated Topic client
5///
6/// Federated Topic SDK serves as a wrapper over Topic SDK, coordinates
7/// reading from federated YDB installations topics, has a subset of functions
8/// of usual Topic SDK
9///
10/// @ref userver/ydb/topic.hpp
11
12#include <chrono>
13#include <memory>
14#include <optional>
15
16#include <ydb-cpp-sdk/client/federated_topic/federated_topic.h>
17#include <ydb-cpp-sdk/client/types/executor/executor.h>
18
19#include <userver/compiler/impl/lifetime.hpp>
20
21USERVER_NAMESPACE_BEGIN
22
23namespace ydb {
24
25namespace impl {
26class Driver;
27struct TopicSettings;
28} // namespace impl
29
30/// @brief Read session used to connect to one or more topics for reading
31///
32/// @see https://ydb.tech/docs/en/reference/ydb-sdk/topic#reading
33///
34/// ## Example usage:
35///
36/// @ref samples/ydb_service/components/federated_topic_reader.hpp
37/// @ref samples/ydb_service/components/federated_topic_reader.cpp
38///
39/// @example samples/ydb_service/components/federated_topic_reader.hpp
40/// @example samples/ydb_service/components/federated_topic_reader.cpp
41class FederatedTopicReadSession final {
42public:
43 /// @cond
44 // For internal use only.
45 explicit FederatedTopicReadSession(std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> read_session);
46 /// @endcond
47
48 /// @brief Get read session events
49 ///
50 /// Waits until event occurs
51 /// @param max_events_count maximum events count in batch
52 /// @param max_size_bytes total size limit for data messages in batch
53 /// if not specified, read session chooses event batch size automatically
55 std::optional<std::size_t> max_events_count = {},
56 size_t max_size_bytes = std::numeric_limits<size_t>::max()
57 );
58
59 /// @brief Close read session
60 ///
61 /// Waits for all commit acknowledgments to arrive.
62 /// Force close after timeout
63 bool Close(std::chrono::milliseconds timeout);
64
65 /// @brief Get native read session
66 ///
67 /// @warning Use with care! Facilities from @ref userver/drivers/subscribable_futures.hpp can help
68 /// with non-blocking wait operations.
69 NYdb::NFederatedTopic::IFederatedReadSession& GetNativeTopicReadSession() USERVER_IMPL_LIFETIME_BOUND;
70
71private:
72 std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> read_session_;
73};
74
75/// @ingroup userver_clients
76///
77/// @brief YDB Federated Topic Client
78///
79/// @see https://ydb.tech/docs/en/concepts/topic
80class FederatedTopicClient final {
81public:
82 /// @cond
83 // For internal use only.
84 FederatedTopicClient(std::shared_ptr<impl::Driver> driver, impl::TopicSettings settings);
85 /// @endcond
86
87 ~FederatedTopicClient();
88
89 /// Create read session
90 FederatedTopicReadSession CreateReadSession(const NYdb::NFederatedTopic::TFederatedReadSessionSettings& settings);
91
92 /// Get native topic client
93 /// @warning Use with care! Facilities from
94 /// `<core/include/userver/drivers/subscribable_futures.hpp>` can help with
95 /// non-blocking wait operations.
96 NYdb::NFederatedTopic::TFederatedTopicClient& GetNativeTopicClient() USERVER_IMPL_LIFETIME_BOUND;
97
98private:
99 std::shared_ptr<impl::Driver> driver_;
100 // Owned executors: Stop() only after `topic_client_` is destroyed (see
101 // ~FederatedTopicClient). Joining these threads after the native client is
102 // gone avoids atexit use-after-destroy (e.g. SEGV in TCodecMap). Stopping
103 // them while TFederatedTopicClient is still alive would deadlock or stall
104 // writes.
105 NYdb::IExecutor::TPtr compression_executor_;
106 NYdb::IExecutor::TPtr handlers_executor_;
107 // `reset()` in ~FederatedTopicClient runs before Stop() on the executors
108 // above.
109 std::optional<NYdb::NFederatedTopic::TFederatedTopicClient> topic_client_;
110};
111
112} // namespace ydb
113
114USERVER_NAMESPACE_END