userver: userver/concurrent/queue.hpp Source File
queue.hpp
1#pragma once
2
3/// @file userver/concurrent/queue.hpp
4/// @brief @copybrief concurrent::GenericQueue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <moodycamel/concurrentqueue.h>
11
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/atomic.hpp>
20
21USERVER_NAMESPACE_BEGIN
22
23namespace concurrent {
24
25/// @brief Represents the manner in which the queue enforces the configured
26/// max size limit.
27/// @see @ref GenericQueue
28/// @see @ref DefaultQueuePolicy
29enum class QueueMaxSizeMode {
30 /// No support for setting max size. Fastest.
31 kNone,
32
33 /// Supports dynamically changing max size; supports awaiting non-fullness
34 /// in producers. Slightly slower than @ref kNone.
35 kDynamicSync,
36};
37
38/// @brief The default queue policy for @ref GenericQueue.
39/// Custom queue policies must inherit from it.
40struct DefaultQueuePolicy {
41 /// If `true`, the queue gains support for multiple concurrent producers,
42 /// which adds some synchronization overhead. This also makes the queue
43 /// non-FIFO, because consumer(s) will randomly prioritize some producers
44 /// over the others.
45 static constexpr bool kIsMultipleProducer{false};
46
47 /// If `true`, the queue gains support for multiple concurrent consumers,
48 /// which adds some synchronization overhead.
49 static constexpr bool kIsMultipleConsumer{false};
50
51 /// Whether the queue has support for max size,
52 /// which adds some synchronization overhead.
53 static constexpr QueueMaxSizeMode kMaxSizeMode = QueueMaxSizeMode::kDynamicSync;
54
55 /// Should return the size of an element, which accounts towards
56 /// the capacity limit. Only makes sense to set if `kMaxSizeMode != QueueMaxSizeMode::kNone`.
57 ///
58 /// For the vast majority of queues, capacity is naturally counted
59 /// in queue elements, e.g. `GetSoftMaxSize() == 1000` means that we want
60 /// no more than 1000 elements in the queue.
61 ///
62 /// Sometimes we want a more complex limit, e.g. for a queue of strings we
63 /// want the total number of bytes in all the strings to be limited.
64 /// In that case we can set the element size equal to its `std::size`.
65 ///
66 /// @note Returning anything other than 1 adds some synchronization overhead.
67 /// @warning An element changing its capacity while inside the queue is UB.
68 /// @warning Overflow in the total queue size is UB.
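 ///
 /// For instance, a byte-counting policy for a queue of strings could be
 /// sketched as follows (an illustrative sketch with a hypothetical
 /// `StringBytesQueuePolicy` name; the `StringStreamQueue` alias below is built
 /// the same way). It assumes `<userver/concurrent/queue.hpp>` and `<string>`
 /// are included:
 ///
 /// @code{.cpp}
 /// struct StringBytesQueuePolicy : concurrent::DefaultQueuePolicy {
 ///     // Each element contributes its byte size towards the soft max size.
 ///     static std::size_t GetElementSize(const std::string& value) { return value.size(); }
 /// };
 ///
 /// using BytesBoundedStringQueue =
 ///     concurrent::GenericQueue<std::string, StringBytesQueuePolicy>;
 /// @endcode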
69 template <typename T>
70 static constexpr std::size_t GetElementSize(const T&) {
71 return 1;
72 }
73};
74
75namespace impl {
76
77template <bool MultipleProducer, bool MultipleConsumer>
78struct SimpleQueuePolicy : public DefaultQueuePolicy {
79 static constexpr bool kIsMultipleProducer{MultipleProducer};
80 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
81 static constexpr auto kMaxSizeMode = QueueMaxSizeMode::kDynamicSync;
82};
83
84template <bool MultipleProducer, bool MultipleConsumer>
85struct NoMaxSizeQueuePolicy : public DefaultQueuePolicy {
86 static constexpr bool kIsMultipleProducer{MultipleProducer};
87 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
88 static constexpr auto kMaxSizeMode = QueueMaxSizeMode::kNone;
89};
90
91template <bool MultipleProducer, bool MultipleConsumer>
92struct ContainerQueuePolicy : public DefaultQueuePolicy {
93 template <typename T>
94 static std::size_t GetElementSize(const T& value) {
95 return std::size(value);
96 }
97
98 static constexpr bool kIsMultipleProducer{MultipleProducer};
99 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
100};
101
102} // namespace impl
103
104/// @brief Queue with single and multi producer/consumer options.
105///
106/// @tparam T element type
107/// @tparam QueuePolicy policy type, see @ref DefaultQueuePolicy for details
108///
109/// In practice, instead of using `GenericQueue` directly, use one of the aliases:
110///
111/// * concurrent::NonFifoMpmcQueue
112/// * concurrent::NonFifoMpscQueue
113/// * concurrent::SpmcQueue
114/// * concurrent::SpscQueue
115/// * concurrent::UnboundedNonFifoMpscQueue
116/// * concurrent::UnboundedSpmcQueue
117/// * concurrent::UnboundedSpscQueue
118/// * concurrent::StringStreamQueue
119///
120/// @see @ref concurrent_queues
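///
/// A minimal usage sketch (illustrative only; assumes a coroutine/task context
/// and `utils::Async` from `<userver/utils/async.hpp>`):
///
/// @code{.cpp}
/// auto queue = concurrent::SpscQueue<int>::Create();
///
/// auto producer_task = utils::Async("producer", [producer = queue->GetProducer()]() mutable {
///     // Push blocks while the queue is full; it returns false if all
///     // consumers are gone or the optional deadline expires.
///     [[maybe_unused]] const bool pushed = producer.Push(42);
///     // The Producer is destroyed at the end of this task, which lets the
///     // consumer's Pop return false once the queue has been drained.
/// });
///
/// auto consumer_task = utils::Async("consumer", [consumer = queue->GetConsumer()]() mutable {
///     int value{};
///     // Pop blocks while the queue is empty; it returns false once all
///     // producers are gone and the remaining items have been read.
///     while (consumer.Pop(value)) {
///         // process `value`
///     }
/// });
///
/// producer_task.Get();
/// consumer_task.Get();
/// @endcode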
121template <typename T, typename QueuePolicy>
122class GenericQueue final : public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
123 struct EmplaceEnabler final {
124 // Disable {}-initialization in Queue's constructor
125 explicit EmplaceEnabler() = default;
126 };
127
128 static_assert(
129 std::is_base_of_v<DefaultQueuePolicy, QueuePolicy>,
130 "QueuePolicy must inherit from concurrent::DefaultQueuePolicy"
131 );
132
133 static constexpr QueueMaxSizeMode kMaxSizeMode = QueuePolicy::kMaxSizeMode;
134
135 using ProducerToken =
136 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
137 using ConsumerToken =
138 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ConsumerToken, impl::NoToken>;
139 using MultiProducerToken = impl::MultiToken;
140 using MultiConsumerToken = std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken, impl::NoToken>;
141
142 using SingleProducerToken =
143 std::conditional_t<!QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
144
145 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
146 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
147 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
148 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
149
150public:
151 using ValueType = T;
152
153 using Producer = concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
154 using Consumer = concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
155 using MultiProducer = concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
156 using MultiConsumer = concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
157
158 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max() / 4;
159
160 /// @cond
161 // For internal use only
162 explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
163 : queue_(),
164 single_producer_token_(queue_),
165 producer_side_(*this, std::min(max_size, kUnbounded)),
166 consumer_side_(*this) {}
167
168 ~GenericQueue() {
169 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
170 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
171
172 if (producers_count_ == kCreatedAndDead) {
173 // To allow reading the remaining items
174 consumer_side_.ResumeBlockingOnPop();
175 }
176
177 // Clear remaining items in queue
178 T value;
179 ConsumerToken token{queue_};
180 while (consumer_side_.PopNoblock(token, value)) {
181 }
182 }
183
184 GenericQueue(GenericQueue&&) = delete;
185 GenericQueue(const GenericQueue&) = delete;
186 GenericQueue& operator=(GenericQueue&&) = delete;
187 GenericQueue& operator=(const GenericQueue&) = delete;
188 /// @endcond
189
190 /// Create a new queue
191 static std::shared_ptr<GenericQueue> Create(std::size_t max_size = kUnbounded) {
192 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
193 }
194
195 /// Get a `Producer` which makes it possible to push items into the queue.
196 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
197 /// so you have to use multiple Producers of the same queue to simultaneously
198 /// write from multiple coroutines/threads.
199 ///
200 /// @note `Producer` may outlive the queue and consumers.
201 Producer GetProducer() {
202 PrepareProducer();
203 return Producer(this->shared_from_this(), EmplaceEnabler{});
204 }
205
206 /// Get a `MultiProducer` which makes it possible to push items into the
207 /// queue. Can be called multiple times. The resulting `MultiProducer` is
208 /// thread-safe, so it can be used simultaneously from multiple
209 /// coroutines/threads.
210 ///
211 /// @note `MultiProducer` may outlive the queue and consumers.
212 ///
213 /// @note Prefer `Producer` tokens when possible, because `MultiProducer`
214 /// token incurs some overhead.
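 ///
 /// An illustrative sketch of sharing a single `MultiProducer` between tasks
 /// (assumes `utils::Async`; a consumer draining the queue is omitted):
 ///
 /// @code{.cpp}
 /// auto queue = concurrent::NonFifoMpmcQueue<int>::Create();
 /// auto multi_producer = queue->GetMultiProducer();
 ///
 /// // The same MultiProducer may be used from several tasks concurrently.
 /// auto first = utils::Async("producer-1", [&multi_producer] {
 ///     [[maybe_unused]] const bool pushed = multi_producer.Push(1);
 /// });
 /// auto second = utils::Async("producer-2", [&multi_producer] {
 ///     [[maybe_unused]] const bool pushed = multi_producer.Push(2);
 /// });
 /// first.Get();
 /// second.Get();
 /// @endcode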
215 MultiProducer GetMultiProducer() {
216 static_assert(QueuePolicy::kIsMultipleProducer, "Trying to obtain MultiProducer for a single-producer queue");
217 PrepareProducer();
218 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
219 }
220
221 /// Get a `Consumer` which makes it possible to read items from the queue.
222 /// Can be called multiple times. The resulting `Consumer` is not thread-safe,
223 /// so you have to use multiple `Consumer`s of the same queue to
224 /// simultaneously read from multiple coroutines/threads.
225 ///
226 /// @note `Consumer` may outlive the queue and producers.
227 Consumer GetConsumer() {
228 PrepareConsumer();
229 return Consumer(this->shared_from_this(), EmplaceEnabler{});
230 }
231
232 /// Get a `MultiConsumer` which makes it possible to read items from the
233 /// queue. Can be called multiple times. The resulting `MultiConsumer` is
234 /// thread-safe, so it can be used simultaneously from multiple
235 /// coroutines/threads.
236 ///
237 /// @note `MultiConsumer` may outlive the queue and producers.
238 ///
239 /// @note Prefer `Consumer` tokens when possible, because `MultiConsumer`
240 /// token incurs some overhead.
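 ///
 /// An illustrative sketch of sharing one `MultiConsumer` between worker tasks
 /// (assumes `utils::Async`):
 ///
 /// @code{.cpp}
 /// auto queue = concurrent::NonFifoMpmcQueue<int>::Create();
 /// auto multi_consumer = queue->GetMultiConsumer();
 ///
 /// // The same MultiConsumer may be used from several tasks concurrently.
 /// auto worker = utils::Async("worker", [&multi_consumer] {
 ///     int value{};
 ///     while (multi_consumer.Pop(value)) {
 ///         // process `value`
 ///     }
 /// });
 /// @endcode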
241 MultiConsumer GetMultiConsumer() {
242 static_assert(QueuePolicy::kIsMultipleConsumer, "Trying to obtain MultiConsumer for a single-consumer queue");
243 PrepareConsumer();
244 return MultiConsumer(this->shared_from_this(), EmplaceEnabler{});
245 }
246
247 /// @brief Sets the limit on the queue size; pushes above this limit will block
248 /// @note This is a soft limit and may be slightly overrun under load.
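 ///
 /// For example (an illustrative sketch):
 /// @code{.cpp}
 /// auto queue = concurrent::NonFifoMpscQueue<int>::Create();
 /// // Producers block (or fail on deadline) once roughly 1000 elements are queued.
 /// queue->SetSoftMaxSize(1000);
 /// @endcode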
249 void SetSoftMaxSize(std::size_t max_size) { producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded)); }
250
251 /// @brief Gets the limit on the queue size
252 std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }
253
254 /// @brief Gets the approximate size of queue
255 std::size_t GetSizeApproximate() const { return producer_side_.GetSizeApproximate(); }
256
257private:
258 class SingleProducerSide;
259 class MultiProducerSide;
260 class NoMaxSizeProducerSide;
261 class SingleConsumerSide;
262 class MultiConsumerSide;
263
264 /// Proxy class that handles synchronization of Push operations in the
265 /// multi- or single-producer case
266 using ProducerSide = std::conditional_t<
267 kMaxSizeMode == QueueMaxSizeMode::kNone,
268 NoMaxSizeProducerSide,
269 std::conditional_t< //
270 QueuePolicy::kIsMultipleProducer,
271 MultiProducerSide,
272 SingleProducerSide>>;
273
274 /// Proxy class that handles synchronization of Pop operations in the
275 /// multi- or single-consumer case
276 using ConsumerSide = std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide, SingleConsumerSide>;
277
278 template <typename Token>
279 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
280 const std::size_t value_size = QueuePolicy::GetElementSize(value);
281 UASSERT(value_size > 0);
282 return producer_side_.Push(token, std::move(value), deadline, value_size);
283 }
284
285 template <typename Token>
286 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
287 const std::size_t value_size = QueuePolicy::GetElementSize(value);
288 UASSERT(value_size > 0);
289 return producer_side_.PushNoblock(token, std::move(value), value_size);
290 }
291
292 template <typename Token>
293 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
294 return consumer_side_.Pop(token, value, deadline);
295 }
296
297 template <typename Token>
298 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
299 return consumer_side_.PopNoblock(token, value);
300 }
301
302 void PrepareProducer() {
303 std::size_t old_producers_count{};
304 utils::AtomicUpdate(producers_count_, [&](auto old_value) {
305 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1, "Incorrect usage of queue producers");
306 old_producers_count = old_value;
307 return old_value == kCreatedAndDead ? 1 : old_value + 1;
308 });
309
310 if (old_producers_count == kCreatedAndDead) {
311 consumer_side_.ResumeBlockingOnPop();
312 }
313 }
314
315 void PrepareConsumer() {
316 std::size_t old_consumers_count{};
317 utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
318 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1, "Incorrect usage of queue consumers");
319 old_consumers_count = old_value;
320 return old_value == kCreatedAndDead ? 1 : old_value + 1;
321 });
322
323 if (old_consumers_count == kCreatedAndDead) {
324 producer_side_.ResumeBlockingOnPush();
325 }
326 }
327
328 void MarkConsumerIsDead() {
329 const auto new_consumers_count = utils::AtomicUpdate(consumers_count_, [](auto old_value) {
330 return old_value == 1 ? kCreatedAndDead : old_value - 1;
331 });
332 if (new_consumers_count == kCreatedAndDead) {
333 producer_side_.StopBlockingOnPush();
334 }
335 }
336
337 void MarkProducerIsDead() {
338 const auto new_producers_count = utils::AtomicUpdate(producers_count_, [](auto old_value) {
339 return old_value == 1 ? kCreatedAndDead : old_value - 1;
340 });
341 if (new_producers_count == kCreatedAndDead) {
342 consumer_side_.StopBlockingOnPop();
343 }
344 }
345
346public: // TODO
347 /// @cond
348 bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }
349
350 bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
351 /// @endcond
352
353private:
354 template <typename Token>
355 void DoPush(Token& token, T&& value) {
356 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
357 static_assert(QueuePolicy::kIsMultipleProducer);
358 queue_.enqueue(token, std::move(value));
359 } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
360 static_assert(QueuePolicy::kIsMultipleProducer);
361 queue_.enqueue(std::move(value));
362 } else {
363 static_assert(std::is_same_v<Token, impl::NoToken>);
364 static_assert(!QueuePolicy::kIsMultipleProducer);
365 queue_.enqueue(single_producer_token_, std::move(value));
366 }
367
368 consumer_side_.OnElementPushed();
369 }
370
371 template <typename Token>
372 [[nodiscard]] bool DoPop(Token& token, T& value) {
373 bool success{};
374
375 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
376 static_assert(QueuePolicy::kIsMultipleProducer);
377 success = queue_.try_dequeue(token, value);
378 } else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
379 static_assert(QueuePolicy::kIsMultipleProducer);
380 success = queue_.try_dequeue(value);
381 } else {
382 static_assert(std::is_same_v<Token, impl::NoToken>);
383 static_assert(!QueuePolicy::kIsMultipleProducer);
384 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
385 }
386
387 if (success) {
388 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
389 return true;
390 }
391
392 return false;
393 }
394
395 moodycamel::ConcurrentQueue<T> queue_{1};
396 std::atomic<std::size_t> consumers_count_{0};
397 std::atomic<std::size_t> producers_count_{0};
398
399 SingleProducerToken single_producer_token_;
400
401 ProducerSide producer_side_;
402 ConsumerSide consumer_side_;
403
404 static constexpr std::size_t kCreatedAndDead = std::numeric_limits<std::size_t>::max();
405 static constexpr std::size_t kSemaphoreUnlockValue = std::numeric_limits<std::size_t>::max() / 2;
406};
407
408template <typename T, typename QueuePolicy>
409class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
410public:
411 SingleProducerSide(GenericQueue& queue, std::size_t capacity)
412 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
413
414 // Blocks while the queue is full, until a consumer frees space, the
415 // deadline expires, the task is cancelled, or all consumers are gone
416 template <typename Token>
417 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
418 bool no_more_consumers = false;
419 const bool success = non_full_event_.WaitUntil(deadline, [&] {
420 if (queue_.NoMoreConsumers()) {
421 no_more_consumers = true;
422 return true;
423 }
424 if (DoPush(token, std::move(value), value_size)) {
425 return true;
426 }
427 return false;
428 });
429 return success && !no_more_consumers;
430 }
431
432 template <typename Token>
433 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
434 return !queue_.NoMoreConsumers() && DoPush(token, std::move(value), value_size);
435 }
436
437 void OnElementPopped(std::size_t released_capacity) {
438 used_capacity_.fetch_sub(released_capacity);
439 non_full_event_.Send();
440 }
441
442 void StopBlockingOnPush() { non_full_event_.Send(); }
443
444 void ResumeBlockingOnPush() {}
445
446 void SetSoftMaxSize(std::size_t new_capacity) {
447 const auto old_capacity = total_capacity_.exchange(new_capacity);
448 if (new_capacity > old_capacity) non_full_event_.Send();
449 }
450
451 std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }
452
453 std::size_t GetSizeApproximate() const noexcept { return used_capacity_.load(); }
454
455private:
456 template <typename Token>
457 [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
458 if (used_capacity_.load() + value_size > total_capacity_.load()) {
459 return false;
460 }
461
462 used_capacity_.fetch_add(value_size);
463 queue_.DoPush(token, std::move(value));
464 return true;
465 }
466
467 GenericQueue& queue_;
468 engine::SingleConsumerEvent non_full_event_;
469 std::atomic<std::size_t> used_capacity_;
470 std::atomic<std::size_t> total_capacity_;
471};
472
473template <typename T, typename QueuePolicy>
474class GenericQueue<T, QueuePolicy>::MultiProducerSide final {
475public:
476 MultiProducerSide(GenericQueue& queue, std::size_t capacity)
477 : queue_(queue), remaining_capacity_(capacity), remaining_capacity_control_(remaining_capacity_) {}
478
479 // Blocks while the queue is full, until a consumer frees space, the
480 // deadline expires, the task is cancelled, or all consumers are gone
481 template <typename Token>
482 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
483 return remaining_capacity_.try_lock_shared_until_count(deadline, value_size) &&
484 DoPush(token, std::move(value), value_size);
485 }
486
487 template <typename Token>
488 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
489 return remaining_capacity_.try_lock_shared_count(value_size) && DoPush(token, std::move(value), value_size);
490 }
491
492 void OnElementPopped(std::size_t value_size) { remaining_capacity_.unlock_shared_count(value_size); }
493
494 void StopBlockingOnPush() { remaining_capacity_control_.SetCapacityOverride(0); }
495
496 void ResumeBlockingOnPush() { remaining_capacity_control_.RemoveCapacityOverride(); }
497
498 void SetSoftMaxSize(std::size_t count) { remaining_capacity_control_.SetCapacity(count); }
499
500 std::size_t GetSizeApproximate() const noexcept { return remaining_capacity_.UsedApprox(); }
501
502 std::size_t GetSoftMaxSize() const noexcept { return remaining_capacity_control_.GetCapacity(); }
503
504private:
505 template <typename Token>
506 [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
507 if (queue_.NoMoreConsumers()) {
508 remaining_capacity_.unlock_shared_count(value_size);
509 return false;
510 }
511
512 queue_.DoPush(token, std::move(value));
513 return true;
514 }
515
516 GenericQueue& queue_;
517 engine::CancellableSemaphore remaining_capacity_;
518 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
519};
520
521template <typename T, typename QueuePolicy>
522class GenericQueue<T, QueuePolicy>::NoMaxSizeProducerSide final {
523public:
524 NoMaxSizeProducerSide(GenericQueue& queue, std::size_t max_size) : queue_(queue) { SetSoftMaxSize(max_size); }
525
526 template <typename Token>
527 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline /*deadline*/, std::size_t /*value_size*/) {
528 if (queue_.NoMoreConsumers()) {
529 return false;
530 }
531
532 queue_.DoPush(token, std::move(value));
533 return true;
534 }
535
536 template <typename Token>
537 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
538 return Push(token, std::move(value), engine::Deadline{}, value_size);
539 }
540
541 void OnElementPopped(std::size_t /*released_capacity*/) {}
542
543 void StopBlockingOnPush() {}
544
545 void ResumeBlockingOnPush() {}
546
547 void SetSoftMaxSize(std::size_t new_capacity) {
548 UINVARIANT(new_capacity == kUnbounded, "Cannot set max size for a queue with QueueMaxSizeMode::kNone");
549 }
550
551 std::size_t GetSoftMaxSize() const noexcept { return kUnbounded; }
552
553 std::size_t GetSizeApproximate() const noexcept { return queue_.queue_.size_approx(); }
554
555private:
556 GenericQueue& queue_;
557};
558
559template <typename T, typename QueuePolicy>
560class GenericQueue<T, QueuePolicy>::SingleConsumerSide final {
561public:
562 explicit SingleConsumerSide(GenericQueue& queue) : queue_(queue), element_count_(0) {}
563
564 // Blocks only if queue is empty
565 template <typename Token>
566 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
567 bool no_more_producers = false;
568 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
569 if (DoPop(token, value)) {
570 return true;
571 }
572 if (queue_.NoMoreProducers()) {
573 // A producer might have pushed something into the queue between the
574 // pop attempt and the NoMoreProducers() check. Pop again to avoid a
575 // TOCTOU race.
576 if (!DoPop(token, value)) {
577 no_more_producers = true;
578 }
579 return true;
580 }
581 return false;
582 });
583 return success && !no_more_producers;
584 }
585
586 template <typename Token>
587 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
588 return DoPop(token, value);
589 }
590
591 void OnElementPushed() {
592 ++element_count_;
593 nonempty_event_.Send();
594 }
595
596 void StopBlockingOnPop() { nonempty_event_.Send(); }
597
598 void ResumeBlockingOnPop() {}
599
600 std::size_t GetElementCount() const { return element_count_; }
601
602private:
603 template <typename Token>
604 [[nodiscard]] bool DoPop(Token& token, T& value) {
605 if (queue_.DoPop(token, value)) {
606 --element_count_;
607 nonempty_event_.Reset();
608 return true;
609 }
610 return false;
611 }
612
613 GenericQueue& queue_;
614 engine::SingleConsumerEvent nonempty_event_;
615 std::atomic<std::size_t> element_count_;
616};
617
618template <typename T, typename QueuePolicy>
619class GenericQueue<T, QueuePolicy>::MultiConsumerSide final {
620public:
621 explicit MultiConsumerSide(GenericQueue& queue)
622 : queue_(queue), element_count_(kUnbounded), element_count_control_(element_count_) {
623 const bool success = element_count_.try_lock_shared_count(kUnbounded);
624 UASSERT(success);
625 }
626
627 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
628
629 // Blocks only if queue is empty
630 template <typename Token>
631 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
632 return element_count_.try_lock_shared_until(deadline) && DoPop(token, value);
633 }
634
635 template <typename Token>
636 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
637 return element_count_.try_lock_shared() && DoPop(token, value);
638 }
639
640 void OnElementPushed() { element_count_.unlock_shared(); }
641
642 void StopBlockingOnPop() { element_count_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue); }
643
644 void ResumeBlockingOnPop() { element_count_control_.RemoveCapacityOverride(); }
645
646 std::size_t GetElementCount() const {
647 const std::size_t cur_element_count = element_count_.RemainingApprox();
648 if (cur_element_count < kUnbounded) {
649 return cur_element_count;
650 } else if (cur_element_count <= kSemaphoreUnlockValue) {
651 return 0;
652 }
653 return cur_element_count - kSemaphoreUnlockValue;
654 }
655
656private:
657 template <typename Token>
658 [[nodiscard]] bool DoPop(Token& token, T& value) {
659 while (true) {
660 if (queue_.DoPop(token, value)) {
661 return true;
662 }
663 if (queue_.NoMoreProducers()) {
664 element_count_.unlock_shared();
665 return false;
666 }
667 // We can get here if another consumer steals our element, leaving another
668 // element in a Moodycamel sub-queue that we have already passed.
669 }
670 }
671
672 GenericQueue& queue_;
673 engine::CancellableSemaphore element_count_;
674 concurrent::impl::SemaphoreCapacityControl element_count_control_;
675};
676
677/// @ingroup userver_concurrency
678///
679/// @brief Non FIFO multiple producers multiple consumers queue.
680///
681/// Items from the same producer are always delivered in the production order.
682/// Items from different producers (or when using a `MultiProducer` token) are
683/// delivered in an unspecified order. In other words, FIFO order is maintained
684/// only within producers, but not between them. This may lead to increased peak
685/// latency of item processing.
686///
687/// In exchange for this, the queue has lower contention and increased
688/// throughput compared to a conventional lock-free queue.
689///
690/// @see @ref scripts/docs/en/userver/synchronization.md
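///
/// A sketch of the ordering guarantee (illustrative; assumes `utils::Async`):
///
/// @code{.cpp}
/// auto queue = concurrent::NonFifoMpmcQueue<std::string>::Create();
///
/// auto task_a = utils::Async("producer-a", [producer = queue->GetProducer()]() mutable {
///     // "a1" is pushed before "a2", so any consumer pops "a1" before "a2".
///     [[maybe_unused]] bool ok = producer.Push("a1");
///     ok = producer.Push("a2");
/// });
/// auto task_b = utils::Async("producer-b", [producer = queue->GetProducer()]() mutable {
///     // Items from this producer may interleave with "a1"/"a2" arbitrarily.
///     [[maybe_unused]] bool ok = producer.Push("b1");
///     ok = producer.Push("b2");
/// });
/// @endcode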
691template <typename T>
692using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>;
693
694/// @ingroup userver_concurrency
695///
696/// @brief Non FIFO multiple producers single consumer queue.
697///
698/// @see concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
699/// @see @ref scripts/docs/en/userver/synchronization.md
700template <typename T>
701using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>;
702
703/// @ingroup userver_concurrency
704///
705/// @brief Single producer multiple consumers queue.
706///
707/// @see @ref scripts/docs/en/userver/synchronization.md
708template <typename T>
709using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>;
710
711/// @ingroup userver_concurrency
712///
713/// @brief Single producer single consumer queue.
714///
715/// @see @ref scripts/docs/en/userver/synchronization.md
716template <typename T>
717using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>;
718
719namespace impl {
720
721/// @ingroup userver_concurrency
722///
723/// @brief Like @see NonFifoMpmcQueue, but does not support setting max size and is thus slightly faster.
724///
725/// @warning The current implementation suffers from performance issues in multi-producer scenario under heavy load.
726/// Precisely speaking, producers always take priority over consumers (breaking thread fairness), and consumers starve,
727/// leading to increased latencies to the point of OOM. Use other queue types (unbounded or not) for the time being.
728///
729/// @see @ref scripts/docs/en/userver/synchronization.md
730template <typename T>
731using UnfairUnboundedNonFifoMpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, true>>;
732
733} // namespace impl
734
735/// @ingroup userver_concurrency
736///
737/// @brief Like @see NonFifoMpscQueue, but does not support setting max size and is thus slightly faster.
738///
739/// @see @ref scripts/docs/en/userver/synchronization.md
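///
/// A short illustrative sketch: with no max size, pushes never block, so they
/// only fail once all consumers are gone.
///
/// @code{.cpp}
/// auto queue = concurrent::UnboundedNonFifoMpscQueue<int>::Create();
/// auto producer = queue->GetProducer();
/// auto consumer = queue->GetConsumer();
///
/// [[maybe_unused]] const bool pushed = producer.PushNoblock(42);  // never blocks
///
/// int value{};
/// [[maybe_unused]] const bool popped = consumer.PopNoblock(value);  // true, value == 42
/// @endcode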
740template <typename T>
741using UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, false>>;
742
743/// @ingroup userver_concurrency
744///
745/// @brief Like @see SpmcQueue, but does not support setting max size and is thus slightly faster.
746///
747/// @see @ref scripts/docs/en/userver/synchronization.md
748template <typename T>
749using UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, true>>;
750
751/// @ingroup userver_concurrency
752///
753/// @brief Like @see SpscQueue, but does not support setting max size and is thus slightly faster.
754///
755/// @see @ref scripts/docs/en/userver/synchronization.md
756template <typename T>
757using UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, false>>;
758
759/// @ingroup userver_concurrency
760///
761/// @brief Single producer single consumer queue of std::string which is bounded by the total bytes inside the strings.
762///
763/// @see @ref scripts/docs/en/userver/synchronization.md
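///
/// A minimal sketch: the soft max size here limits the total number of bytes
/// in the queued strings rather than the number of elements.
///
/// @code{.cpp}
/// auto queue = concurrent::StringStreamQueue::Create();
/// queue->SetSoftMaxSize(1024);  // at most ~1 KiB of string data in flight
///
/// auto producer = queue->GetProducer();
/// [[maybe_unused]] const bool pushed = producer.PushNoblock(std::string{"chunk"});
/// @endcode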
764using StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>;
765
766} // namespace concurrent
767
768USERVER_NAMESPACE_END