userver: userver/concurrent/mpsc_queue.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
mpsc_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/mpsc_queue.hpp
4/// @brief Multiple producer, single consumer queue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <boost/lockfree/queue.hpp>
11
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19
20USERVER_NAMESPACE_BEGIN
21
22namespace concurrent {
23
24namespace impl {
25
26/// Helper template. Default implementation is straightforward.
27template <typename T>
28struct QueueHelper {
29 using LockFreeQueue = boost::lockfree::queue<T>;
30
31 static void Push(LockFreeQueue& queue, T&& value) {
32 [[maybe_unused]] bool push_result = queue.push(std::move(value));
33 UASSERT(push_result);
34 }
35
36 [[nodiscard]] static bool Pop(LockFreeQueue& queue, T& value) {
37 return queue.pop(value);
38 }
39
40 static_assert(std::is_trivially_destructible_v<T>,
41 "T has non-trivial destructor. Use "
42 "MpscQueue<std::unique_ptr<T>> instead of MpscQueue<T>");
43};
44
45/// This partial specialization allows to use std::unique_ptr with Queue.
46template <typename T>
47struct QueueHelper<std::unique_ptr<T>> {
48 using LockFreeQueue = boost::lockfree::queue<T*>;
49
50 static void Push(LockFreeQueue& queue, std::unique_ptr<T>&& value) {
51 QueueHelper<T*>::Push(queue, value.release());
52 }
53
54 [[nodiscard]] static bool Pop(LockFreeQueue& queue,
55 std::unique_ptr<T>& value) {
56 T* ptr{nullptr};
57 if (!QueueHelper<T*>::Pop(queue, ptr)) return false;
58 value.reset(ptr);
59 return true;
60 }
61};
62
63} // namespace impl
64
65/// @ingroup userver_concurrency
66///
67/// Multiple producer, single consumer queue
68///
69/// ## Example usage:
70///
71/// @snippet concurrent/mpsc_queue_test.cpp Sample concurrent::MpscQueue usage
72///
73/// @see @ref scripts/docs/en/userver/synchronization.md
74template <typename T>
75class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
76 struct EmplaceEnabler final {
77 // Disable {}-initialization in Queue's constructor
78 explicit EmplaceEnabler() = default;
79 };
80
81 using QueueHelper = impl::QueueHelper<T>;
82
83 using ProducerToken = impl::NoToken;
84 using ConsumerToken = impl::NoToken;
85
86 friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
87 friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
88
89 public:
90 static constexpr std::size_t kUnbounded =
91 std::numeric_limits<std::size_t>::max();
92
93 using ValueType = T;
94
95 using Producer =
96 concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
97 using Consumer =
98 concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
99 using MultiProducer =
100 concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
101
102 /// @cond
103 // For internal use only
104 explicit MpscQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
105 : remaining_capacity_(max_size),
106 remaining_capacity_control_(remaining_capacity_) {}
107
108 MpscQueue(MpscQueue&&) = delete;
109 MpscQueue(const MpscQueue&) = delete;
110 MpscQueue& operator=(MpscQueue&&) = delete;
111 MpscQueue& operator=(const MpscQueue&) = delete;
112 ~MpscQueue();
113 /// @endcond
114
115 /// Create a new queue
116 static std::shared_ptr<MpscQueue> Create(std::size_t max_size = kUnbounded) {
117 return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
118 }
119
120 /// Get a `Producer` which makes it possible to push items into the queue.
121 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
122 /// so you have to use multiple Producers of the same queue to simultaneously
123 /// write from multiple coroutines/threads.
124 ///
125 /// @note `Producer` may outlive the queue and the `Consumer`.
126 Producer GetProducer();
127
128 /// Get a `MultiProducer` which makes it possible to push items into the
129 /// queue. Can be called multiple times. The resulting `MultiProducer` is
130 /// thread-safe, so it can be used simultaneously from multiple
131 /// coroutines/threads.
132 ///
133 /// @note `MultiProducer` may outlive the queue and the `Consumer`.
134 MultiProducer GetMultiProducer();
135
136 /// Get a `Consumer` which makes it possible to read items from the queue.
137 /// Can be called only once. You may not use the `Consumer` simultaneously
138 /// from multiple coroutines/threads.
139 ///
140 /// @note `Consumer` may outlive the queue and producers.
141 Consumer GetConsumer();
142
143 /// @brief Sets the limit on the queue size, pushes over this limit will block
144 /// @note This is a soft limit and may be slightly overrun under load.
145 void SetSoftMaxSize(size_t size);
146
147 /// @brief Gets the limit on the queue size
148 [[nodiscard]] size_t GetSoftMaxSize() const;
149
150 /// @brief Gets the approximate size of queue
151 [[nodiscard]] size_t GetSizeApproximate() const;
152
153 private:
154 bool Push(ProducerToken&, T&&, engine::Deadline);
155 bool PushNoblock(ProducerToken&, T&&);
156 bool DoPush(ProducerToken&, T&&);
157
158 bool Pop(ConsumerToken&, T&, engine::Deadline);
159 bool PopNoblock(ConsumerToken&, T&);
160 bool DoPop(ConsumerToken&, T&);
161
162 void MarkConsumerIsDead();
163 void MarkProducerIsDead();
164
165 // Resolves to boost::lockfree::queue<T> except for std::unique_ptr<T>
166 // specialization. In that case, resolves to boost::lockfree::queue<T*>
167 typename QueueHelper::LockFreeQueue queue_{1};
168 engine::SingleConsumerEvent nonempty_event_;
169 engine::CancellableSemaphore remaining_capacity_;
170 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
171 std::atomic<bool> consumer_is_created_{false};
172 std::atomic<bool> consumer_is_created_and_dead_{false};
173 std::atomic<bool> producer_is_created_and_dead_{false};
174 std::atomic<size_t> producers_count_{0};
175 std::atomic<size_t> size_{0};
176};
177
178template <typename T>
179MpscQueue<T>::~MpscQueue() {
180 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
181 UASSERT(!producers_count_);
182 // Clear remaining items in queue. This will work for unique_ptr as well.
183 T value;
184 ConsumerToken temp_token{queue_};
185 while (PopNoblock(temp_token, value)) {
186 }
187}
188
189template <typename T>
190typename MpscQueue<T>::Producer MpscQueue<T>::GetProducer() {
191 producers_count_++;
192 producer_is_created_and_dead_ = false;
193 nonempty_event_.Send();
194 return Producer(this->shared_from_this(), EmplaceEnabler{});
195}
196
197template <typename T>
198typename MpscQueue<T>::MultiProducer MpscQueue<T>::GetMultiProducer() {
199 // MultiProducer and Producer are actually the same for MpscQueue, which is an
200 // implementation detail.
201 return GetProducer();
202}
203
204template <typename T>
205typename MpscQueue<T>::Consumer MpscQueue<T>::GetConsumer() {
206 UINVARIANT(!consumer_is_created_,
207 "MpscQueue::Consumer must only be obtained a single time");
208 consumer_is_created_ = true;
209 return Consumer(this->shared_from_this(), EmplaceEnabler{});
210}
211
212template <typename T>
213void MpscQueue<T>::SetSoftMaxSize(size_t max_size) {
214 remaining_capacity_control_.SetCapacity(max_size);
215}
216
217template <typename T>
218size_t MpscQueue<T>::GetSoftMaxSize() const {
219 return remaining_capacity_control_.GetCapacity();
220}
221
222template <typename T>
223size_t MpscQueue<T>::GetSizeApproximate() const {
224 return size_;
225}
226
227template <typename T>
228bool MpscQueue<T>::Push(ProducerToken& token, T&& value,
229 engine::Deadline deadline) {
230 return remaining_capacity_.try_lock_shared_until(deadline) &&
231 DoPush(token, std::move(value));
232}
233
234template <typename T>
235bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
236 return remaining_capacity_.try_lock_shared() &&
237 DoPush(token, std::move(value));
238}
239
240template <typename T>
241bool MpscQueue<T>::DoPush(ProducerToken& /*unused*/, T&& value) {
242 if (consumer_is_created_and_dead_) {
243 remaining_capacity_.unlock_shared();
244 return false;
245 }
246
247 QueueHelper::Push(queue_, std::move(value));
248 ++size_;
249 nonempty_event_.Send();
250
251 return true;
252}
253
254template <typename T>
255bool MpscQueue<T>::Pop(ConsumerToken& token, T& value,
256 engine::Deadline deadline) {
257 while (!DoPop(token, value)) {
258 if (producer_is_created_and_dead_ ||
259 !nonempty_event_.WaitForEventUntil(deadline)) {
260 // Producer might have pushed something in queue between .pop()
261 // and !producer_is_created_and_dead_ check. Check twice to avoid
262 // TOCTOU.
263 return DoPop(token, value);
264 }
265 }
266 return true;
267}
268
269template <typename T>
270bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
271 return DoPop(token, value);
272}
273
274template <typename T>
275bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value) {
276 if (QueueHelper::Pop(queue_, value)) {
277 --size_;
278 remaining_capacity_.unlock_shared();
279 nonempty_event_.Reset();
280 return true;
281 }
282 return false;
283}
284
285template <typename T>
286void MpscQueue<T>::MarkConsumerIsDead() {
287 consumer_is_created_and_dead_ = true;
288 remaining_capacity_control_.SetCapacityOverride(0);
289}
290
291template <typename T>
292void MpscQueue<T>::MarkProducerIsDead() {
293 producer_is_created_and_dead_ = (--producers_count_ == 0);
294 nonempty_event_.Send();
295}
296
297} // namespace concurrent
298
299USERVER_NAMESPACE_END