userver: userver/concurrent/mpsc_queue.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
mpsc_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/mpsc_queue.hpp
4/// @brief Multiple producer, single consumer queue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>
11#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
12#include <userver/concurrent/queue_helpers.hpp>
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/semaphore.hpp>
15#include <userver/engine/single_consumer_event.hpp>
16#include <userver/engine/task/cancel.hpp>
17#include <userver/utils/assert.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace concurrent {
22
23namespace impl {
24
25template <typename T>
26struct MpscQueueNode final : public SinglyLinkedBaseHook {
27 explicit MpscQueueNode(T&& value) : value(std::move(value)) {}
28
29 T value;
30};
31
32} // namespace impl
33
34/// @ingroup userver_concurrency
35///
36/// Multiple producer, single consumer queue
37///
38/// ## Example usage:
39///
40/// @snippet concurrent/mpsc_queue_test.cpp Sample concurrent::MpscQueue usage
41///
42/// @see @ref scripts/docs/en/userver/synchronization.md
43template <typename T>
44class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
45 struct EmplaceEnabler final {
46 // Disable {}-initialization in Queue's constructor
47 explicit EmplaceEnabler() = default;
48 };
49
50 using Node = impl::MpscQueueNode<T>;
51
52 using ProducerToken = impl::NoToken;
53 using ConsumerToken = impl::NoToken;
54
55 friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
56 friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
57
58public:
59 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max();
60
61 using ValueType = T;
62
63 using Producer = concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
64 using Consumer = concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
65 using MultiProducer = concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
66
67 /// @cond
68 // For internal use only
69 explicit MpscQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
70 : remaining_capacity_(max_size), remaining_capacity_control_(remaining_capacity_) {}
71
72 MpscQueue(MpscQueue&&) = delete;
73 MpscQueue(const MpscQueue&) = delete;
74 MpscQueue& operator=(MpscQueue&&) = delete;
75 MpscQueue& operator=(const MpscQueue&) = delete;
76 ~MpscQueue();
77 /// @endcond
78
79 /// Create a new queue
80 static std::shared_ptr<MpscQueue> Create(std::size_t max_size = kUnbounded) {
81 return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
82 }
83
84 /// Get a `Producer` which makes it possible to push items into the queue.
85 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
86 /// so you have to use multiple Producers of the same queue to simultaneously
87 /// write from multiple coroutines/threads.
88 ///
89 /// @note `Producer` may outlive the queue and the `Consumer`.
90 Producer GetProducer();
91
92 /// Get a `MultiProducer` which makes it possible to push items into the
93 /// queue. Can be called multiple times. The resulting `MultiProducer` is
94 /// thread-safe, so it can be used simultaneously from multiple
95 /// coroutines/threads.
96 ///
97 /// @note `MultiProducer` may outlive the queue and the `Consumer`.
98 MultiProducer GetMultiProducer();
99
100 /// Get a `Consumer` which makes it possible to read items from the queue.
101 /// Can be called only once. You may not use the `Consumer` simultaneously
102 /// from multiple coroutines/threads.
103 ///
104 /// @note `Consumer` may outlive the queue and producers.
105 Consumer GetConsumer();
106
107 /// @brief Sets the limit on the queue size, pushes over this limit will block
108 /// @note This is a soft limit and may be slightly overrun under load.
109 void SetSoftMaxSize(size_t size);
110
111 /// @brief Gets the limit on the queue size
112 [[nodiscard]] size_t GetSoftMaxSize() const;
113
114 /// @brief Gets the approximate size of queue
115 [[nodiscard]] size_t GetSizeApproximate() const;
116
117private:
118 bool Push(ProducerToken&, T&&, engine::Deadline);
119 bool PushNoblock(ProducerToken&, T&&);
120 bool DoPush(ProducerToken&, T&&);
121
122 bool Pop(ConsumerToken&, T&, engine::Deadline);
123 bool PopNoblock(ConsumerToken&, T&);
124 bool DoPop(ConsumerToken&, T&);
125
126 void MarkConsumerIsDead();
127 void MarkProducerIsDead();
128
129 bool NoMoreProducers() const { return producer_is_created_ && producers_count_ == 0; }
130 bool NoMoreConsumers() const { return consumer_is_created_and_dead_; }
131
132 impl::IntrusiveMpscQueue<Node> queue_{};
133 engine::SingleConsumerEvent nonempty_event_{};
134 engine::CancellableSemaphore remaining_capacity_;
135 impl::SemaphoreCapacityControl remaining_capacity_control_;
136 std::atomic<bool> consumer_is_created_{false};
137 std::atomic<bool> consumer_is_created_and_dead_{false};
138 std::atomic<bool> producer_is_created_{false};
139 std::atomic<size_t> producers_count_{0};
140 std::atomic<size_t> size_{0};
141};
142
143/// @cond
144template <typename T>
145MpscQueue<T>::~MpscQueue() {
146 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
147 UASSERT(!producers_count_);
148 // Clear remaining items in queue.
149 while (const auto node = std::unique_ptr<Node>{queue_.TryPopBlocking()}) {
150 remaining_capacity_.unlock_shared();
151 }
152}
153/// @endcond
154
155template <typename T>
156typename MpscQueue<T>::Producer MpscQueue<T>::GetProducer() {
157 ++producers_count_;
158 producer_is_created_ = true;
159 nonempty_event_.Send();
160 return Producer(this->shared_from_this(), EmplaceEnabler{});
161}
162
163template <typename T>
164typename MpscQueue<T>::MultiProducer MpscQueue<T>::GetMultiProducer() {
165 // MultiProducer and Producer are actually the same for MpscQueue, which is an
166 // implementation detail.
167 return GetProducer();
168}
169
170template <typename T>
171typename MpscQueue<T>::Consumer MpscQueue<T>::GetConsumer() {
172 UINVARIANT(!consumer_is_created_, "MpscQueue::Consumer must only be obtained a single time");
173 consumer_is_created_ = true;
174 return Consumer(this->shared_from_this(), EmplaceEnabler{});
175}
176
177template <typename T>
178void MpscQueue<T>::SetSoftMaxSize(size_t max_size) {
179 remaining_capacity_control_.SetCapacity(max_size);
180}
181
182template <typename T>
183size_t MpscQueue<T>::GetSoftMaxSize() const {
184 return remaining_capacity_control_.GetCapacity();
185}
186
187template <typename T>
188size_t MpscQueue<T>::GetSizeApproximate() const {
189 return size_;
190}
191
192template <typename T>
193bool MpscQueue<T>::Push(ProducerToken& token, T&& value, engine::Deadline deadline) {
194 return remaining_capacity_.try_lock_shared_until(deadline) && DoPush(token, std::move(value));
195}
196
197template <typename T>
198bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
199 return remaining_capacity_.try_lock_shared() && DoPush(token, std::move(value));
200}
201
202template <typename T>
203bool MpscQueue<T>::DoPush(ProducerToken& /*unused*/, T&& value) {
204 if (NoMoreConsumers()) {
205 remaining_capacity_.unlock_shared();
206 return false;
207 }
208
209 auto node = std::make_unique<Node>(std::move(value));
210 queue_.Push(*node);
211 (void)node.release();
212
213 ++size_;
214 nonempty_event_.Send();
215
216 return true;
217}
218
219template <typename T>
220bool MpscQueue<T>::Pop(ConsumerToken& token, T& value, engine::Deadline deadline) {
221 bool no_more_producers = false;
222 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
223 if (DoPop(token, value)) {
224 return true;
225 }
226 if (NoMoreProducers()) {
227 // Producer might have pushed something in queue between .pop()
228 // and !producer_is_created_and_dead_ check. Check twice to avoid
229 // TOCTOU.
230 if (!DoPop(token, value)) {
231 no_more_producers = true;
232 }
233 return true;
234 }
235 return false;
236 });
237 return success && !no_more_producers;
238}
239
240template <typename T>
241bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
242 return DoPop(token, value);
243}
244
245template <typename T>
246bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value) {
247 if (const auto node = std::unique_ptr<Node>{queue_.TryPopWeak()}) {
248 value = std::move(node->value);
249
250 --size_;
251 remaining_capacity_.unlock_shared();
252 nonempty_event_.Reset();
253 return true;
254 }
255 return false;
256}
257
258template <typename T>
259void MpscQueue<T>::MarkConsumerIsDead() {
260 consumer_is_created_and_dead_ = true;
261 remaining_capacity_control_.SetCapacityOverride(0);
262}
263
264template <typename T>
265void MpscQueue<T>::MarkProducerIsDead() {
266 if (--producers_count_ == 0) {
267 nonempty_event_.Send();
268 }
269}
270
271} // namespace concurrent
272
273USERVER_NAMESPACE_END