6#include <boost/intrusive_ptr.hpp>
7#include <boost/smart_ptr/intrusive_ref_counter.hpp>
9#include <userver/engine/deadline.hpp>
10#include <userver/engine/exception.hpp>
11#include <userver/engine/future_status.hpp>
12#include <userver/engine/single_consumer_event.hpp>
13#include <userver/engine/task/cancel.hpp>
14#include <userver/utils/make_intrusive_ptr.hpp>
16USERVER_NAMESPACE_BEGIN
// Intrusively reference-counted holder (via boost::intrusive_ref_counter) for one
// engine::SingleConsumerEvent.
// NOTE(review): this excerpt is line-filtered; the struct's closing brace and the enclosing
// namespace (the type is referred to as `impl::EventHolder` elsewhere in the file) are not
// visible here.
22struct EventHolder
final : boost::intrusive_ref_counter<EventHolder> {
// The event itself, constructed with the NoAutoReset tag.
// Presumably this means a sent signal stays set until reset explicitly - confirm against
// the SingleConsumerEvent documentation.
23 engine::SingleConsumerEvent event{engine::SingleConsumerEvent::NoAutoReset{}};
// Wraps a future-like object and exposes deadline-aware waiting for it by means of an
// engine::SingleConsumerEvent stored in a shared impl::EventHolder.
//
// Requirements on SubscribableFuture visible in this excerpt:
//   - it provides `Subscribe(callback)`, where `callback` is invocable with one argument
//     (taken as `auto&` and ignored by the callback installed below).
// SubscribableFuture may be a reference type: the helper functions further down in the file
// instantiate the wrapper with `SubscribableFuture&`.
//
// NOTE(review): the excerpt is line-filtered. Access specifiers, closing braces, the
// `event_holder_` initializer and some member function declarations are not visible, so the
// comments below describe only the statements that are.
40template <
typename SubscribableFuture>
41class SubscribableFutureWrapper
final {
// Stores `future` in `original_future_` (the static_cast to `SubscribableFuture&&` preserves the
// value category, like std::forward) and subscribes a callback that signals the event.
// The callback captures its own copy of the intrusive pointer to the event holder, so the
// holder is kept alive by the callback independently of this wrapper object.
// NOTE(review): the initialization of `event_holder_` is not visible in this excerpt -
// presumably done with utils::make_intrusive_ptr (that header is included); confirm.
43 explicit SubscribableFutureWrapper(SubscribableFuture&& future)
44 : original_future_(
static_cast<SubscribableFuture&&>(future)),
47 original_future_.Subscribe([event_holder = event_holder_](
auto&) { event_holder->event.Send(); });
// Returns a reference to the wrapped future.
51 SubscribableFuture&
GetFuture() {
return original_future_; }
// Body fragment of a waiting member function (its declaration line is not visible here;
// presumably this is `Wait()`, which the free helper below calls - confirm).
// Waits with a default-constructed engine::Deadline and throws
// engine::WaitInterruptedException carrying the current task's cancellation reason
// if the result is anything other than kReady.
57 if (
TryWaitUntil(engine::Deadline{}
) != engine::FutureStatus::kReady) {
58 throw engine::WaitInterruptedException(engine::current_task::CancellationReason());
// Waits for the event until `deadline`.
// Returns kReady if WaitForEventUntil() reports success; otherwise returns kCancelled when
// engine::current_task::ShouldCancel() is true, and kTimeout when it is not.
65 [[nodiscard]] engine::FutureStatus
TryWaitUntil(engine::Deadline deadline) {
66 if (event_holder_->event.WaitForEventUntil(deadline)) {
67 return engine::FutureStatus::kReady;
// The wait did not succeed: tell cancellation of the current task apart from a timeout.
69 return engine::current_task::ShouldCancel() ? engine::FutureStatus::kCancelled : engine::FutureStatus::kTimeout;
// The wrapped future (a value or a reference, depending on the template argument).
73 SubscribableFuture original_future_;
// Event holder shared with the callback registered in the constructor.
74 boost::intrusive_ptr<impl::EventHolder> event_holder_;
// Fragment of a free function template: wraps `future` by reference in a temporary
// SubscribableFutureWrapper (instantiated with `SubscribableFuture&`) and calls Wait() on it.
// NOTE(review): the declaration line (function name, parameter list) and the closing brace
// are missing from this excerpt; `future` is presumably the function's only parameter - confirm.
85template <
typename SubscribableFuture>
87 SubscribableFutureWrapper<SubscribableFuture&>{future}.Wait();
// Fragment of a free function template taking a future (forwarding reference) and a deadline:
// wraps `future` by reference in a temporary SubscribableFutureWrapper (instantiated with
// `SubscribableFuture&`) and returns the result of its TryWaitUntil(deadline).
// NOTE(review): the line carrying the function name and return type, the end of the
// parameter list and the closing brace are missing from this excerpt.
96template <
typename SubscribableFuture>
98 SubscribableFuture&& future,
99 engine::Deadline deadline
101 return SubscribableFutureWrapper<SubscribableFuture&>{future}.TryWaitUntil(deadline);