userver: userver/drivers/subscribable_futures.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
subscribable_futures.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/drivers/subscribable_futures.hpp
4/// @brief @copybrief drivers::SubscribableFutureWrapper
5
6#include <boost/intrusive_ptr.hpp>
7#include <boost/smart_ptr/intrusive_ref_counter.hpp>
8
9#include <userver/engine/deadline.hpp>
10#include <userver/engine/exception.hpp>
11#include <userver/engine/future_status.hpp>
12#include <userver/engine/single_consumer_event.hpp>
13#include <userver/engine/task/cancel.hpp>
14#include <userver/utils/make_intrusive_ptr.hpp>
15
16USERVER_NAMESPACE_BEGIN
17
18namespace drivers {
19
20namespace impl {
21
22struct EventHolder final : boost::intrusive_ref_counter<EventHolder> {
23 engine::SingleConsumerEvent event{engine::SingleConsumerEvent::NoAutoReset{}};
24};
25
26} // namespace impl
27
28/// @ingroup userver_concurrency
29///
30/// @brief An adaptor for working with certain external futures.
31///
32/// The `SubscribableFuture` must have a `Subscribe(Callback)` method that calls
33/// the callback when the future is ready. If the future is already ready, then
34/// `Subscribe` must call the callback immediately. If the promise is dropped
35/// and will never be fulfilled properly, then the `SubscribableFuture` should
36/// call the callback anyway.
37///
38/// @see drivers::WaitForSubscribableFuture
39/// @see drivers::TryWaitForSubscribableFuture
40template <typename SubscribableFuture>
41class SubscribableFutureWrapper final {
42 public:
43 explicit SubscribableFutureWrapper(SubscribableFuture&& future)
44 : original_future_(static_cast<SubscribableFuture&&>(future)),
45 event_holder_(utils::make_intrusive_ptr<impl::EventHolder>()) {
46 original_future_.Subscribe(
47 [event_holder = event_holder_](auto&) { event_holder->event.Send(); });
48 }
49
50 /// @returns the original future
51 SubscribableFuture& GetFuture() { return original_future_; }
52
53 /// @brief Wait for the future. The result can be retrieved from the original
54 /// future using GetFuture once ready.
55 /// @throws engine::WaitInterruptedException on task cancellation
56 void Wait() {
57 if (TryWaitUntil(engine::Deadline{}) != engine::FutureStatus::kReady) {
58 throw engine::WaitInterruptedException(
60 }
61 }
62
63 /// @brief Wait for the future. The result can be retrieved from the original
64 /// future using GetFuture once ready.
65 /// @returns an error code if deadline is exceeded or task is cancelled
66 [[nodiscard]] engine::FutureStatus TryWaitUntil(engine::Deadline deadline) {
67 if (event_holder_->event.WaitForEventUntil(deadline)) {
68 return engine::FutureStatus::kReady;
69 }
73 }
74
75 private:
76 SubscribableFuture original_future_;
77 boost::intrusive_ptr<impl::EventHolder> event_holder_;
78};
79
80/// @ingroup userver_concurrency
81///
82/// @brief Waits on the given future as described
83/// on drivers::SubscribableFutureWrapper.
84///
85/// The result can be retrieved from the original future once ready.
86///
87/// @throws engine::WaitInterruptedException on task cancellation
88template <typename SubscribableFuture>
89void WaitForSubscribableFuture(SubscribableFuture&& future) {
90 SubscribableFutureWrapper<SubscribableFuture&>{future}.Wait();
91}
92
93/// @ingroup userver_concurrency
94/// @overload
95/// @returns an error code if deadline is exceeded or task is cancelled
96///
97/// @warning Repeatedly waiting again after `deadline` expiration leads to a
98/// memory leak, use drivers::SubscribableFutureWrapper instead.
99template <typename SubscribableFuture>
101 SubscribableFuture&& future, engine::Deadline deadline) {
102 return SubscribableFutureWrapper<SubscribableFuture&>{future}.TryWaitUntil(
103 deadline);
104}
105
106} // namespace drivers
107
108USERVER_NAMESPACE_END