userver: userver/concurrent/queue.hpp Source File
queue.hpp
1#pragma once
2
3/// @file userver/concurrent/queue.hpp
4/// @brief @copybrief concurrent::GenericQueue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <moodycamel/concurrentqueue.h>
11
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/atomic.hpp>
20
21USERVER_NAMESPACE_BEGIN
22
23namespace concurrent {
24
25/// @brief Represents the manner in which the queue enforces the configured
26/// max size limit.
27/// @see @ref GenericQueue
28/// @see @ref DefaultQueuePolicy
29enum class QueueMaxSizeMode {
30 /// No support for setting max size. Fastest.
31 kNone,
32
33 /// Supports dynamically changing max size; supports awaiting non-fullness
34 /// in producers. Slightly slower than @ref kNone.
35 kDynamicSync,
36};
37
38/// @brief The default queue policy for @ref GenericQueue.
39/// Custom queue policies must inherit from it.
40struct DefaultQueuePolicy {
41 /// If `true`, the queue gains support for multiple concurrent producers,
42 /// which adds some synchronization overhead. This also makes the queue
43 /// non-FIFO, because consumer(s) will randomly prioritize some producers
44 /// over the other ones.
45 static constexpr bool kIsMultipleProducer{false};
46
47 /// If `true`, the queue gains support for multiple concurrent consumers,
48 /// which adds some synchronization overhead.
49 static constexpr bool kIsMultipleConsumer{false};
50
51 /// Whether the queue has support for max size,
52 /// which adds some synchronization overhead.
53 static constexpr QueueMaxSizeMode kMaxSizeMode = QueueMaxSizeMode::kDynamicSync;
54
55 /// Should return the size of an element, which accounts towards
56 /// the capacity limit. Only makes sense to set if `kMaxSizeMode != QueueMaxSizeMode::kNone`.
57 ///
58 /// For the vast majority of queues, capacity is naturally counted
59 /// in queue elements, e.g. `GetSoftMaxSize() == 1000` means that we want
60 /// no more than 1000 elements in the queue.
61 ///
62 /// Sometimes we want a more complex limit, e.g. for a queue of strings we
63 /// want the total number of bytes in all the strings to be limited.
64 /// In that case we can set the element size equal to its `std::size`.
65 ///
66 /// @note Returning anything other than 1 adds some synchronization overhead.
67 /// @warning An element changing its capacity while inside the queue is UB.
68 /// @warning Overflow in the total queue size is UB.
69 template <typename T>
70 static constexpr std::size_t GetElementSize(const T&) {
71 return 1;
72 }
73};
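
// Editor's example (not part of queue.hpp): a minimal custom policy sketch for
// the GetElementSize() hook documented above, assuming user code that includes
// this header and <string>; the policy name is illustrative only.
// impl::ContainerQueuePolicy below implements the same idea generically.
struct ByteCountingPolicy : DefaultQueuePolicy {
    // Each string accounts for its byte size, so the soft max size of a queue
    // using this policy is effectively measured in bytes, not in elements.
    static std::size_t GetElementSize(const std::string& value) { return value.size(); }
};
// Such a policy is then plugged into the queue declared below, e.g.
// GenericQueue<std::string, ByteCountingPolicy>.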
74
75namespace impl {
76
77template <bool MultipleProducer, bool MultipleConsumer>
78struct SimpleQueuePolicy : public DefaultQueuePolicy {
79 static constexpr bool kIsMultipleProducer{MultipleProducer};
80 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
81 static constexpr auto kMaxSizeMode = QueueMaxSizeMode::kDynamicSync;
82};
83
84template <bool MultipleProducer, bool MultipleConsumer>
85struct NoMaxSizeQueuePolicy : public DefaultQueuePolicy {
86 static constexpr bool kIsMultipleProducer{MultipleProducer};
87 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
88 static constexpr auto kMaxSizeMode = QueueMaxSizeMode::kNone;
89};
90
91template <bool MultipleProducer, bool MultipleConsumer>
92struct ContainerQueuePolicy : public DefaultQueuePolicy {
93 template <typename T>
94 static std::size_t GetElementSize(const T& value) {
95 return std::size(value);
96 }
97
98 static constexpr bool kIsMultipleProducer{MultipleProducer};
99 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
100};
101
102} // namespace impl
103
104/// @brief Queue with single and multi producer/consumer options.
105///
106/// @tparam T element type
107/// @tparam QueuePolicy policy type, see @ref DefaultQueuePolicy for details
108///
109/// In practice, instead of using `GenericQueue` directly, use an alias:
110///
111/// * concurrent::NonFifoMpmcQueue
112/// * concurrent::NonFifoMpscQueue
113/// * concurrent::SpmcQueue
114/// * concurrent::SpscQueue
115/// * concurrent::UnboundedNonFifoMpscQueue
116/// * concurrent::UnboundedSpmcQueue
117/// * concurrent::UnboundedSpscQueue
118/// * concurrent::StringStreamQueue
119///
120/// @see @ref concurrent_queues
121template <typename T, typename QueuePolicy>
122class GenericQueue final : public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
123 struct EmplaceEnabler final {
124 // Disable {}-initialization in Queue's constructor
125 explicit EmplaceEnabler() = default;
126 };
127
128 static_assert(
129 std::is_base_of_v<DefaultQueuePolicy, QueuePolicy>,
130 "QueuePolicy must inherit from concurrent::DefaultQueuePolicy"
131 );
132
133 static constexpr QueueMaxSizeMode kMaxSizeMode = QueuePolicy::kMaxSizeMode;
134
135 using ProducerToken =
136 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
137 using ConsumerToken =
138 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ConsumerToken, impl::NoToken>;
139 using MultiProducerToken = impl::MultiToken;
140 using MultiConsumerToken = std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken, impl::NoToken>;
141
142 using SingleProducerToken =
143 std::conditional_t<!QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
144
145 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
146 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
147 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
148 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
149
150public:
151 using ValueType = T;
152
153 using Producer = concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
154 using Consumer = concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
155 using MultiProducer = concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
156 using MultiConsumer = concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
157
158 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max() / 4;
159
160 /// @cond
161 // For internal use only
162 explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
163 : queue_(),
164 single_producer_token_(queue_),
165 producer_side_(*this, std::min(max_size, kUnbounded)),
166 consumer_side_(*this) {}
167
168 ~GenericQueue() {
169 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
170 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
171
172 if (producers_count_ == kCreatedAndDead) {
173 // To allow reading the remaining items
174 consumer_side_.ResumeBlockingOnPop();
175 }
176
177 // Clear remaining items in queue
178 T value;
179 ConsumerToken token{queue_};
180 while (consumer_side_.PopNoblock(token, value)) {
181 }
182 }
183
184 GenericQueue(GenericQueue&&) = delete;
185 GenericQueue(const GenericQueue&) = delete;
186 GenericQueue& operator=(GenericQueue&&) = delete;
187 GenericQueue& operator=(const GenericQueue&) = delete;
188 /// @endcond
189
190 /// Create a new queue
191 static std::shared_ptr<GenericQueue> Create(std::size_t max_size = kUnbounded) {
192 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
193 }
194
195 /// Get a `Producer` which makes it possible to push items into the queue.
196 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
197 /// so you have to use multiple Producers of the same queue to simultaneously
198 /// write from multiple coroutines/threads.
199 ///
200 /// @note `Producer` may outlive the queue and consumers.
201 Producer GetProducer() {
202 PrepareProducer();
203 return Producer(this->shared_from_this(), EmplaceEnabler{});
204 }
205
206 /// Get a `MultiProducer` which makes it possible to push items into the
207 /// queue. Can be called multiple times. The resulting `MultiProducer` is
208 /// thread-safe, so it can be used simultaneously from multiple
209 /// coroutines/threads.
210 ///
211 /// @note `MultiProducer` may outlive the queue and consumers.
212 ///
213 /// @note Prefer `Producer` tokens when possible, because `MultiProducer`
214 /// token incurs some overhead.
215 MultiProducer GetMultiProducer() {
216 static_assert(QueuePolicy::kIsMultipleProducer, "Trying to obtain MultiProducer for a single-producer queue");
217 PrepareProducer();
218 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
219 }
220
221 /// Get a `Consumer` which makes it possible to read items from the queue.
222 /// Can be called multiple times. The resulting `Consumer` is not thread-safe,
223 /// so you have to use multiple `Consumer`s of the same queue to
224 /// simultaneously read from multiple coroutines/threads.
225 ///
226 /// @note `Consumer` may outlive the queue and producers.
227 Consumer GetConsumer() {
228 PrepareConsumer();
229 return Consumer(this->shared_from_this(), EmplaceEnabler{});
230 }
231
232 /// Get a `MultiConsumer` which makes it possible to read items from the
233 /// queue. Can be called multiple times. The resulting `MultiConsumer` is
234 /// thread-safe, so it can be used simultaneously from multiple
235 /// coroutines/threads.
236 ///
237 /// @note `MultiConsumer` may outlive the queue and producers.
238 ///
239 /// @note Prefer `Consumer` tokens when possible, because `MultiConsumer`
240 /// token incurs some overhead.
241 MultiConsumer GetMultiConsumer() {
242 static_assert(QueuePolicy::kIsMultipleConsumer, "Trying to obtain MultiConsumer for a single-consumer queue");
243 PrepareConsumer();
244 return MultiConsumer(this->shared_from_this(), EmplaceEnabler{});
245 }
246
247 /// @brief Sets the limit on the queue size; pushes over this limit will block
248 /// @note This is a soft limit and may be slightly overrun under load.
249 void SetSoftMaxSize(std::size_t max_size) { producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded)); }
250
251 /// @brief Gets the limit on the queue size
252 std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }
253
254 /// @brief Gets the approximate size of queue
255 std::size_t GetSizeApproximate() const { return producer_side_.GetSizeApproximate(); }
256
257private:
258 class SingleProducerSide;
259 class MultiProducerSide;
260 class NoMaxSizeProducerSide;
261 class SingleConsumerSide;
262 class MultiConsumerSide;
263
264 /// Proxy class that synchronizes Push operations for the single- and
265 /// multi-producer cases
266 using ProducerSide = std::conditional_t<
267 kMaxSizeMode == QueueMaxSizeMode::kNone,
268 NoMaxSizeProducerSide,
269 std::conditional_t< //
270 QueuePolicy::kIsMultipleProducer,
271 MultiProducerSide,
272 SingleProducerSide>>;
273
274 /// Proxy class that synchronizes Pop operations for the single- and
275 /// multi-consumer cases
276 using ConsumerSide = std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide, SingleConsumerSide>;
277
278 template <typename Token>
279 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
280 const std::size_t value_size = QueuePolicy::GetElementSize(value);
281 return producer_side_.Push(token, std::move(value), deadline, value_size);
282 }
283
284 template <typename Token>
285 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
286 const std::size_t value_size = QueuePolicy::GetElementSize(value);
287 return producer_side_.PushNoblock(token, std::move(value), value_size);
288 }
289
290 template <typename Token>
291 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
292 return consumer_side_.Pop(token, value, deadline);
293 }
294
295 template <typename Token>
296 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
297 return consumer_side_.PopNoblock(token, value);
298 }
299
300 void PrepareProducer() {
301 std::size_t old_producers_count{};
302 utils::AtomicUpdate(producers_count_, [&](auto old_value) {
303 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1, "Incorrect usage of queue producers");
304 old_producers_count = old_value;
305 return old_value == kCreatedAndDead ? 1 : old_value + 1;
306 });
307
308 if (old_producers_count == kCreatedAndDead) {
309 consumer_side_.ResumeBlockingOnPop();
310 }
311 }
312
313 void PrepareConsumer() {
314 std::size_t old_consumers_count{};
315 utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
316 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1, "Incorrect usage of queue consumers");
317 old_consumers_count = old_value;
318 return old_value == kCreatedAndDead ? 1 : old_value + 1;
319 });
320
321 if (old_consumers_count == kCreatedAndDead) {
322 producer_side_.ResumeBlockingOnPush();
323 }
324 }
325
326 void MarkConsumerIsDead() {
327 const auto new_consumers_count = utils::AtomicUpdate(consumers_count_, [](auto old_value) {
328 return old_value == 1 ? kCreatedAndDead : old_value - 1;
329 });
330 if (new_consumers_count == kCreatedAndDead) {
331 producer_side_.StopBlockingOnPush();
332 }
333 }
334
335 void MarkProducerIsDead() {
336 const auto new_producers_count = utils::AtomicUpdate(producers_count_, [](auto old_value) {
337 return old_value == 1 ? kCreatedAndDead : old_value - 1;
338 });
339 if (new_producers_count == kCreatedAndDead) {
340 consumer_side_.StopBlockingOnPop();
341 }
342 }
343
344public: // TODO
345 /// @cond
346 bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }
347
348 bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
349 /// @endcond
350
351private:
352 template <typename Token>
353 void DoPush(Token& token, T&& value) {
354 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
355 static_assert(QueuePolicy::kIsMultipleProducer);
356 queue_.enqueue(token, std::move(value));
357 } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
358 static_assert(QueuePolicy::kIsMultipleProducer);
359 queue_.enqueue(std::move(value));
360 } else {
361 static_assert(std::is_same_v<Token, impl::NoToken>);
362 static_assert(!QueuePolicy::kIsMultipleProducer);
363 queue_.enqueue(single_producer_token_, std::move(value));
364 }
365
366 consumer_side_.OnElementPushed();
367 }
368
369 template <typename Token>
370 [[nodiscard]] bool DoPop(Token& token, T& value) {
371 bool success{};
372
373 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
374 static_assert(QueuePolicy::kIsMultipleProducer);
375 success = queue_.try_dequeue(token, value);
376 } else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
377 static_assert(QueuePolicy::kIsMultipleProducer);
378 success = queue_.try_dequeue(value);
379 } else {
380 static_assert(std::is_same_v<Token, impl::NoToken>);
381 static_assert(!QueuePolicy::kIsMultipleProducer);
382 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
383 }
384
385 if (success) {
386 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
387 return true;
388 }
389
390 return false;
391 }
392
393 moodycamel::ConcurrentQueue<T> queue_{1};
394 std::atomic<std::size_t> consumers_count_{0};
395 std::atomic<std::size_t> producers_count_{0};
396
397 SingleProducerToken single_producer_token_;
398
399 ProducerSide producer_side_;
400 ConsumerSide consumer_side_;
401
402 static constexpr std::size_t kCreatedAndDead = std::numeric_limits<std::size_t>::max();
403 static constexpr std::size_t kSemaphoreUnlockValue = std::numeric_limits<std::size_t>::max() / 2;
404};
405
406template <typename T, typename QueuePolicy>
407class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
408public:
409 SingleProducerSide(GenericQueue& queue, std::size_t capacity)
410 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
411
412 // Blocks while the queue is full, as long as there is at least one consumer
413 // alive to Pop a value and the task is not cancelled
414 template <typename Token>
415 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
416 bool no_more_consumers = false;
417 const bool success = non_full_event_.WaitUntil(deadline, [&] {
418 if (queue_.NoMoreConsumers()) {
419 no_more_consumers = true;
420 return true;
421 }
422 if (DoPush(token, std::move(value), value_size)) {
423 return true;
424 }
425 return false;
426 });
427 return success && !no_more_consumers;
428 }
429
430 template <typename Token>
431 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
432 return !queue_.NoMoreConsumers() && DoPush(token, std::move(value), value_size);
433 }
434
435 void OnElementPopped(std::size_t released_capacity) {
436 used_capacity_.fetch_sub(released_capacity);
437 non_full_event_.Send();
438 }
439
440 void StopBlockingOnPush() { non_full_event_.Send(); }
441
442 void ResumeBlockingOnPush() {}
443
444 void SetSoftMaxSize(std::size_t new_capacity) {
445 const auto old_capacity = total_capacity_.exchange(new_capacity);
446 if (new_capacity > old_capacity) non_full_event_.Send();
447 }
448
449 std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }
450
451 std::size_t GetSizeApproximate() const noexcept { return used_capacity_.load(); }
452
453private:
454 template <typename Token>
455 [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
456 if (used_capacity_.load() + value_size > total_capacity_.load()) {
457 return false;
458 }
459
460 used_capacity_.fetch_add(value_size);
461 queue_.DoPush(token, std::move(value));
462 return true;
463 }
464
465 GenericQueue& queue_;
466 engine::SingleConsumerEvent non_full_event_;
467 std::atomic<std::size_t> used_capacity_;
468 std::atomic<std::size_t> total_capacity_;
469};
470
471template <typename T, typename QueuePolicy>
472class GenericQueue<T, QueuePolicy>::MultiProducerSide final {
473public:
474 MultiProducerSide(GenericQueue& queue, std::size_t capacity)
475 : queue_(queue), remaining_capacity_(capacity), remaining_capacity_control_(remaining_capacity_) {}
476
477 // Blocks while the queue is full, as long as there is at least one consumer
478 // alive to Pop a value and the task is not cancelled
479 template <typename Token>
480 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
481 return remaining_capacity_.try_lock_shared_until_count(deadline, value_size) &&
482 DoPush(token, std::move(value), value_size);
483 }
484
485 template <typename Token>
486 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
487 return remaining_capacity_.try_lock_shared_count(value_size) && DoPush(token, std::move(value), value_size);
488 }
489
490 void OnElementPopped(std::size_t value_size) { remaining_capacity_.unlock_shared_count(value_size); }
491
492 void StopBlockingOnPush() { remaining_capacity_control_.SetCapacityOverride(0); }
493
494 void ResumeBlockingOnPush() { remaining_capacity_control_.RemoveCapacityOverride(); }
495
496 void SetSoftMaxSize(std::size_t count) { remaining_capacity_control_.SetCapacity(count); }
497
498 std::size_t GetSizeApproximate() const noexcept { return remaining_capacity_.UsedApprox(); }
499
500 std::size_t GetSoftMaxSize() const noexcept { return remaining_capacity_control_.GetCapacity(); }
501
502private:
503 template <typename Token>
504 [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
505 if (queue_.NoMoreConsumers()) {
506 remaining_capacity_.unlock_shared_count(value_size);
507 return false;
508 }
509
510 queue_.DoPush(token, std::move(value));
511 return true;
512 }
513
514 GenericQueue& queue_;
515 engine::CancellableSemaphore remaining_capacity_;
516 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
517};
518
519template <typename T, typename QueuePolicy>
520class GenericQueue<T, QueuePolicy>::NoMaxSizeProducerSide final {
521public:
522 NoMaxSizeProducerSide(GenericQueue& queue, std::size_t max_size) : queue_(queue) { SetSoftMaxSize(max_size); }
523
524 template <typename Token>
525 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline /*deadline*/, std::size_t /*value_size*/) {
526 if (queue_.NoMoreConsumers()) {
527 return false;
528 }
529
530 queue_.DoPush(token, std::move(value));
531 return true;
532 }
533
534 template <typename Token>
535 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
536 return Push(token, std::move(value), engine::Deadline{}, value_size);
537 }
538
539 void OnElementPopped(std::size_t /*released_capacity*/) {}
540
541 void StopBlockingOnPush() {}
542
543 void ResumeBlockingOnPush() {}
544
545 void SetSoftMaxSize(std::size_t new_capacity) {
546 UINVARIANT(new_capacity == kUnbounded, "Cannot set max size for a queue with QueueMaxSizeMode::kNone");
547 }
548
549 std::size_t GetSoftMaxSize() const noexcept { return kUnbounded; }
550
551 std::size_t GetSizeApproximate() const noexcept { return queue_.queue_.size_approx(); }
552
553private:
554 GenericQueue& queue_;
555};
556
557template <typename T, typename QueuePolicy>
558class GenericQueue<T, QueuePolicy>::SingleConsumerSide final {
559public:
560 explicit SingleConsumerSide(GenericQueue& queue) : queue_(queue), element_count_(0) {}
561
562 // Blocks only if queue is empty
563 template <typename Token>
564 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
565 bool no_more_producers = false;
566 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
567 if (DoPop(token, value)) {
568 return true;
569 }
570 if (queue_.NoMoreProducers()) {
571 // A producer might have pushed something into the queue between the DoPop()
572 // call and the NoMoreProducers() check. Check a second time to avoid
573 // a TOCTOU race.
574 if (!DoPop(token, value)) {
575 no_more_producers = true;
576 }
577 return true;
578 }
579 return false;
580 });
581 return success && !no_more_producers;
582 }
583
584 template <typename Token>
585 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
586 return DoPop(token, value);
587 }
588
589 void OnElementPushed() {
590 ++element_count_;
591 nonempty_event_.Send();
592 }
593
594 void StopBlockingOnPop() { nonempty_event_.Send(); }
595
596 void ResumeBlockingOnPop() {}
597
598 std::size_t GetElementCount() const { return element_count_; }
599
600private:
601 template <typename Token>
602 [[nodiscard]] bool DoPop(Token& token, T& value) {
603 if (queue_.DoPop(token, value)) {
604 --element_count_;
605 nonempty_event_.Reset();
606 return true;
607 }
608 return false;
609 }
610
611 GenericQueue& queue_;
612 engine::SingleConsumerEvent nonempty_event_;
613 std::atomic<std::size_t> element_count_;
614};
615
616template <typename T, typename QueuePolicy>
617class GenericQueue<T, QueuePolicy>::MultiConsumerSide final {
618public:
619 explicit MultiConsumerSide(GenericQueue& queue)
620 : queue_(queue), element_count_(kUnbounded), element_count_control_(element_count_) {
621 const bool success = element_count_.try_lock_shared_count(kUnbounded);
622 UASSERT(success);
623 }
624
625 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
626
627 // Blocks only if queue is empty
628 template <typename Token>
629 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
630 return element_count_.try_lock_shared_until(deadline) && DoPop(token, value);
631 }
632
633 template <typename Token>
634 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
635 return element_count_.try_lock_shared() && DoPop(token, value);
636 }
637
638 void OnElementPushed() { element_count_.unlock_shared(); }
639
640 void StopBlockingOnPop() { element_count_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue); }
641
642 void ResumeBlockingOnPop() { element_count_control_.RemoveCapacityOverride(); }
643
644 std::size_t GetElementCount() const {
645 const std::size_t cur_element_count = element_count_.RemainingApprox();
646 if (cur_element_count < kUnbounded) {
647 return cur_element_count;
648 } else if (cur_element_count <= kSemaphoreUnlockValue) {
649 return 0;
650 }
651 return cur_element_count - kSemaphoreUnlockValue;
652 }
653
654private:
655 template <typename Token>
656 [[nodiscard]] bool DoPop(Token& token, T& value) {
657 while (true) {
658 if (queue_.DoPop(token, value)) {
659 return true;
660 }
661 if (queue_.NoMoreProducers()) {
662 element_count_.unlock_shared();
663 return false;
664 }
665 // We can get here if another consumer steals our element, leaving another
666 // element in a Moodycamel sub-queue that we have already passed.
667 }
668 }
669
670 GenericQueue& queue_;
671 engine::CancellableSemaphore element_count_;
672 concurrent::impl::SemaphoreCapacityControl element_count_control_;
673};
674
675/// @ingroup userver_concurrency
676///
677/// @brief Non FIFO multiple producers multiple consumers queue.
678///
679/// Items from the same producer are always delivered in the production order.
680/// Items from different producers (or when using a `MultiProducer` token) are
681/// delivered in an unspecified order. In other words, FIFO order is maintained
682/// only within producers, but not between them. This may lead to increased peak
683/// latency of item processing.
684///
685/// In exchange for this, the queue has lower contention and increased
686/// throughput compared to a conventional lock-free queue.
687///
688/// @see @ref scripts/docs/en/userver/synchronization.md
689template <typename T>
690using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>;
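
// Editor's example (not part of queue.hpp): a minimal multi-producer sketch,
// assuming a coroutine (task processor) context, <userver/utils/async.hpp> for
// utils::Async, and the Producer/Consumer wrappers from queue_helpers.hpp;
// function and task names are illustrative only.
void NonFifoMpmcQueueSample() {
    auto queue = NonFifoMpmcQueue<int>::Create();
    auto consumer = queue->GetConsumer();

    // Each producing task uses its own Producer: a single Producer is not
    // thread-safe, but separate Producers of one queue may push concurrently
    // (a shared MultiProducer from GetMultiProducer() would also work).
    auto producer_task = utils::Async("producer", [producer = queue->GetProducer()] {
        for (int i = 0; i < 10; ++i) {
            if (!producer.Push(int{i})) return;  // false <=> all consumers are gone
        }
    });

    auto local_producer = queue->GetProducer();
    if (!local_producer.Push(42)) return;

    producer_task.Get();  // both producers have finished pushing by now

    int item{};
    // Drain whatever was produced. Items of one producer keep their relative
    // order; the interleaving between producers is unspecified (non-FIFO).
    while (consumer.PopNoblock(item)) {
        // process item
    }
}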
691
692/// @ingroup userver_concurrency
693///
694/// @brief Non FIFO multiple producers single consumer queue.
695///
696/// @see concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
697/// @see @ref scripts/docs/en/userver/synchronization.md
698template <typename T>
699using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>;
700
701/// @ingroup userver_concurrency
702///
703/// @brief Single producer multiple consumers queue.
704///
705/// @see @ref scripts/docs/en/userver/synchronization.md
706template <typename T>
707using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>;
708
709/// @ingroup userver_concurrency
710///
711/// @brief Single producer single consumer queue.
712///
713/// @see @ref scripts/docs/en/userver/synchronization.md
714template <typename T>
715using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>;
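
// Editor's example (not part of queue.hpp): a bounded single-producer sketch of
// the soft max size and deadline-aware pushes documented above, assuming a
// coroutine context and that the Producer/Consumer wrappers from
// queue_helpers.hpp forward deadlines to Push/Pop; the limits are illustrative.
void BoundedSpscQueueSample() {
    // The soft max size may be passed to Create() or changed later.
    auto queue = SpscQueue<int>::Create();
    queue->SetSoftMaxSize(1000);

    auto producer = queue->GetProducer();
    auto consumer = queue->GetConsumer();

    // Push blocks while the queue is full and returns false if the deadline
    // expires first or if all Consumers have already been destroyed.
    if (!producer.Push(1, engine::Deadline::FromDuration(std::chrono::milliseconds{100}))) {
        return;
    }

    int value{};
    // Pop blocks while the queue is empty; with a single producer the FIFO
    // order of its items is preserved.
    if (consumer.Pop(value, engine::Deadline::FromDuration(std::chrono::milliseconds{100}))) {
        // value == 1
    }
}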
716
717namespace impl {
718
719/// @ingroup userver_concurrency
720///
721/// @brief Like @see NonFifoMpmcQueue, but does not support setting max size and is thus slightly faster.
722///
723/// @warning The current implementation suffers from performance issues in multi-producer scenario under heavy load.
724/// Precisely speaking, producers always take priority over consumers (breaking thread fairness), and consumers starve,
725/// leading to increased latencies to the point of OOM. Use other queue types (unbounded or not) for the time being.
726///
727/// @see @ref scripts/docs/en/userver/synchronization.md
728template <typename T>
729using UnfairUnboundedNonFifoMpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, true>>;
730
731} // namespace impl
732
733/// @ingroup userver_concurrency
734///
735/// @brief Like @see NonFifoMpscQueue, but does not support setting max size and is thus slightly faster.
736///
737/// @see @ref scripts/docs/en/userver/synchronization.md
738template <typename T>
739using UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, false>>;
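
// Editor's example (not part of queue.hpp): with a QueueMaxSizeMode::kNone
// policy there is no capacity to wait for, so a push needs no deadline and only
// fails once every Consumer is gone. A minimal sketch; names are illustrative.
void UnboundedMpscQueueSample() {
    auto queue = UnboundedNonFifoMpscQueue<std::string>::Create();
    auto consumer = queue->GetConsumer();
    auto producer = queue->GetProducer();

    // Never blocks on capacity; returns false only when all consumers are gone.
    if (!producer.Push("event")) return;

    std::string event;
    if (consumer.PopNoblock(event)) {
        // process the event
    }
}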
740
741/// @ingroup userver_concurrency
742///
743/// @brief Like @see SpmcQueue, but does not support setting max size and is thus slightly faster.
744///
745/// @see @ref scripts/docs/en/userver/synchronization.md
746template <typename T>
747using UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, true>>;
748
749/// @ingroup userver_concurrency
750///
751/// @brief Like @see SpscQueue, but does not support setting max size and is thus slightly faster.
752///
753/// @see @ref scripts/docs/en/userver/synchronization.md
754template <typename T>
755using UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, false>>;
756
757/// @ingroup userver_concurrency
758///
759/// @brief Single producer single consumer queue of std::string which is bounded by the total bytes inside the strings.
760///
761/// @see @ref scripts/docs/en/userver/synchronization.md
762using StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>;
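
// Editor's example (not part of queue.hpp): for StringStreamQueue the soft max
// size counts bytes of queued string data (via impl::ContainerQueuePolicy
// above), not elements. A minimal sketch; the limit and payload are illustrative.
void StringStreamQueueSample() {
    auto queue = StringStreamQueue::Create();
    queue->SetSoftMaxSize(64 * 1024);  // keep at most ~64 KiB of data in flight

    auto producer = queue->GetProducer();
    auto consumer = queue->GetConsumer();

    if (!producer.Push(std::string{"a chunk of a streamed response"})) {
        return;  // no more consumers
    }

    std::string chunk;
    if (consumer.PopNoblock(chunk)) {
        // forward the chunk downstream
    }
}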
763
764} // namespace concurrent
765
766USERVER_NAMESPACE_END