userver: userver/concurrent/mpsc_queue.hpp Source File
Loading...
Searching...
No Matches
mpsc_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/mpsc_queue.hpp
4/// @brief Multiple producer, single consumer queue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <boost/lockfree/queue.hpp>
11
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19
20USERVER_NAMESPACE_BEGIN
21
22namespace concurrent {
23
24namespace impl {
25
26/// Helper template. Default implementation is straightforward.
27template <typename T>
28struct QueueHelper {
29 using LockFreeQueue = boost::lockfree::queue<T>;
30
31 static void Push(LockFreeQueue& queue, T&& value) {
32 [[maybe_unused]] bool push_result = queue.push(std::move(value));
33 UASSERT(push_result);
34 }
35
36 [[nodiscard]] static bool Pop(LockFreeQueue& queue, T& value) {
37 return queue.pop(value);
38 }
39
40 static_assert(std::is_trivially_destructible_v<T>,
41 "T has non-trivial destructor. Use "
42 "MpscQueue<std::unique_ptr<T>> instead of MpscQueue<T>");
43};
44
45/// This partial specialization allows to use std::unique_ptr with Queue.
46template <typename T>
47struct QueueHelper<std::unique_ptr<T>> {
48 using LockFreeQueue = boost::lockfree::queue<T*>;
49
50 static void Push(LockFreeQueue& queue, std::unique_ptr<T>&& value) {
51 QueueHelper<T*>::Push(queue, value.release());
52 }
53
54 [[nodiscard]] static bool Pop(LockFreeQueue& queue,
55 std::unique_ptr<T>& value) {
56 T* ptr{nullptr};
57 if (!QueueHelper<T*>::Pop(queue, ptr)) return false;
58 value.reset(ptr);
59 return true;
60 }
61};
62
63} // namespace impl
64
65/// @ingroup userver_concurrency
66///
67/// Multiple producer, single consumer queue
68///
69/// ## Example usage:
70///
71/// @snippet concurrent/mpsc_queue_test.cpp Sample concurrent::MpscQueue usage
72///
73/// @see @ref scripts/docs/en/userver/synchronization.md
74template <typename T>
75class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
76 struct EmplaceEnabler final {
77 // Disable {}-initialization in Queue's constructor
78 explicit EmplaceEnabler() = default;
79 };
80
81 using QueueHelper = impl::QueueHelper<T>;
82
83 using ProducerToken = impl::NoToken;
84 using ConsumerToken = impl::NoToken;
85
86 friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
87 friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
88
89 public:
90 static constexpr std::size_t kUnbounded =
91 std::numeric_limits<std::size_t>::max();
92
93 using ValueType = T;
94
95 using Producer =
96 concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
97 using Consumer =
98 concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
99 using MultiProducer =
100 concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
101
102 /// @cond
103 // For internal use only
104 explicit MpscQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
105 : remaining_capacity_(max_size),
106 remaining_capacity_control_(remaining_capacity_) {}
107
108 MpscQueue(MpscQueue&&) = delete;
109 MpscQueue(const MpscQueue&) = delete;
110 MpscQueue& operator=(MpscQueue&&) = delete;
111 MpscQueue& operator=(const MpscQueue&) = delete;
112 ~MpscQueue();
113 /// @endcond
114
115 /// Create a new queue
116 static std::shared_ptr<MpscQueue> Create(std::size_t max_size = kUnbounded) {
117 return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
118 }
119
120 /// Get a `Producer` which makes it possible to push items into the queue.
121 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
122 /// so you have to use multiple Producers of the same queue to simultaneously
123 /// write from multiple coroutines/threads.
124 ///
125 /// @note `Producer` may outlive the queue and the `Consumer`.
126 Producer GetProducer();
127
128 /// Get a `MultiProducer` which makes it possible to push items into the
129 /// queue. Can be called multiple times. The resulting `MultiProducer` is
130 /// thread-safe, so it can be used simultaneously from multiple
131 /// coroutines/threads.
132 ///
133 /// @note `MultiProducer` may outlive the queue and the `Consumer`.
134 MultiProducer GetMultiProducer();
135
136 /// Get a `Consumer` which makes it possible to read items from the queue.
137 /// Can be called only once. You may not use the `Consumer` simultaneously
138 /// from multiple coroutines/threads.
139 ///
140 /// @note `Consumer` may outlive the queue and producers.
141 Consumer GetConsumer();
142
143 /// @brief Sets the limit on the queue size, pushes over this limit will block
144 /// @note This is a soft limit and may be slightly overrun under load.
145 void SetSoftMaxSize(size_t size);
146
147 /// @brief Gets the limit on the queue size
148 [[nodiscard]] size_t GetSoftMaxSize() const;
149
150 /// @brief Gets the approximate size of queue
151 [[nodiscard]] size_t GetSizeApproximate() const;
152
153 private:
154 bool Push(ProducerToken&, T&&, engine::Deadline);
155 bool PushNoblock(ProducerToken&, T&&);
156 bool DoPush(ProducerToken&, T&&);
157
158 bool Pop(ConsumerToken&, T&, engine::Deadline);
159 bool PopNoblock(ConsumerToken&, T&);
160 bool DoPop(ConsumerToken&, T&);
161
162 void MarkConsumerIsDead();
163 void MarkProducerIsDead();
164
165 // Resolves to boost::lockfree::queue<T> except for std::unique_ptr<T>
166 // specialization. In that case, resolves to boost::lockfree::queue<T*>
167 typename QueueHelper::LockFreeQueue queue_{1};
168 engine::SingleConsumerEvent nonempty_event_;
169 engine::CancellableSemaphore remaining_capacity_;
170 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
171 std::atomic<bool> consumer_is_created_{false};
172 std::atomic<bool> consumer_is_created_and_dead_{false};
173 std::atomic<bool> producer_is_created_and_dead_{false};
174 std::atomic<size_t> producers_count_{0};
175 std::atomic<size_t> size_{0};
176};
177
178template <typename T>
179MpscQueue<T>::~MpscQueue() {
180 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
181 UASSERT(!producers_count_);
182 // Clear remaining items in queue. This will work for unique_ptr as well.
183 T value;
184 ConsumerToken temp_token{queue_};
185 while (PopNoblock(temp_token, value)) {
186 }
187}
188
189template <typename T>
190typename MpscQueue<T>::Producer MpscQueue<T>::GetProducer() {
191 producers_count_++;
192 producer_is_created_and_dead_ = false;
193 nonempty_event_.Send();
194 return Producer(this->shared_from_this(), EmplaceEnabler{});
195}
196
197template <typename T>
198typename MpscQueue<T>::MultiProducer MpscQueue<T>::GetMultiProducer() {
199 // MultiProducer and Producer are actually the same for MpscQueue, which is an
200 // implementation detail.
201 return GetProducer();
202}
203
204template <typename T>
205typename MpscQueue<T>::Consumer MpscQueue<T>::GetConsumer() {
206 UINVARIANT(!consumer_is_created_,
207 "MpscQueue::Consumer must only be obtained a single time");
208 consumer_is_created_ = true;
209 return Consumer(this->shared_from_this(), EmplaceEnabler{});
210}
211
212template <typename T>
213void MpscQueue<T>::SetSoftMaxSize(size_t max_size) {
214 remaining_capacity_control_.SetCapacity(max_size);
215}
216
217template <typename T>
218size_t MpscQueue<T>::GetSoftMaxSize() const {
219 return remaining_capacity_control_.GetCapacity();
220}
221
222template <typename T>
223size_t MpscQueue<T>::GetSizeApproximate() const {
224 return size_;
225}
226
227template <typename T>
228bool MpscQueue<T>::Push(ProducerToken& token, T&& value,
229 engine::Deadline deadline) {
230 return remaining_capacity_.try_lock_shared_until(deadline) &&
231 DoPush(token, std::move(value));
232}
233
234template <typename T>
235bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
236 return remaining_capacity_.try_lock_shared() &&
237 DoPush(token, std::move(value));
238}
239
240template <typename T>
241bool MpscQueue<T>::DoPush(ProducerToken& /*unused*/, T&& value) {
242 if (consumer_is_created_and_dead_) {
243 remaining_capacity_.unlock_shared();
244 return false;
245 }
246
247 QueueHelper::Push(queue_, std::move(value));
248 ++size_;
249 nonempty_event_.Send();
250
251 return true;
252}
253
254template <typename T>
255bool MpscQueue<T>::Pop(ConsumerToken& token, T& value,
256 engine::Deadline deadline) {
257 while (!DoPop(token, value)) {
258 if (producer_is_created_and_dead_ ||
259 !nonempty_event_.WaitForEventUntil(deadline)) {
260 // Producer might have pushed something in queue between .pop()
261 // and !producer_is_created_and_dead_ check. Check twice to avoid
262 // TOCTOU.
263 return DoPop(token, value);
264 }
265 }
266 return true;
267}
268
269template <typename T>
270bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
271 return DoPop(token, value);
272}
273
274template <typename T>
275bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value) {
276 if (QueueHelper::Pop(queue_, value)) {
277 --size_;
278 remaining_capacity_.unlock_shared();
279 nonempty_event_.Reset();
280 return true;
281 }
282 return false;
283}
284
285template <typename T>
286void MpscQueue<T>::MarkConsumerIsDead() {
287 consumer_is_created_and_dead_ = true;
288 remaining_capacity_control_.SetCapacityOverride(0);
289}
290
291template <typename T>
292void MpscQueue<T>::MarkProducerIsDead() {
293 producer_is_created_and_dead_ = (--producers_count_ == 0);
294 nonempty_event_.Send();
295}
296
297} // namespace concurrent
298
299USERVER_NAMESPACE_END