userver: userver/concurrent/mpsc_queue.hpp Source File
Loading...
Searching...
No Matches
mpsc_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/mpsc_queue.hpp
4/// @brief Multiple producer, single consumer queue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>
11#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
12#include <userver/concurrent/queue_helpers.hpp>
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/semaphore.hpp>
15#include <userver/engine/single_consumer_event.hpp>
16#include <userver/engine/task/cancel.hpp>
17#include <userver/utils/assert.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace concurrent {
22
23namespace impl {
24
25template <typename T>
26struct MpscQueueNode final : public SinglyLinkedBaseHook {
27 explicit MpscQueueNode(T&& value)
28 : value(std::move(value))
29 {}
30
31 T value;
32};
33
34} // namespace impl
35
36/// @ingroup userver_concurrency
37///
38/// Multiple producer, single consumer queue
39///
40/// ## Example usage:
41///
42/// @snippet concurrent/mpsc_queue_test.cpp Sample concurrent::MpscQueue usage
43///
44/// @see @ref scripts/docs/en/userver/synchronization.md
45template <typename T>
46class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
47 struct EmplaceEnabler final {
48 // Disable {}-initialization in Queue's constructor
49 explicit EmplaceEnabler() = default;
50 };
51
52 using Node = impl::MpscQueueNode<T>;
53
54 using ProducerToken = impl::NoToken;
55 using ConsumerToken = impl::NoToken;
56
57 friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
58 friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
59
60public:
61 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max();
62
63 using ValueType = T;
64
65 using Producer = concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
66 using Consumer = concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
67 using MultiProducer = concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
68
69 /// @cond
70 // For internal use only
71 explicit MpscQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
72 : remaining_capacity_(max_size),
73 remaining_capacity_control_(remaining_capacity_)
74 {}
75
76 MpscQueue(MpscQueue&&) = delete;
77 MpscQueue(const MpscQueue&) = delete;
78 MpscQueue& operator=(MpscQueue&&) = delete;
79 MpscQueue& operator=(const MpscQueue&) = delete;
80 ~MpscQueue();
81 /// @endcond
82
83 /// Create a new queue
84 static std::shared_ptr<MpscQueue> Create(std::size_t max_size = kUnbounded) {
85 return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
86 }
87
88 /// Get a `Producer` which makes it possible to push items into the queue.
89 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
90 /// so you have to use multiple Producers of the same queue to simultaneously
91 /// write from multiple coroutines/threads.
92 ///
93 /// @note `Producer` may outlive the queue and the `Consumer`.
94 Producer GetProducer();
95
96 /// Get a `MultiProducer` which makes it possible to push items into the
97 /// queue. Can be called multiple times. The resulting `MultiProducer` is
98 /// thread-safe, so it can be used simultaneously from multiple
99 /// coroutines/threads.
100 ///
101 /// @note `MultiProducer` may outlive the queue and the `Consumer`.
102 MultiProducer GetMultiProducer();
103
104 /// Get a `Consumer` which makes it possible to read items from the queue.
105 /// Can be called only once. You may not use the `Consumer` simultaneously
106 /// from multiple coroutines/threads.
107 ///
108 /// @note `Consumer` may outlive the queue and producers.
109 Consumer GetConsumer();
110
111 /// @brief Sets the limit on the queue size, pushes over this limit will block
112 /// @note This is a soft limit and may be slightly overrun under load.
113 void SetSoftMaxSize(size_t size);
114
115 /// @brief Gets the limit on the queue size
116 [[nodiscard]] size_t GetSoftMaxSize() const;
117
118 /// @brief Gets the approximate size of queue
119 [[nodiscard]] size_t GetSizeApproximate() const;
120
121private:
122 bool Push(ProducerToken&, T&&, engine::Deadline);
123 bool PushNoblock(ProducerToken&, T&&);
124 bool DoPush(ProducerToken&, T&&);
125
126 bool Pop(ConsumerToken&, T&, engine::Deadline);
127 bool PopNoblock(ConsumerToken&, T&);
128 bool DoPop(ConsumerToken&, T&, impl::IntrusiveMpscQueueImpl::PopMode);
129
130 void MarkConsumerIsDead();
131 void MarkProducerIsDead();
132
133 bool NoMoreProducers() const { return producer_is_created_ && producers_count_ == 0; }
134 bool NoMoreConsumers() const { return consumer_is_created_and_dead_; }
135
136 impl::IntrusiveMpscQueue<Node> queue_{};
137 engine::SingleConsumerEvent nonempty_event_{};
138 engine::CancellableSemaphore remaining_capacity_;
139 impl::SemaphoreCapacityControl remaining_capacity_control_;
140 std::atomic<bool> consumer_is_created_{false};
141 std::atomic<bool> consumer_is_created_and_dead_{false};
142 std::atomic<bool> producer_is_created_{false};
143 std::atomic<size_t> producers_count_{0};
144 std::atomic<size_t> size_{0};
145};
146
147/// @cond
148template <typename T>
149MpscQueue<T>::~MpscQueue() {
150 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
151 UASSERT(!producers_count_);
152 // Clear remaining items in queue.
153 while (const auto node = std::unique_ptr<Node>{queue_.TryPopBlocking()}) {
154 remaining_capacity_.unlock_shared();
155 }
156}
157/// @endcond
158
159template <typename T>
160typename MpscQueue<T>::Producer MpscQueue<T>::GetProducer() {
161 ++producers_count_;
162 producer_is_created_ = true;
163 nonempty_event_.Send();
164 return Producer(this->shared_from_this(), EmplaceEnabler{});
165}
166
167template <typename T>
168typename MpscQueue<T>::MultiProducer MpscQueue<T>::GetMultiProducer() {
169 // MultiProducer and Producer are actually the same for MpscQueue, which is an
170 // implementation detail.
171 return GetProducer();
172}
173
174template <typename T>
175typename MpscQueue<T>::Consumer MpscQueue<T>::GetConsumer() {
176 UINVARIANT(!consumer_is_created_, "MpscQueue::Consumer must only be obtained a single time");
177 consumer_is_created_ = true;
178 return Consumer(this->shared_from_this(), EmplaceEnabler{});
179}
180
181template <typename T>
182void MpscQueue<T>::SetSoftMaxSize(size_t max_size) {
183 remaining_capacity_control_.SetCapacity(max_size);
184}
185
186template <typename T>
187size_t MpscQueue<T>::GetSoftMaxSize() const {
188 return remaining_capacity_control_.GetCapacity();
189}
190
191template <typename T>
192size_t MpscQueue<T>::GetSizeApproximate() const {
193 return size_;
194}
195
196template <typename T>
197bool MpscQueue<T>::Push(ProducerToken& token, T&& value, engine::Deadline deadline) {
198 return remaining_capacity_.try_lock_shared_until(deadline) && DoPush(token, std::move(value));
199}
200
201template <typename T>
202bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
203 return remaining_capacity_.try_lock_shared() && DoPush(token, std::move(value));
204}
205
206template <typename T>
207bool MpscQueue<T>::DoPush(ProducerToken& /*unused*/, T&& value) {
208 if (NoMoreConsumers()) {
209 remaining_capacity_.unlock_shared();
210 return false;
211 }
212
213 auto node = std::make_unique<Node>(std::move(value));
214 queue_.Push(*node);
215 (void)node.release();
216
217 ++size_;
218 nonempty_event_.Send();
219
220 return true;
221}
222
223template <typename T>
224bool MpscQueue<T>::Pop(ConsumerToken& token, T& value, engine::Deadline deadline) {
225 bool no_more_producers = false;
226 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
227 // kWeak is OK here, because if there is another push operation in process,
228 // they will notify us after pushing.
229 if (DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kWeak)) {
230 return true;
231 }
232 if (NoMoreProducers()) {
233 // Producer might have pushed something in queue between .pop()
234 // and !producer_is_created_and_dead_ check. Check twice to avoid
235 // TOCTOU.
236 if (!DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kRarelyBlocking)) {
237 no_more_producers = true;
238 }
239 return true;
240 }
241 return false;
242 });
243 return success && !no_more_producers;
244}
245
246template <typename T>
247bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
248 // kRarelyBlocking is required here, because with kWeak we sometimes would miss an item if another push
249 // is in process, and there is no guarantee that the user will retry PopNoblock.
250 //
251 // If there was a high-level consumer API that is not affected by kWeak (e.g. some batching API),
252 // then it could be used there.
253 // As it stands, it would be too bug-prone to provide weak guarantees in PopNoblock.
254 return DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kRarelyBlocking);
255}
256
257template <typename T>
258bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value, impl::IntrusiveMpscQueueImpl::PopMode pop_mode) {
259 if (const auto node = std::unique_ptr<Node>{queue_.TryPop(pop_mode)}) {
260 value = std::move(node->value);
261
262 --size_;
263 remaining_capacity_.unlock_shared();
264 nonempty_event_.Reset();
265 return true;
266 }
267 return false;
268}
269
270template <typename T>
271void MpscQueue<T>::MarkConsumerIsDead() {
272 consumer_is_created_and_dead_ = true;
273 remaining_capacity_control_.SetCapacityOverride(0);
274}
275
276template <typename T>
277void MpscQueue<T>::MarkProducerIsDead() {
278 if (--producers_count_ == 0) {
279 nonempty_event_.Send();
280 }
281}
282
283} // namespace concurrent
284
285USERVER_NAMESPACE_END