userver: userver/engine/single_consumer_event.hpp Source File
Loading...
Searching...
No Matches
single_consumer_event.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/engine/single_consumer_event.hpp
4/// @brief @copybrief engine::SingleConsumerEvent
5
6#include <atomic>
7#include <chrono>
8
9#include <userver/engine/deadline.hpp>
10#include <userver/engine/impl/wait_list_fwd.hpp>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace engine {
15
16/// @ingroup userver_concurrency
17///
18/// @brief A multiple-producers, single-consumer event
19class SingleConsumerEvent final {
20public:
21 struct NoAutoReset final {};
22
23 /// Creates an event that resets automatically on retrieval.
25
26 /// Creates an event that does not reset automatically.
27 explicit SingleConsumerEvent(NoAutoReset) noexcept;
28
29 SingleConsumerEvent(const SingleConsumerEvent&) = delete;
30 SingleConsumerEvent(SingleConsumerEvent&&) = delete;
31 SingleConsumerEvent& operator=(const SingleConsumerEvent&) = delete;
32 SingleConsumerEvent& operator=(SingleConsumerEvent&&) = delete;
33 ~SingleConsumerEvent();
34
35 /// @return whether this event resets automatically on retrieval
36 bool IsAutoReset() const noexcept;
37
38 /// @brief Waits until the event is in a signaled state.
39 ///
40 /// If the event is auto-resetting, clears the signal flag upon waking up. If
41 /// already in a signaled state, does the same without sleeping.
42 ///
43 /// If we the waiting failed (the event did not signal), because the optional
44 /// deadline is expired or the current task is cancelled, returns `false`.
45 ///
46 /// @return whether the event signaled
47 [[nodiscard]] bool WaitForEvent();
48
49 /// @overload bool WaitForEvent()
50 template <typename Clock, typename Duration>
51 [[nodiscard]] bool WaitForEventFor(std::chrono::duration<Clock, Duration>);
52
53 /// @overload bool WaitForEvent()
54 template <typename Clock, typename Duration>
55 [[nodiscard]] bool WaitForEventUntil(std::chrono::time_point<Clock, Duration>);
56
57 /// @overload bool WaitForEvent()
58 [[nodiscard]] bool WaitForEventUntil(Deadline);
59
60 /// @brief Works like `std::condition_variable::wait_until`. Waits until
61 /// @a stop_waiting becomes `true`, and we are notified via @ref Send.
62 ///
63 /// If @a stop_waiting` is already `true`, returns right away.
64 ///
65 /// Unlike `std::condition_variable` and engine::ConditionVariable, there are
66 /// no locks around the state watched by @a stop_waiting, so that state must
67 /// be atomic. `std::memory_order_relaxed` is OK inside @a stop_waiting and
68 /// inside the notifiers as long as it does not mess up their logic.
69 ///
70 /// **Example.** Suppose we want to wait until a counter is even, then grab
71 /// it.
72 ///
73 /// Initialization:
74 /// @snippet engine/single_consumer_event_test.cpp CV init
75 ///
76 /// Notifier side:
77 /// @snippet engine/single_consumer_event_test.cpp CV notifier
78 ///
79 /// Waiter side:
80 /// @snippet engine/single_consumer_event_test.cpp CV waiter
81 template <typename Predicate>
82 [[nodiscard]] bool WaitUntil(Deadline, Predicate stop_waiting);
83
84 /// Resets the signal flag. Guarantees at least 'acquire' and 'release'
85 /// memory ordering. Must only be called by the waiting task.
86 void Reset() noexcept;
87
88 /// Sets the signal flag and wakes a coroutine that waits on it (if any).
89 /// If the signal flag is already set, does nothing.
90 ///
91 /// The waiter is allowed to destroy the SingleConsumerEvent immediately
92 /// after exiting WaitForEvent, ONLY IF the wait succeeded. Otherwise
93 /// a concurrent task may call Send on a destroyed SingleConsumerEvent.
94 /// Here is an example of this situation:
95 /// @snippet engine/single_consumer_event_test.cpp Wait and destroy
96 void Send();
97
98 /// Returns `true` iff already signaled. Never resets the signal.
99 [[nodiscard]] bool IsReady() const noexcept;
100
101private:
102 class EventWaitStrategy;
103
104 bool GetIsSignaled() noexcept;
105
106 void CheckIsAutoResetForWaitPredicate();
107
108 impl::FastPimplWaitListLight waiters_;
109 const bool is_auto_reset_{true};
110};
111
112template <typename Clock, typename Duration>
113bool SingleConsumerEvent::WaitForEventFor(std::chrono::duration<Clock, Duration> duration) {
114 return WaitForEventUntil(Deadline::FromDuration(duration));
115}
116
117template <typename Clock, typename Duration>
118bool SingleConsumerEvent::WaitForEventUntil(std::chrono::time_point<Clock, Duration> time_point) {
119 return WaitForEventUntil(Deadline::FromTimePoint(time_point));
120}
121
122template <typename Predicate>
123bool SingleConsumerEvent::WaitUntil(Deadline deadline, Predicate stop_waiting) {
124 CheckIsAutoResetForWaitPredicate();
125
126 // If the state, according to what we've been previously notified of via
127 // 'Send', is OK, then return right away. Fresh state updates can also
128 // leak to us here, but we should not rely on it.
129 while (!stop_waiting()) {
130 // Wait until we are allowed to make progress.
131 // On the first such wait, we may discover a signal from the state that
132 // has already leaked to us previously (as described above).
133 //
134 // We may also receive false signals from cases when we are allowed
135 // and unallowed to make progress in a rapid sequence, or when the notifier
136 // thinks that we might be happy with the state, but we aren't.
137 if (!WaitForEventUntil(deadline)) {
138 return false;
139 }
140 }
141
142 return true;
143}
144
145} // namespace engine
146
147USERVER_NAMESPACE_END