userver: userver/concurrent/queue.hpp Source File
Loading...
Searching...
No Matches
queue.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/queue.hpp
4/// @brief @copybrief concurrent::GenericQueue
5
6#include <atomic>
7#include <limits>
8#include <memory>
9
10#include <moodycamel/concurrentqueue.h>
11
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/atomic.hpp>
20
21USERVER_NAMESPACE_BEGIN
22
23namespace concurrent {
24
/// @brief Represents the manner in which the queue enforces the configured
/// max size limit.
/// @see @ref GenericQueue
/// @see @ref DefaultQueuePolicy
enum class QueueMaxSizeMode {
    /// No support for setting max size. Fastest.
    kNone,

    /// Supports dynamically changing max size; supports awaiting non-fullness
    /// in producers. Slightly slower than @ref kNone.
    kDynamicSync,
};
37
38/// @brief The default queue policy for @ref GenericQueue.
39/// Custom queue policies must inherit from it.
41 /// If `true`, the queue gains support for multiple concurrent producers,
42 /// which adds some synchronization overhead. This also makes the queue
43 /// non-FIFO, because consumer(s) will randomly prioritize some producers
44 /// over the other ones.
45 static constexpr bool kIsMultipleProducer{false};
46
47 /// If `true`, the queue gains support for multiple concurrent consumers,
48 /// which adds some synchronization overhead.
49 static constexpr bool kIsMultipleConsumer{false};
50
51 /// Whether the queue has support for max size,
52 /// which adds some synchronization overhead.
54
55 /// Should return the size of an element, which accounts towards
56 /// the capacity limit. Only makes sense to set if `kHasMaxSize == true`.
57 ///
58 /// For the vast majority of queues, capacity is naturally counted
59 /// in queue elements, e.g. `GetSoftMaxSize() == 1000` means that we want
60 /// no more than 1000 elements in the queue.
61 ///
62 /// Sometimes we want a more complex limit, e.g. for a queue of strings we
63 /// want the total number of bytes in all the strings to be limited.
64 /// In that case we can set the element size equal to its `std::size`.
65 ///
66 /// @note Returning anything other than 1 adds some synchronization overhead.
67 /// @warning An element changing its capacity while inside the queue is UB.
68 /// @warning Overflow in the total queue size is UB.
69 template <typename T>
70 static constexpr std::size_t GetElementSize(const T&) {
71 return 1;
72 }
73};
74
75namespace impl {
76
77template <bool MultipleProducer, bool MultipleConsumer>
78struct SimpleQueuePolicy : public DefaultQueuePolicy {
79 static constexpr bool kIsMultipleProducer{MultipleProducer};
80 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
81 static constexpr auto kMaxSizeMode = QueueMaxSizeMode::kDynamicSync;
82};
83
84template <bool MultipleProducer, bool MultipleConsumer>
85struct NoMaxSizeQueuePolicy : public DefaultQueuePolicy {
86 static constexpr bool kIsMultipleProducer{MultipleProducer};
87 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
88 static constexpr auto kMaxSizeMode = QueueMaxSizeMode::kNone;
89};
90
91template <bool MultipleProducer, bool MultipleConsumer>
92struct ContainerQueuePolicy : public DefaultQueuePolicy {
93 template <typename T>
94 static std::size_t GetElementSize(const T& value) {
95 return std::size(value);
96 }
97
98 static constexpr bool kIsMultipleProducer{MultipleProducer};
99 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
100};
101
102} // namespace impl
103
/// @brief Queue with single and multi producer/consumer options.
///
/// @tparam T element type
/// @tparam QueuePolicy policy type, see @ref DefaultQueuePolicy for details
///
/// On practice, instead of using `GenericQueue` directly, use an alias:
///
/// * concurrent::NonFifoMpmcQueue
/// * concurrent::NonFifoMpscQueue
/// * concurrent::SpmcQueue
/// * concurrent::SpscQueue
/// * concurrent::UnboundedNonFifoMpscQueue
/// * concurrent::UnboundedSpmcQueue
/// * concurrent::UnboundedSpscQueue
/// * concurrent::StringStreamQueue
///
/// @see @ref concurrent_queues
template <typename T, typename QueuePolicy>
class GenericQueue final : public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
    struct EmplaceEnabler final {
        // Disable {}-initialization in Queue's constructor
        explicit EmplaceEnabler() = default;
    };

    static_assert(
        std::is_base_of_v<DefaultQueuePolicy, QueuePolicy>,
        "QueuePolicy must inherit from concurrent::DefaultQueuePolicy"
    );

    static constexpr QueueMaxSizeMode kMaxSizeMode = QueuePolicy::kMaxSizeMode;

    // Note: the token types are selected on kIsMultipleProducer (not the
    // consumer flag), because in the single-producer case all items flow
    // through one dedicated moodycamel sub-queue (single_producer_token_),
    // and consumers dequeue via try_dequeue_from_producer — see DoPush/DoPop.
    using ProducerToken =
        std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
    using ConsumerToken =
        std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ConsumerToken, impl::NoToken>;
    using MultiProducerToken = impl::MultiToken;
    using MultiConsumerToken = std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken, impl::NoToken>;

    // The dedicated producer token used for all pushes in single-producer mode.
    using SingleProducerToken =
        std::conditional_t<!QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;

    friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
    friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
    friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
    friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;

public:
    using ValueType = T;

    using Producer = concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
    using Consumer = concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
    using MultiProducer = concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
    using MultiConsumer = concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;

    /// Practically unlimited capacity. max/4 leaves headroom so that internal
    /// capacity arithmetic (e.g. kUnbounded + kSemaphoreUnlockValue in
    /// MultiConsumerSide::StopBlockingOnPop) cannot overflow std::size_t.
    static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max() / 4;

    /// @cond
    // For internal use only
    explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
        : queue_(),
          single_producer_token_(queue_),
          producer_side_(*this, std::min(max_size, kUnbounded)),
          consumer_side_(*this)
    {}

    ~GenericQueue() {
        // All producers and consumers must be destroyed before the queue:
        // a counter is either 0 (never created) or kCreatedAndDead.
        UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
        UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);

        if (producers_count_ == kCreatedAndDead) {
            // To allow reading the remaining items
            consumer_side_.ResumeBlockingOnPop();
        }

        // Clear remaining items in queue, so that their destructors run
        // and producer-side capacity accounting is released.
        T value;
        ConsumerToken token{queue_};
        while (consumer_side_.PopNoblock(token, value)) {
        }
    }

    GenericQueue(GenericQueue&&) = delete;
    GenericQueue(const GenericQueue&) = delete;
    GenericQueue& operator=(GenericQueue&&) = delete;
    GenericQueue& operator=(const GenericQueue&) = delete;
    /// @endcond

    /// Create a new queue
    static std::shared_ptr<GenericQueue> Create(std::size_t max_size = kUnbounded) {
        return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
    }

    /// Get a `Producer` which makes it possible to push items into the queue.
    /// Can be called multiple times. The resulting `Producer` is not thread-safe,
    /// so you have to use multiple Producers of the same queue to simultaneously
    /// write from multiple coroutines/threads.
    ///
    /// @note `Producer` may outlive the queue and consumers.
    Producer GetProducer() {
        PrepareProducer();
        return Producer(this->shared_from_this(), EmplaceEnabler{});
    }

    /// Get a `MultiProducer` which makes it possible to push items into the
    /// queue. Can be called multiple times. The resulting `MultiProducer` is
    /// thread-safe, so it can be used simultaneously from multiple
    /// coroutines/threads.
    ///
    /// @note `MultiProducer` may outlive the queue and consumers.
    ///
    /// @note Prefer `Producer` tokens when possible, because `MultiProducer`
    /// token incurs some overhead.
    MultiProducer GetMultiProducer() {
        static_assert(QueuePolicy::kIsMultipleProducer, "Trying to obtain MultiProducer for a single-producer queue");
        PrepareProducer();
        return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
    }

    /// Get a `Consumer` which makes it possible to read items from the queue.
    /// Can be called multiple times. The resulting `Consumer` is not thread-safe,
    /// so you have to use multiple `Consumer`s of the same queue to
    /// simultaneously read from multiple coroutines/threads.
    ///
    /// @note `Consumer` may outlive the queue and producers.
    Consumer GetConsumer() {
        PrepareConsumer();
        return Consumer(this->shared_from_this(), EmplaceEnabler{});
    }

    /// Get a `MultiConsumer` which makes it possible to read items from the
    /// queue. Can be called multiple times. The resulting `MultiConsumer` is
    /// thread-safe, so it can be used simultaneously from multiple
    /// coroutines/threads.
    ///
    /// @note `MultiConsumer` may outlive the queue and producers.
    ///
    /// @note Prefer `Consumer` tokens when possible, because `MultiConsumer`
    /// token incurs some overhead.
    MultiConsumer GetMultiConsumer() {
        static_assert(QueuePolicy::kIsMultipleConsumer, "Trying to obtain MultiConsumer for a single-consumer queue");
        PrepareConsumer();
        return MultiConsumer(this->shared_from_this(), EmplaceEnabler{});
    }

    /// @brief Sets the limit on the queue size, pushes over this limit will block
    /// @note This is a soft limit and may be slightly overrun under load.
    void SetSoftMaxSize(std::size_t max_size) { producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded)); }

    /// @brief Gets the limit on the queue size
    std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }

    /// @brief Gets the approximate size of queue
    std::size_t GetSizeApproximate() const { return producer_side_.GetSizeApproximate(); }

private:
    class SingleProducerSide;
    class MultiProducerSide;
    class NoMaxSizeProducerSide;
    class SingleConsumerSide;
    class MultiConsumerSide;

    /// Proxy-class makes synchronization of Push operations in multi or single
    /// producer cases
    using ProducerSide = std::conditional_t<
        kMaxSizeMode == QueueMaxSizeMode::kNone,
        NoMaxSizeProducerSide,
        std::conditional_t< //
            QueuePolicy::kIsMultipleProducer,
            MultiProducerSide,
            SingleProducerSide>>;

    /// Proxy-class makes synchronization of Pop operations in multi or single
    /// consumer cases
    using ConsumerSide = std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide, SingleConsumerSide>;

    // Blocking push: waits until there is capacity, the deadline expires,
    // or all consumers die. Returns false on deadline/dead-consumers.
    template <typename Token>
    [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
        const std::size_t value_size = QueuePolicy::GetElementSize(value);
        return producer_side_.Push(token, std::move(value), deadline, value_size);
    }

    // Non-blocking push: fails immediately when the queue is full
    // or all consumers are dead.
    template <typename Token>
    [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
        const std::size_t value_size = QueuePolicy::GetElementSize(value);
        return producer_side_.PushNoblock(token, std::move(value), value_size);
    }

    // Blocking pop: waits until an element arrives, the deadline expires,
    // or all producers die with the queue drained.
    template <typename Token>
    [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
        return consumer_side_.Pop(token, value, deadline);
    }

    // Non-blocking pop: fails immediately when no element is available.
    template <typename Token>
    [[nodiscard]] bool PopNoblock(Token& token, T& value) {
        return consumer_side_.PopNoblock(token, value);
    }

    // Registers a new producer. Enforces the single-producer invariant and,
    // when reviving a queue whose producers had all died, re-enables
    // blocking pops on the consumer side.
    void PrepareProducer() {
        std::size_t old_producers_count{};
        utils::AtomicUpdate(producers_count_, [&](auto old_value) {
            UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1, "Incorrect usage of queue producers");
            old_producers_count = old_value;
            return old_value == kCreatedAndDead ? 1 : old_value + 1;
        });

        if (old_producers_count == kCreatedAndDead) {
            consumer_side_.ResumeBlockingOnPop();
        }
    }

    // Registers a new consumer; mirror image of PrepareProducer.
    void PrepareConsumer() {
        std::size_t old_consumers_count{};
        utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
            UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1, "Incorrect usage of queue consumers");
            old_consumers_count = old_value;
            return old_value == kCreatedAndDead ? 1 : old_value + 1;
        });

        if (old_consumers_count == kCreatedAndDead) {
            producer_side_.ResumeBlockingOnPush();
        }
    }

    // Unregisters a consumer. The last one flips the counter to the
    // kCreatedAndDead sentinel and unblocks producers waiting on push.
    void MarkConsumerIsDead() {
        const auto new_consumers_count = utils::AtomicUpdate(consumers_count_, [](auto old_value) {
            return old_value == 1 ? kCreatedAndDead : old_value - 1;
        });
        if (new_consumers_count == kCreatedAndDead) {
            producer_side_.StopBlockingOnPush();
        }
    }

    // Unregisters a producer. The last one flips the counter to the
    // kCreatedAndDead sentinel and unblocks consumers waiting on pop.
    void MarkProducerIsDead() {
        const auto new_producers_count = utils::AtomicUpdate(producers_count_, [](auto old_value) {
            return old_value == 1 ? kCreatedAndDead : old_value - 1;
        });
        if (new_producers_count == kCreatedAndDead) {
            consumer_side_.StopBlockingOnPop();
        }
    }

public: // TODO
    /// @cond
    bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }

    bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
    /// @endcond

private:
    // Enqueues into the underlying moodycamel queue using the strategy
    // implied by the token type, then signals the consumer side.
    template <typename Token>
    void DoPush(Token& token, T&& value) {
        if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
            static_assert(QueuePolicy::kIsMultipleProducer);
            queue_.enqueue(token, std::move(value));
        } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
            static_assert(QueuePolicy::kIsMultipleProducer);
            queue_.enqueue(std::move(value));
        } else {
            static_assert(std::is_same_v<Token, impl::NoToken>);
            static_assert(!QueuePolicy::kIsMultipleProducer);
            // Single producer: all items go through the one dedicated token.
            queue_.enqueue(single_producer_token_, std::move(value));
        }

        consumer_side_.OnElementPushed();
    }

    // Dequeues using the strategy implied by the token type. On success,
    // releases the element's capacity back to the producer side.
    template <typename Token>
    [[nodiscard]] bool DoPop(Token& token, T& value) {
        bool success{};

        if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
            static_assert(QueuePolicy::kIsMultipleProducer);
            success = queue_.try_dequeue(token, value);
        } else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
            static_assert(QueuePolicy::kIsMultipleProducer);
            success = queue_.try_dequeue(value);
        } else {
            static_assert(std::is_same_v<Token, impl::NoToken>);
            static_assert(!QueuePolicy::kIsMultipleProducer);
            success = queue_.try_dequeue_from_producer(single_producer_token_, value);
        }

        if (success) {
            producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
            return true;
        }

        return false;
    }

    // NOTE(review): the constructor's `queue_()` initializer takes precedence
    // over this `{1}` default member initializer, so the default-constructed
    // capacity is used in practice — confirm whether `{1}` is intentional.
    moodycamel::ConcurrentQueue<T> queue_{1};
    std::atomic<std::size_t> consumers_count_{0};
    std::atomic<std::size_t> producers_count_{0};

    SingleProducerToken single_producer_token_;

    ProducerSide producer_side_;
    ConsumerSide consumer_side_;

    // Sentinel for the producers/consumers counters: "at least one was
    // created, and all of them have since died".
    static constexpr std::size_t kCreatedAndDead = std::numeric_limits<std::size_t>::max();
    // Large capacity bump used by MultiConsumerSide to unblock consumers.
    static constexpr std::size_t kSemaphoreUnlockValue = std::numeric_limits<std::size_t>::max() / 2;
};
406
// Producer side for the single-producer, dynamic-max-size case.
// Capacity accounting uses a plain check-then-act (see DoPush), which is
// race-free only because exactly one producer thread pushes at a time.
template <typename T, typename QueuePolicy>
class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
public:
    SingleProducerSide(GenericQueue& queue, std::size_t capacity)
        : queue_(queue),
          used_capacity_(0),
          total_capacity_(capacity)
    {}

    // Blocks if there is a consumer to Pop the current value and task
    // shouldn't cancel and queue is full
    template <typename Token>
    [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
        bool no_more_consumers = false;
        // Wait until either the element fits, or all consumers are gone,
        // or the deadline expires / the task is cancelled.
        const bool success = non_full_event_.WaitUntil(deadline, [&] {
            if (queue_.NoMoreConsumers()) {
                no_more_consumers = true;
                return true;
            }
            if (DoPush(token, std::move(value), value_size)) {
                return true;
            }
            return false;
        });
        return success && !no_more_consumers;
    }

    template <typename Token>
    [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
        return !queue_.NoMoreConsumers() && DoPush(token, std::move(value), value_size);
    }

    // Called from the consumer side after a successful pop: returns the
    // element's capacity and wakes the (single) producer, if it is waiting.
    void OnElementPopped(std::size_t released_capacity) {
        used_capacity_.fetch_sub(released_capacity);
        non_full_event_.Send();
    }

    // Wakes the producer so its Push re-checks NoMoreConsumers and fails.
    void StopBlockingOnPush() { non_full_event_.Send(); }

    void ResumeBlockingOnPush() {}

    void SetSoftMaxSize(std::size_t new_capacity) {
        const auto old_capacity = total_capacity_.exchange(new_capacity);
        // Only a capacity increase can unblock a waiting producer;
        // shrinking requires no wakeup.
        if (new_capacity > old_capacity) {
            non_full_event_.Send();
        }
    }

    std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }

    std::size_t GetSizeApproximate() const noexcept { return used_capacity_.load(); }

private:
    // Pushes only if the element fits under the soft limit.
    // check-then-act is safe here: there is a single producer.
    template <typename Token>
    [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
        if (used_capacity_.load() + value_size > total_capacity_.load()) {
            return false;
        }

        used_capacity_.fetch_add(value_size);
        queue_.DoPush(token, std::move(value));
        return true;
    }

    GenericQueue& queue_;
    engine::SingleConsumerEvent non_full_event_;
    std::atomic<std::size_t> used_capacity_;
    std::atomic<std::size_t> total_capacity_;
};
476
// Producer side for the multi-producer, dynamic-max-size case.
// Free capacity is tracked by a semaphore: Push acquires `value_size`
// units, and OnElementPopped releases them after a successful pop.
template <typename T, typename QueuePolicy>
class GenericQueue<T, QueuePolicy>::MultiProducerSide final {
public:
    MultiProducerSide(GenericQueue& queue, std::size_t capacity)
        : queue_(queue),
          remaining_capacity_(capacity),
          remaining_capacity_control_(remaining_capacity_)
    {}

    // Blocks if there is a consumer to Pop the current value and task
    // shouldn't cancel and queue is full
    template <typename Token>
    [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
        return remaining_capacity_.try_lock_shared_until_count(deadline, value_size) &&
               DoPush(token, std::move(value), value_size);
    }

    template <typename Token>
    [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
        return remaining_capacity_.try_lock_shared_count(value_size) && DoPush(token, std::move(value), value_size);
    }

    // Called from the consumer side after a successful pop: releases the
    // popped element's capacity units back to waiting producers.
    void OnElementPopped(std::size_t value_size) { remaining_capacity_.unlock_shared_count(value_size); }

    // Capacity override of 0 makes pending and future acquisitions give up,
    // so producers stop blocking once all consumers are dead.
    void StopBlockingOnPush() { remaining_capacity_control_.SetCapacityOverride(0); }

    void ResumeBlockingOnPush() { remaining_capacity_control_.RemoveCapacityOverride(); }

    void SetSoftMaxSize(std::size_t count) { remaining_capacity_control_.SetCapacity(count); }

    std::size_t GetSizeApproximate() const noexcept { return remaining_capacity_.UsedApprox(); }

    std::size_t GetSoftMaxSize() const noexcept { return remaining_capacity_control_.GetCapacity(); }

private:
    template <typename Token>
    [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
        if (queue_.NoMoreConsumers()) {
            // Undo the capacity acquisition performed by the caller.
            remaining_capacity_.unlock_shared_count(value_size);
            return false;
        }

        queue_.DoPush(token, std::move(value));
        return true;
    }

    GenericQueue& queue_;
    engine::CancellableSemaphore remaining_capacity_;
    concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
};
527
528template <typename T, typename QueuePolicy>
529class GenericQueue<T, QueuePolicy>::NoMaxSizeProducerSide final {
530public:
531 NoMaxSizeProducerSide(GenericQueue& queue, std::size_t max_size)
532 : queue_(queue)
533 {
534 SetSoftMaxSize(max_size);
535 }
536
537 template <typename Token>
538 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline /*deadline*/, std::size_t /*value_size*/) {
539 if (queue_.NoMoreConsumers()) {
540 return false;
541 }
542
543 queue_.DoPush(token, std::move(value));
544 return true;
545 }
546
547 template <typename Token>
548 [[nodiscard]] bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
549 return Push(token, std::move(value), engine::Deadline{}, value_size);
550 }
551
552 void OnElementPopped(std::size_t /*released_capacity*/) {}
553
554 void StopBlockingOnPush() {}
555
556 void ResumeBlockingOnPush() {}
557
558 void SetSoftMaxSize(std::size_t new_capacity) {
559 UINVARIANT(new_capacity == kUnbounded, "Cannot set max size for a queue with QueueMaxSizeMode::kNone");
560 }
561
562 std::size_t GetSoftMaxSize() const noexcept { return kUnbounded; }
563
564 std::size_t GetSizeApproximate() const noexcept { return queue_.queue_.size_approx(); }
565
566private:
567 GenericQueue& queue_;
568};
569
// Consumer side for the single-consumer case: a SingleConsumerEvent is
// enough, since at most one task ever waits for elements.
template <typename T, typename QueuePolicy>
class GenericQueue<T, QueuePolicy>::SingleConsumerSide final {
public:
    explicit SingleConsumerSide(GenericQueue& queue)
        : queue_(queue),
          element_count_(0)
    {}

    // Blocks only if queue is empty
    template <typename Token>
    [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
        bool no_more_producers = false;
        const bool success = nonempty_event_.WaitUntil(deadline, [&] {
            if (DoPop(token, value)) {
                return true;
            }
            if (queue_.NoMoreProducers()) {
                // Producer might have pushed something in queue between .pop()
                // and !producer_is_created_and_dead_ check. Check twice to avoid
                // TOCTOU.
                if (!DoPop(token, value)) {
                    no_more_producers = true;
                }
                return true;
            }
            return false;
        });
        // Fails on deadline/cancellation, or when the queue is drained
        // and can never be refilled.
        return success && !no_more_producers;
    }

    template <typename Token>
    [[nodiscard]] bool PopNoblock(Token& token, T& value) {
        return DoPop(token, value);
    }

    // Called by the producer side after each enqueue: bumps the approximate
    // size and wakes the consumer, if it is waiting.
    void OnElementPushed() {
        ++element_count_;
        nonempty_event_.Send();
    }

    // Wakes the consumer so its Pop re-checks NoMoreProducers and finishes.
    void StopBlockingOnPop() { nonempty_event_.Send(); }

    void ResumeBlockingOnPop() {}

    std::size_t GetElementCount() const { return element_count_; }

private:
    template <typename Token>
    [[nodiscard]] bool DoPop(Token& token, T& value) {
        if (queue_.DoPop(token, value)) {
            --element_count_;
            // Re-arm the event so the next Pop blocks until a new Send.
            nonempty_event_.Reset();
            return true;
        }
        return false;
    }

    GenericQueue& queue_;
    engine::SingleConsumerEvent nonempty_event_;
    std::atomic<std::size_t> element_count_;
};
631
// Consumer side for the multi-consumer case. The number of available
// elements is modeled by a semaphore of capacity kUnbounded: the
// constructor locks all kUnbounded units (empty queue => Pop blocks), and
// each pushed element releases one unit back.
template <typename T, typename QueuePolicy>
class GenericQueue<T, QueuePolicy>::MultiConsumerSide final {
public:
    explicit MultiConsumerSide(GenericQueue& queue)
        : queue_(queue),
          element_count_(kUnbounded),
          element_count_control_(element_count_)
    {
        const bool success = element_count_.try_lock_shared_count(kUnbounded);
        UASSERT(success);
    }

    ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }

    // Blocks only if queue is empty
    template <typename Token>
    [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
        return element_count_.try_lock_shared_until(deadline) && DoPop(token, value);
    }

    template <typename Token>
    [[nodiscard]] bool PopNoblock(Token& token, T& value) {
        return element_count_.try_lock_shared() && DoPop(token, value);
    }

    // Called by the producer side after each enqueue: makes one more
    // semaphore unit available to waiting consumers.
    void OnElementPushed() { element_count_.unlock_shared(); }

    // Raising the capacity by kSemaphoreUnlockValue lets every blocked
    // consumer acquire a unit and observe NoMoreProducers in DoPop.
    void StopBlockingOnPop() { element_count_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue); }

    void ResumeBlockingOnPop() { element_count_control_.RemoveCapacityOverride(); }

    std::size_t GetElementCount() const {
        const std::size_t cur_element_count = element_count_.RemainingApprox();
        if (cur_element_count < kUnbounded) {
            // Normal mode: the remaining units are the element count.
            return cur_element_count;
        } else if (cur_element_count <= kSemaphoreUnlockValue) {
            // Unlock override active and the queue looks empty.
            return 0;
        }
        // Unlock override active: strip the kSemaphoreUnlockValue surplus.
        return cur_element_count - kSemaphoreUnlockValue;
    }

private:
    template <typename Token>
    [[nodiscard]] bool DoPop(Token& token, T& value) {
        while (true) {
            if (queue_.DoPop(token, value)) {
                return true;
            }
            if (queue_.NoMoreProducers()) {
                // Return the unit acquired by the caller before giving up.
                element_count_.unlock_shared();
                return false;
            }
            // We can get here if another consumer steals our element, leaving another
            // element in a Moodycamel sub-queue that we have already passed.
        }
    }

    GenericQueue& queue_;
    engine::CancellableSemaphore element_count_;
    concurrent::impl::SemaphoreCapacityControl element_count_control_;
};
693
/// @ingroup userver_concurrency
///
/// @brief Non FIFO multiple producers multiple consumers queue.
///
/// Items from the same producer are always delivered in the production order.
/// Items from different producers (or when using a `MultiProducer` token) are
/// delivered in an unspecified order. In other words, FIFO order is maintained
/// only within producers, but not between them. This may lead to increased peak
/// latency of item processing.
///
/// In exchange for this, the queue has lower contention and increased
/// throughput compared to a conventional lock-free queue.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>;

/// @ingroup userver_concurrency
///
/// @brief Non FIFO multiple producers single consumer queue.
///
/// @see concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>;

/// @ingroup userver_concurrency
///
/// @brief Single producer multiple consumers queue.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>;

/// @ingroup userver_concurrency
///
/// @brief Single producer single consumer queue.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>;

namespace impl {

/// @ingroup userver_concurrency
///
/// @brief Like @ref NonFifoMpmcQueue, but does not support setting max size and is thus slightly faster.
///
/// @warning The current implementation suffers from performance issues in multi-producer scenario under heavy load.
/// Precisely speaking, producers always take priority over consumers (breaking thread fairness), and consumers starve,
/// leading to increased latencies to the point of OOM. Use other queue types (unbounded or not) for the time being.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using UnfairUnboundedNonFifoMpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, true>>;

}  // namespace impl

/// @ingroup userver_concurrency
///
/// @brief Like @ref NonFifoMpscQueue, but does not support setting max size and is thus slightly faster.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, false>>;

/// @ingroup userver_concurrency
///
/// @brief Like @ref SpmcQueue, but does not support setting max size and is thus slightly faster.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, true>>;

/// @ingroup userver_concurrency
///
/// @brief Like @ref SpscQueue, but does not support setting max size and is thus slightly faster.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T>
using UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, false>>;

/// @ingroup userver_concurrency
///
/// @brief Single producer single consumer queue of std::string which is bounded by the total bytes inside the strings.
///
/// @see @ref scripts/docs/en/userver/synchronization.md
using StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>;
782
783} // namespace concurrent
784
785USERVER_NAMESPACE_END