userver: userver/concurrent/queue.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
queue.hpp
1#pragma once
2
3#include <atomic>
4#include <limits>
5#include <memory>
6
7#include <moodycamel/concurrentqueue.h>
8
9#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
10#include <userver/concurrent/queue_helpers.hpp>
11#include <userver/engine/deadline.hpp>
12#include <userver/engine/semaphore.hpp>
13#include <userver/engine/single_consumer_event.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/atomic.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace concurrent {
21
22namespace impl {
23
/// Queue policy that charges every element exactly one unit of capacity, so
/// the queue size limit is expressed as a number of elements.
///
/// @tparam MultipleProducer whether several producers may push concurrently
/// @tparam MultipleConsumer whether several consumers may pop concurrently
template <bool MultipleProducer, bool MultipleConsumer>
struct SimpleQueuePolicy {
  /// @returns 1 for any value: the element itself is the unit of accounting
  template <typename T>
  static constexpr std::size_t GetElementSize(const T&) {
    return 1;
  }

  static constexpr bool kIsMultipleProducer{MultipleProducer};
  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
};
34
/// Queue policy for container-like elements: an element is charged
/// `std::size(value)` units of capacity, so the queue size limit is expressed
/// as a total number of contained items rather than a number of elements.
///
/// @tparam MultipleProducer whether several producers may push concurrently
/// @tparam MultipleConsumer whether several consumers may pop concurrently
template <bool MultipleProducer, bool MultipleConsumer>
struct ContainerQueuePolicy {
  /// @returns `std::size(value)`; `T` must therefore be a type accepted by
  /// `std::size` (a container with `size()` or a built-in array)
  template <typename T>
  static constexpr std::size_t GetElementSize(const T& value) {
    return std::size(value);
  }

  static constexpr bool kIsMultipleProducer{MultipleProducer};
  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
};
45
46} // namespace impl
47
48/// Queue with single and multi producer/consumer options
49///
50/// @see @ref scripts/docs/en/userver/synchronization.md
51template <typename T, typename QueuePolicy>
52class GenericQueue final
53 : public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
54 struct EmplaceEnabler final {
55 // Disable {}-initialization in Queue's constructor
56 explicit EmplaceEnabler() = default;
57 };
58
59 using ProducerToken =
60 std::conditional_t<QueuePolicy::kIsMultipleProducer,
61 moodycamel::ProducerToken, impl::NoToken>;
62 using ConsumerToken =
63 std::conditional_t<QueuePolicy::kIsMultipleProducer,
64 moodycamel::ConsumerToken, impl::NoToken>;
65 using MultiProducerToken = impl::MultiToken;
66 using MultiConsumerToken =
67 std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken,
68 impl::NoToken>;
69
70 using SingleProducerToken =
71 std::conditional_t<!QueuePolicy::kIsMultipleProducer,
72 moodycamel::ProducerToken, impl::NoToken>;
73
74 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
75 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
76 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
77 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
78
79 public:
80 using ValueType = T;
81
82 using Producer =
83 concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
84 using Consumer =
85 concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
86 using MultiProducer =
87 concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
88 using MultiConsumer =
89 concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
90
91 static constexpr std::size_t kUnbounded =
92 std::numeric_limits<std::size_t>::max() / 4;
93
94 /// @cond
95 // For internal use only
96 explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
97 : queue_(),
98 single_producer_token_(queue_),
99 producer_side_(*this, std::min(max_size, kUnbounded)),
100 consumer_side_(*this) {}
101
102 ~GenericQueue() {
103 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
104 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
105
106 if (producers_count_ == kCreatedAndDead) {
107 // To allow reading the remaining items
108 consumer_side_.ResumeBlockingOnPop();
109 }
110
111 // Clear remaining items in queue
112 T value;
113 ConsumerToken token{queue_};
114 while (consumer_side_.PopNoblock(token, value)) {
115 }
116 }
117
118 GenericQueue(GenericQueue&&) = delete;
119 GenericQueue(const GenericQueue&) = delete;
120 GenericQueue& operator=(GenericQueue&&) = delete;
121 GenericQueue& operator=(const GenericQueue&) = delete;
122 /// @endcond
123
124 /// Create a new queue
125 static std::shared_ptr<GenericQueue> Create(
126 std::size_t max_size = kUnbounded) {
127 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
128 }
129
130 /// Get a `Producer` which makes it possible to push items into the queue.
131 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
132 /// so you have to use multiple Producers of the same queue to simultaneously
133 /// write from multiple coroutines/threads.
134 ///
135 /// @note `Producer` may outlive the queue and consumers.
137 PrepareProducer();
138 return Producer(this->shared_from_this(), EmplaceEnabler{});
139 }
140
141 /// Get a `MultiProducer` which makes it possible to push items into the
142 /// queue. Can be called multiple times. The resulting `MultiProducer` is
143 /// thread-safe, so it can be used simultaneously from multiple
144 /// coroutines/threads.
145 ///
146 /// @note `MultiProducer` may outlive the queue and consumers.
147 ///
148 /// @note Prefer `Producer` tokens when possible, because `MultiProducer`
149 /// token incurs some overhead.
150 MultiProducer GetMultiProducer() {
151 static_assert(QueuePolicy::kIsMultipleProducer,
152 "Trying to obtain MultiProducer for a single-producer queue");
153 PrepareProducer();
154 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
155 }
156
157 /// Get a `Consumer` which makes it possible to read items from the queue.
158 /// Can be called multiple times. The resulting `Consumer` is not thread-safe,
159 /// so you have to use multiple `Consumer`s of the same queue to
160 /// simultaneously write from multiple coroutines/threads.
161 ///
162 /// @note `Consumer` may outlive the queue and producers.
164 PrepareConsumer();
165 return Consumer(this->shared_from_this(), EmplaceEnabler{});
166 }
167
168 /// Get a `MultiConsumer` which makes it possible to read items from the
169 /// queue. Can be called multiple times. The resulting `MultiConsumer` is
170 /// thread-safe, so it can be used simultaneously from multiple
171 /// coroutines/threads.
172 ///
173 /// @note `MultiConsumer` may outlive the queue and producers.
174 ///
175 /// @note Prefer `Consumer` tokens when possible, because `MultiConsumer`
176 /// token incurs some overhead.
178 static_assert(QueuePolicy::kIsMultipleConsumer,
179 "Trying to obtain MultiConsumer for a single-consumer queue");
180 PrepareConsumer();
181 return MultiConsumer(this->shared_from_this(), EmplaceEnabler{});
182 }
183
184 /// @brief Sets the limit on the queue size, pushes over this limit will block
185 /// @note This is a soft limit and may be slightly overrun under load.
186 void SetSoftMaxSize(std::size_t max_size) {
187 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
188 }
189
190 /// @brief Gets the limit on the queue size
191 std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }
192
193 /// @brief Gets the approximate size of queue
194 std::size_t GetSizeApproximate() const {
195 return producer_side_.GetSizeApproximate();
196 }
197
198 private:
199 class SingleProducerSide;
200 class MultiProducerSide;
201 class SingleConsumerSide;
202 class MultiConsumerSide;
203
204 /// Proxy-class makes synchronization of Push operations in multi or single
205 /// producer cases
206 using ProducerSide =
207 std::conditional_t<QueuePolicy::kIsMultipleProducer, MultiProducerSide,
208 SingleProducerSide>;
209
210 /// Proxy-class makes synchronization of Pop operations in multi or single
211 /// consumer cases
212 using ConsumerSide =
213 std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide,
214 SingleConsumerSide>;
215
216 template <typename Token>
217 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
218 return producer_side_.Push(token, std::move(value), deadline);
219 }
220
221 template <typename Token>
222 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
223 return producer_side_.PushNoblock(token, std::move(value));
224 }
225
226 template <typename Token>
227 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
228 return consumer_side_.Pop(token, value, deadline);
229 }
230
231 template <typename Token>
232 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
233 return consumer_side_.PopNoblock(token, value);
234 }
235
236 void PrepareProducer() {
237 std::size_t old_producers_count{};
238 utils::AtomicUpdate(producers_count_, [&](auto old_value) {
239 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
240 "Incorrect usage of queue producers");
241 old_producers_count = old_value;
242 return old_value == kCreatedAndDead ? 1 : old_value + 1;
243 });
244
245 if (old_producers_count == kCreatedAndDead) {
246 consumer_side_.ResumeBlockingOnPop();
247 }
248 }
249
250 void PrepareConsumer() {
251 std::size_t old_consumers_count{};
252 utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
253 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
254 "Incorrect usage of queue consumers");
255 old_consumers_count = old_value;
256 return old_value == kCreatedAndDead ? 1 : old_value + 1;
257 });
258
259 if (old_consumers_count == kCreatedAndDead) {
260 producer_side_.ResumeBlockingOnPush();
261 }
262 }
263
264 void MarkConsumerIsDead() {
265 const auto new_consumers_count =
266 utils::AtomicUpdate(consumers_count_, [](auto old_value) {
267 return old_value == 1 ? kCreatedAndDead : old_value - 1;
268 });
269 if (new_consumers_count == kCreatedAndDead) {
270 producer_side_.StopBlockingOnPush();
271 }
272 }
273
274 void MarkProducerIsDead() {
275 const auto new_producers_count =
276 utils::AtomicUpdate(producers_count_, [](auto old_value) {
277 return old_value == 1 ? kCreatedAndDead : old_value - 1;
278 });
279 if (new_producers_count == kCreatedAndDead) {
280 consumer_side_.StopBlockingOnPop();
281 }
282 }
283
284 public: // TODO
285 /// @cond
286 bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }
287
288 bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
289 /// @endcond
290
291 private:
292 template <typename Token>
293 void DoPush(Token& token, T&& value) {
294 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
295 static_assert(QueuePolicy::kIsMultipleProducer);
296 queue_.enqueue(token, std::move(value));
297 } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
298 static_assert(QueuePolicy::kIsMultipleProducer);
299 queue_.enqueue(std::move(value));
300 } else {
301 static_assert(std::is_same_v<Token, impl::NoToken>);
302 static_assert(!QueuePolicy::kIsMultipleProducer);
303 queue_.enqueue(single_producer_token_, std::move(value));
304 }
305
306 consumer_side_.OnElementPushed();
307 }
308
309 template <typename Token>
310 [[nodiscard]] bool DoPop(Token& token, T& value) {
311 bool success{};
312
313 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
314 static_assert(QueuePolicy::kIsMultipleProducer);
315 success = queue_.try_dequeue(token, value);
316 } else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
317 static_assert(QueuePolicy::kIsMultipleProducer);
318 success = queue_.try_dequeue(value);
319 } else {
320 static_assert(std::is_same_v<Token, impl::NoToken>);
321 static_assert(!QueuePolicy::kIsMultipleProducer);
322 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
323 }
324
325 if (success) {
326 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
327 return true;
328 }
329
330 return false;
331 }
332
333 moodycamel::ConcurrentQueue<T> queue_{1};
334 std::atomic<std::size_t> consumers_count_{0};
335 std::atomic<std::size_t> producers_count_{0};
336
337 SingleProducerToken single_producer_token_;
338
339 ProducerSide producer_side_;
340 ConsumerSide consumer_side_;
341
342 static constexpr std::size_t kCreatedAndDead =
343 std::numeric_limits<std::size_t>::max();
344 static constexpr std::size_t kSemaphoreUnlockValue =
345 std::numeric_limits<std::size_t>::max() / 2;
346};
347
348// Single-producer ProducerSide implementation
349template <typename T, typename QueuePolicy>
350class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
351 public:
352 explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
353 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
354
355 // Blocks if there is a consumer to Pop the current value and task
356 // shouldn't cancel and queue if full
357 template <typename Token>
358 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
359 if (DoPush(token, std::move(value))) {
360 return true;
361 }
362
363 return non_full_event_.WaitForEventUntil(deadline) &&
364 // NOLINTNEXTLINE(bugprone-use-after-move)
365 DoPush(token, std::move(value));
366 }
367
368 template <typename Token>
369 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
370 return DoPush(token, std::move(value));
371 }
372
373 void OnElementPopped(std::size_t released_capacity) {
374 used_capacity_.fetch_sub(released_capacity);
375 non_full_event_.Send();
376 }
377
378 void StopBlockingOnPush() {
379 total_capacity_ += kSemaphoreUnlockValue;
380 non_full_event_.Send();
381 }
382
383 void ResumeBlockingOnPush() { total_capacity_ -= kSemaphoreUnlockValue; }
384
385 void SetSoftMaxSize(std::size_t new_capacity) {
386 const auto old_capacity = total_capacity_.exchange(new_capacity);
387 if (new_capacity > old_capacity) non_full_event_.Send();
388 }
389
390 std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }
391
392 std::size_t GetSizeApproximate() const noexcept {
393 return used_capacity_.load();
394 }
395
396 private:
397 template <typename Token>
398 [[nodiscard]] bool DoPush(Token& token, T&& value) {
399 const std::size_t value_size = QueuePolicy::GetElementSize(value);
400 if (queue_.NoMoreConsumers() ||
401 used_capacity_.load() + value_size > total_capacity_.load()) {
402 return false;
403 }
404
405 used_capacity_.fetch_add(value_size);
406 queue_.DoPush(token, std::move(value));
407 non_full_event_.Reset();
408 return true;
409 }
410
411 GenericQueue& queue_;
412 engine::SingleConsumerEvent non_full_event_;
413 std::atomic<std::size_t> used_capacity_;
414 std::atomic<std::size_t> total_capacity_;
415};
416
417// Multi producer ProducerSide implementation
418template <typename T, typename QueuePolicy>
419class GenericQueue<T, QueuePolicy>::MultiProducerSide final {
420 public:
421 explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
422 : queue_(queue),
423 remaining_capacity_(capacity),
424 remaining_capacity_control_(remaining_capacity_) {}
425
426 // Blocks if there is a consumer to Pop the current value and task
427 // shouldn't cancel and queue if full
428 template <typename Token>
429 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
430 const std::size_t value_size = QueuePolicy::GetElementSize(value);
431 return remaining_capacity_.try_lock_shared_until_count(deadline,
432 value_size) &&
433 DoPush(token, std::move(value));
434 }
435
436 template <typename Token>
437 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
438 const std::size_t value_size = QueuePolicy::GetElementSize(value);
439 return remaining_capacity_.try_lock_shared_count(value_size) &&
440 DoPush(token, std::move(value));
441 }
442
443 void OnElementPopped(std::size_t value_size) {
444 remaining_capacity_.unlock_shared_count(value_size);
445 }
446
447 void StopBlockingOnPush() {
448 remaining_capacity_control_.SetCapacityOverride(0);
449 }
450
451 void ResumeBlockingOnPush() {
452 remaining_capacity_control_.RemoveCapacityOverride();
453 }
454
455 void SetSoftMaxSize(std::size_t count) {
456 remaining_capacity_control_.SetCapacity(count);
457 }
458
459 std::size_t GetSizeApproximate() const noexcept {
460 return remaining_capacity_.UsedApprox();
461 }
462
463 std::size_t GetSoftMaxSize() const noexcept {
464 return remaining_capacity_control_.GetCapacity();
465 }
466
467 private:
468 template <typename Token>
469 [[nodiscard]] bool DoPush(Token& token, T&& value) {
470 const std::size_t value_size = QueuePolicy::GetElementSize(value);
471 UASSERT(value_size > 0);
472 if (queue_.NoMoreConsumers()) {
473 remaining_capacity_.unlock_shared_count(value_size);
474 return false;
475 }
476
477 queue_.DoPush(token, std::move(value));
478 return true;
479 }
480
481 GenericQueue& queue_;
482 engine::CancellableSemaphore remaining_capacity_;
483 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
484};
485
486// Single consumer ConsumerSide implementation
487template <typename T, typename QueuePolicy>
488class GenericQueue<T, QueuePolicy>::SingleConsumerSide final {
489 public:
490 explicit SingleConsumerSide(GenericQueue& queue)
491 : queue_(queue), element_count_(0) {}
492
493 // Blocks only if queue is empty
494 template <typename Token>
495 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
496 while (!DoPop(token, value)) {
497 if (queue_.NoMoreProducers() ||
498 !nonempty_event_.WaitForEventUntil(deadline)) {
499 // Producer might have pushed something in queue between .pop()
500 // and !producer_is_created_and_dead_ check. Check twice to avoid
501 // TOCTOU.
502 return DoPop(token, value);
503 }
504 }
505 return true;
506 }
507
508 template <typename Token>
509 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
510 return DoPop(token, value);
511 }
512
513 void OnElementPushed() {
514 ++element_count_;
515 nonempty_event_.Send();
516 }
517
518 void StopBlockingOnPop() { nonempty_event_.Send(); }
519
520 void ResumeBlockingOnPop() {}
521
522 std::size_t GetElementCount() const { return element_count_; }
523
524 private:
525 template <typename Token>
526 [[nodiscard]] bool DoPop(Token& token, T& value) {
527 if (queue_.DoPop(token, value)) {
528 --element_count_;
529 nonempty_event_.Reset();
530 return true;
531 }
532 return false;
533 }
534
535 GenericQueue& queue_;
536 engine::SingleConsumerEvent nonempty_event_;
537 std::atomic<std::size_t> element_count_;
538};
539
540// Multi consumer ConsumerSide implementation
541template <typename T, typename QueuePolicy>
542class GenericQueue<T, QueuePolicy>::MultiConsumerSide final {
543 public:
544 explicit MultiConsumerSide(GenericQueue& queue)
545 : queue_(queue),
546 element_count_(kUnbounded),
547 element_count_control_(element_count_) {
548 const bool success = element_count_.try_lock_shared_count(kUnbounded);
549 UASSERT(success);
550 }
551
552 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
553
554 // Blocks only if queue is empty
555 template <typename Token>
556 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
557 return element_count_.try_lock_shared_until(deadline) &&
558 DoPop(token, value);
559 }
560
561 template <typename Token>
562 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
563 return element_count_.try_lock_shared() && DoPop(token, value);
564 }
565
566 void OnElementPushed() { element_count_.unlock_shared(); }
567
568 void StopBlockingOnPop() {
569 element_count_control_.SetCapacityOverride(kUnbounded +
570 kSemaphoreUnlockValue);
571 }
572
573 void ResumeBlockingOnPop() {
574 element_count_control_.RemoveCapacityOverride();
575 }
576
577 std::size_t GetElementCount() const {
578 const std::size_t cur_element_count = element_count_.RemainingApprox();
579 if (cur_element_count < kUnbounded) {
580 return cur_element_count;
581 } else if (cur_element_count <= kSemaphoreUnlockValue) {
582 return 0;
583 }
584 return cur_element_count - kSemaphoreUnlockValue;
585 }
586
587 private:
588 template <typename Token>
589 [[nodiscard]] bool DoPop(Token& token, T& value) {
590 while (true) {
591 if (queue_.DoPop(token, value)) {
592 return true;
593 }
594 if (queue_.NoMoreProducers()) {
595 element_count_.unlock_shared();
596 return false;
597 }
598 // We can get here if another consumer steals our element, leaving another
599 // element in a Moodycamel sub-queue that we have already passed.
600 }
601 }
602
603 GenericQueue& queue_;
604 engine::CancellableSemaphore element_count_;
605 concurrent::impl::SemaphoreCapacityControl element_count_control_;
606};
607
608/// @ingroup userver_concurrency
609///
610/// @brief Non FIFO multiple producers multiple consumers queue.
611///
612/// Items from the same producer are always delivered in the production order.
613/// Items from different producers (or when using a `MultiProducer` token) are
614/// delivered in an unspecified order. In other words, FIFO order is maintained
615/// only within producers, but not between them. This may lead to increased peak
616/// latency of item processing.
617///
618/// In exchange for this, the queue has lower contention and increased
619/// throughput compared to a conventional lock-free queue.
620///
621/// @see @ref scripts/docs/en/userver/synchronization.md
622template <typename T>
623using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>;
624
625/// @ingroup userver_concurrency
626///
627/// @brief Non FIFO multiple producers single consumer queue.
628///
629/// @see concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
630/// @see @ref scripts/docs/en/userver/synchronization.md
631template <typename T>
632using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>;
633
634/// @ingroup userver_concurrency
635///
636/// @brief Single producer multiple consumers queue.
637///
638/// @see @ref scripts/docs/en/userver/synchronization.md
639template <typename T>
640using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>;
641
642/// @ingroup userver_concurrency
643///
644/// @brief Single producer single consumer queue.
645///
646/// @see @ref scripts/docs/en/userver/synchronization.md
647template <typename T>
648using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>;
649
650/// @ingroup userver_concurrency
651///
652/// @brief Single producer single consumer queue of std::string which is bounded
653/// bytes inside.
654///
655/// @see @ref scripts/docs/en/userver/synchronization.md
656using StringStreamQueue =
657 GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>;
658
659} // namespace concurrent
660
661USERVER_NAMESPACE_END