userver: userver/drivers/subscribable_futures.hpp Source File
Loading...
Searching...
No Matches
subscribable_futures.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/drivers/subscribable_futures.hpp
4/// @brief @copybrief drivers::SubscribableFutureWrapper
5
6#include <boost/intrusive_ptr.hpp>
7#include <boost/smart_ptr/intrusive_ref_counter.hpp>
8
9#include <userver/engine/deadline.hpp>
10#include <userver/engine/exception.hpp>
11#include <userver/engine/future_status.hpp>
12#include <userver/engine/single_consumer_event.hpp>
13#include <userver/engine/task/cancel.hpp>
14#include <userver/utils/make_intrusive_ptr.hpp>
15
16USERVER_NAMESPACE_BEGIN
17
18namespace drivers {
19
20namespace impl {
21
22struct EventHolder final : boost::intrusive_ref_counter<EventHolder> {
23 engine::SingleConsumerEvent event{engine::SingleConsumerEvent::NoAutoReset{}};
24};
25
26} // namespace impl
27
28/// @ingroup userver_concurrency
29///
30/// @brief An adaptor for working with certain external futures.
31///
32/// The `SubscribableFuture` must have a `Subscribe(Callback)` method that calls
33/// the callback when the future is ready. If the future is already ready, then
34/// `Subscribe` must call the callback immediately. If the promise is dropped
35/// and will never be fulfilled properly, then the `SubscribableFuture` should
36/// call the callback anyway.
37///
38/// @see drivers::WaitForSubscribableFuture
39/// @see drivers::TryWaitForSubscribableFuture
40template <typename SubscribableFuture>
41class SubscribableFutureWrapper final {
42public:
43 explicit SubscribableFutureWrapper(SubscribableFuture&& future)
44 : original_future_(static_cast<SubscribableFuture&&>(future)),
45 event_holder_(utils::make_intrusive_ptr<impl::EventHolder>()) {
46 original_future_.Subscribe([event_holder = event_holder_](auto&) { event_holder->event.Send(); });
47 }
48
49 /// @returns the original future
50 SubscribableFuture& GetFuture() { return original_future_; }
51
52 /// @brief Wait for the future. The result can be retrieved from the original
53 /// future using GetFuture once ready.
54 /// @throws engine::WaitInterruptedException on task cancellation
55 void Wait() {
56 if (TryWaitUntil(engine::Deadline{}) != engine::FutureStatus::kReady) {
58 }
59 }
60
61 /// @brief Wait for the future. The result can be retrieved from the original
62 /// future using GetFuture once ready.
63 /// @returns an error code if deadline is exceeded or task is cancelled
64 [[nodiscard]] engine::FutureStatus TryWaitUntil(engine::Deadline deadline) {
65 if (event_holder_->event.WaitForEventUntil(deadline)) {
66 return engine::FutureStatus::kReady;
67 }
69 }
70
71private:
72 SubscribableFuture original_future_;
73 boost::intrusive_ptr<impl::EventHolder> event_holder_;
74};
75
76/// @ingroup userver_concurrency
77///
78/// @brief Waits on the given future as described
79/// on drivers::SubscribableFutureWrapper.
80///
81/// The result can be retrieved from the original future once ready.
82///
83/// @throws engine::WaitInterruptedException on task cancellation
84template <typename SubscribableFuture>
85void WaitForSubscribableFuture(SubscribableFuture&& future) {
86 SubscribableFutureWrapper<SubscribableFuture&>{future}.Wait();
87}
88
89/// @ingroup userver_concurrency
90/// @overload
91/// @returns an error code if deadline is exceeded or task is cancelled
92///
93/// @warning Repeatedly waiting again after `deadline` expiration leads to a
94/// memory leak, use drivers::SubscribableFutureWrapper instead.
95template <typename SubscribableFuture>
96[[nodiscard]] engine::FutureStatus
97TryWaitForSubscribableFuture(SubscribableFuture&& future, engine::Deadline deadline) {
98 return SubscribableFutureWrapper<SubscribableFuture&>{future}.TryWaitUntil(deadline);
99}
100
101} // namespace drivers
102
103USERVER_NAMESPACE_END