10#include <moodycamel/concurrentqueue.h>
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/atomic.hpp>
21USERVER_NAMESPACE_BEGIN
77template <
bool MultipleProducer,
bool MultipleConsumer>
79 static constexpr bool kIsMultipleProducer{MultipleProducer};
80 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
84template <
bool MultipleProducer,
bool MultipleConsumer>
86 static constexpr bool kIsMultipleProducer{MultipleProducer};
87 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
91template <
bool MultipleProducer,
bool MultipleConsumer>
94 static std::size_t GetElementSize(
const T& value) {
95 return std::size(value);
98 static constexpr bool kIsMultipleProducer{MultipleProducer};
99 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
121template <
typename T,
typename QueuePolicy>
122class GenericQueue
final :
public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
123 struct EmplaceEnabler
final {
125 explicit EmplaceEnabler() =
default;
130 "QueuePolicy must inherit from concurrent::DefaultQueuePolicy"
135 using ProducerToken =
136 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
137 using ConsumerToken =
138 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ConsumerToken, impl::NoToken>;
139 using MultiProducerToken = impl::MultiToken;
140 using MultiConsumerToken = std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken, impl::NoToken>;
142 using SingleProducerToken =
143 std::conditional_t<!QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
145 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
146 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
147 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
148 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
153 using Producer = concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
154 using Consumer = concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
155 using MultiProducer =
concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
156 using MultiConsumer = concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
158 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max() / 4;
162 explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
164 single_producer_token_(queue_),
165 producer_side_(*
this, std::min(max_size, kUnbounded)),
166 consumer_side_(*
this) {}
169 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
170 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
172 if (producers_count_ == kCreatedAndDead) {
174 consumer_side_.ResumeBlockingOnPop();
179 ConsumerToken token{queue_};
180 while (consumer_side_.PopNoblock(token, value)) {
184 GenericQueue(GenericQueue&&) =
delete;
185 GenericQueue(
const GenericQueue&) =
delete;
186 GenericQueue& operator=(GenericQueue&&) =
delete;
187 GenericQueue& operator=(
const GenericQueue&) =
delete;
191 static std::shared_ptr<GenericQueue>
Create(std::size_t max_size = kUnbounded) {
192 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
203 return Producer(
this->shared_from_this(), EmplaceEnabler{});
216 static_assert(QueuePolicy::kIsMultipleProducer,
"Trying to obtain MultiProducer for a single-producer queue");
218 return MultiProducer(
this->shared_from_this(), EmplaceEnabler{});
229 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
242 static_assert(QueuePolicy::kIsMultipleConsumer,
"Trying to obtain MultiConsumer for a single-consumer queue");
244 return MultiConsumer(
this->shared_from_this(), EmplaceEnabler{});
249 void SetSoftMaxSize(std::size_t max_size) { producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded)); }
258 class SingleProducerSide;
259 class MultiProducerSide;
260 class NoMaxSizeProducerSide;
261 class SingleConsumerSide;
262 class MultiConsumerSide;
266 using ProducerSide = std::conditional_t<
267 kMaxSizeMode == QueueMaxSizeMode::kNone,
268 NoMaxSizeProducerSide,
270 QueuePolicy::kIsMultipleProducer,
272 SingleProducerSide>>;
276 using ConsumerSide = std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide, SingleConsumerSide>;
278 template <
typename Token>
279 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline) {
280 const std::size_t value_size = QueuePolicy::GetElementSize(value);
282 return producer_side_.Push(token, std::move(value), deadline, value_size);
285 template <
typename Token>
286 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
287 const std::size_t value_size = QueuePolicy::GetElementSize(value);
289 return producer_side_.PushNoblock(token, std::move(value), value_size);
292 template <
typename Token>
293 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
294 return consumer_side_.Pop(token, value, deadline);
297 template <
typename Token>
298 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
299 return consumer_side_.PopNoblock(token, value);
302 void PrepareProducer() {
303 std::size_t old_producers_count{};
304 utils::AtomicUpdate(producers_count_, [&](
auto old_value) {
305 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
"Incorrect usage of queue producers");
306 old_producers_count = old_value;
307 return old_value == kCreatedAndDead ? 1 : old_value + 1;
310 if (old_producers_count == kCreatedAndDead) {
311 consumer_side_.ResumeBlockingOnPop();
315 void PrepareConsumer() {
316 std::size_t old_consumers_count{};
317 utils::AtomicUpdate(consumers_count_, [&](
auto old_value) {
318 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
"Incorrect usage of queue consumers");
319 old_consumers_count = old_value;
320 return old_value == kCreatedAndDead ? 1 : old_value + 1;
323 if (old_consumers_count == kCreatedAndDead) {
324 producer_side_.ResumeBlockingOnPush();
328 void MarkConsumerIsDead() {
329 const auto new_consumers_count = utils::AtomicUpdate(consumers_count_, [](
auto old_value) {
330 return old_value == 1 ? kCreatedAndDead : old_value - 1;
332 if (new_consumers_count == kCreatedAndDead) {
333 producer_side_.StopBlockingOnPush();
337 void MarkProducerIsDead() {
338 const auto new_producers_count = utils::AtomicUpdate(producers_count_, [](
auto old_value) {
339 return old_value == 1 ? kCreatedAndDead : old_value - 1;
341 if (new_producers_count == kCreatedAndDead) {
342 consumer_side_.StopBlockingOnPop();
348 bool NoMoreConsumers()
const {
return consumers_count_ == kCreatedAndDead; }
350 bool NoMoreProducers()
const {
return producers_count_ == kCreatedAndDead; }
354 template <
typename Token>
355 void DoPush(Token& token, T&& value) {
356 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
357 static_assert(QueuePolicy::kIsMultipleProducer);
358 queue_.enqueue(token, std::move(value));
359 }
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
360 static_assert(QueuePolicy::kIsMultipleProducer);
361 queue_.enqueue(std::move(value));
363 static_assert(std::is_same_v<Token, impl::NoToken>);
364 static_assert(!QueuePolicy::kIsMultipleProducer);
365 queue_.enqueue(single_producer_token_, std::move(value));
368 consumer_side_.OnElementPushed();
371 template <
typename Token>
372 [[nodiscard]]
bool DoPop(Token& token, T& value) {
375 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
376 static_assert(QueuePolicy::kIsMultipleProducer);
377 success = queue_.try_dequeue(token, value);
378 }
else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
379 static_assert(QueuePolicy::kIsMultipleProducer);
380 success = queue_.try_dequeue(value);
382 static_assert(std::is_same_v<Token, impl::NoToken>);
383 static_assert(!QueuePolicy::kIsMultipleProducer);
384 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
388 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
395 moodycamel::ConcurrentQueue<T> queue_{1};
396 std::atomic<std::size_t> consumers_count_{0};
397 std::atomic<std::size_t> producers_count_{0};
399 SingleProducerToken single_producer_token_;
401 ProducerSide producer_side_;
402 ConsumerSide consumer_side_;
404 static constexpr std::size_t kCreatedAndDead = std::numeric_limits<std::size_t>::max();
405 static constexpr std::size_t kSemaphoreUnlockValue = std::numeric_limits<std::size_t>::max() / 2;
408template <
typename T,
typename QueuePolicy>
409class GenericQueue<T, QueuePolicy>::SingleProducerSide
final {
411 SingleProducerSide(GenericQueue& queue, std::size_t capacity)
412 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
416 template <
typename Token>
417 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
418 bool no_more_consumers =
false;
419 const bool success = non_full_event_.WaitUntil(deadline, [&] {
420 if (queue_.NoMoreConsumers()) {
421 no_more_consumers =
true;
424 if (DoPush(token, std::move(value), value_size)) {
429 return success && !no_more_consumers;
432 template <
typename Token>
433 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
434 return !queue_.NoMoreConsumers() && DoPush(token, std::move(value), value_size);
437 void OnElementPopped(std::size_t released_capacity) {
438 used_capacity_.fetch_sub(released_capacity);
439 non_full_event_.Send();
442 void StopBlockingOnPush() { non_full_event_.Send(); }
444 void ResumeBlockingOnPush() {}
446 void SetSoftMaxSize(std::size_t new_capacity) {
447 const auto old_capacity = total_capacity_.exchange(new_capacity);
448 if (new_capacity > old_capacity) non_full_event_.Send();
451 std::size_t GetSoftMaxSize()
const noexcept {
return total_capacity_.load(); }
453 std::size_t GetSizeApproximate()
const noexcept {
return used_capacity_.load(); }
456 template <
typename Token>
457 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
458 if (used_capacity_.load() + value_size > total_capacity_.load()) {
462 used_capacity_.fetch_add(value_size);
463 queue_.DoPush(token, std::move(value));
467 GenericQueue& queue_;
468 engine::SingleConsumerEvent non_full_event_;
469 std::atomic<std::size_t> used_capacity_;
470 std::atomic<std::size_t> total_capacity_;
473template <
typename T,
typename QueuePolicy>
474class GenericQueue<T, QueuePolicy>::MultiProducerSide
final {
476 MultiProducerSide(GenericQueue& queue, std::size_t capacity)
477 : queue_(queue), remaining_capacity_(capacity), remaining_capacity_control_(remaining_capacity_) {}
481 template <
typename Token>
482 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
483 return remaining_capacity_.try_lock_shared_until_count(deadline, value_size) &&
484 DoPush(token, std::move(value), value_size);
487 template <
typename Token>
488 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
489 return remaining_capacity_.try_lock_shared_count(value_size) && DoPush(token, std::move(value), value_size);
492 void OnElementPopped(std::size_t value_size) { remaining_capacity_.unlock_shared_count(value_size); }
494 void StopBlockingOnPush() { remaining_capacity_control_.SetCapacityOverride(0); }
496 void ResumeBlockingOnPush() { remaining_capacity_control_.RemoveCapacityOverride(); }
498 void SetSoftMaxSize(std::size_t count) { remaining_capacity_control_.SetCapacity(count); }
500 std::size_t GetSizeApproximate()
const noexcept {
return remaining_capacity_.UsedApprox(); }
502 std::size_t GetSoftMaxSize()
const noexcept {
return remaining_capacity_control_.GetCapacity(); }
505 template <
typename Token>
506 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
507 if (queue_.NoMoreConsumers()) {
508 remaining_capacity_.unlock_shared_count(value_size);
512 queue_.DoPush(token, std::move(value));
516 GenericQueue& queue_;
517 engine::CancellableSemaphore remaining_capacity_;
518 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
521template <
typename T,
typename QueuePolicy>
522class GenericQueue<T, QueuePolicy>::NoMaxSizeProducerSide
final {
524 NoMaxSizeProducerSide(GenericQueue& queue, std::size_t max_size) : queue_(queue) { SetSoftMaxSize(max_size); }
526 template <
typename Token>
527 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline , std::size_t ) {
528 if (queue_.NoMoreConsumers()) {
532 queue_.DoPush(token, std::move(value));
536 template <
typename Token>
537 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
538 return Push(token, std::move(value), engine::Deadline{}, value_size);
541 void OnElementPopped(std::size_t ) {}
543 void StopBlockingOnPush() {}
545 void ResumeBlockingOnPush() {}
547 void SetSoftMaxSize(std::size_t new_capacity) {
548 UINVARIANT(new_capacity == kUnbounded,
"Cannot set max size for a queue with QueueMaxSizeMode::kNone");
551 std::size_t GetSoftMaxSize()
const noexcept {
return kUnbounded; }
553 std::size_t GetSizeApproximate()
const noexcept {
return queue_.queue_.size_approx(); }
556 GenericQueue& queue_;
559template <
typename T,
typename QueuePolicy>
560class GenericQueue<T, QueuePolicy>::SingleConsumerSide
final {
562 explicit SingleConsumerSide(GenericQueue& queue) : queue_(queue), element_count_(0) {}
565 template <
typename Token>
566 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
567 bool no_more_producers =
false;
568 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
569 if (DoPop(token, value)) {
572 if (queue_.NoMoreProducers()) {
576 if (!DoPop(token, value)) {
577 no_more_producers =
true;
583 return success && !no_more_producers;
586 template <
typename Token>
587 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
588 return DoPop(token, value);
591 void OnElementPushed() {
593 nonempty_event_.Send();
596 void StopBlockingOnPop() { nonempty_event_.Send(); }
598 void ResumeBlockingOnPop() {}
600 std::size_t GetElementCount()
const {
return element_count_; }
603 template <
typename Token>
604 [[nodiscard]]
bool DoPop(Token& token, T& value) {
605 if (queue_.DoPop(token, value)) {
607 nonempty_event_.Reset();
613 GenericQueue& queue_;
614 engine::SingleConsumerEvent nonempty_event_;
615 std::atomic<std::size_t> element_count_;
618template <
typename T,
typename QueuePolicy>
619class GenericQueue<T, QueuePolicy>::MultiConsumerSide
final {
621 explicit MultiConsumerSide(GenericQueue& queue)
622 : queue_(queue), element_count_(kUnbounded), element_count_control_(element_count_) {
623 const bool success = element_count_.try_lock_shared_count(kUnbounded);
627 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
630 template <
typename Token>
631 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
632 return element_count_.try_lock_shared_until(deadline) && DoPop(token, value);
635 template <
typename Token>
636 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
637 return element_count_.try_lock_shared() && DoPop(token, value);
640 void OnElementPushed() { element_count_.unlock_shared(); }
642 void StopBlockingOnPop() { element_count_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue); }
644 void ResumeBlockingOnPop() { element_count_control_.RemoveCapacityOverride(); }
646 std::size_t GetElementCount()
const {
647 const std::size_t cur_element_count = element_count_.RemainingApprox();
648 if (cur_element_count < kUnbounded) {
649 return cur_element_count;
650 }
else if (cur_element_count <= kSemaphoreUnlockValue) {
653 return cur_element_count - kSemaphoreUnlockValue;
657 template <
typename Token>
658 [[nodiscard]]
bool DoPop(Token& token, T& value) {
660 if (queue_.DoPop(token, value)) {
663 if (queue_.NoMoreProducers()) {
664 element_count_.unlock_shared();
672 GenericQueue& queue_;
673 engine::CancellableSemaphore element_count_;
674 concurrent::impl::SemaphoreCapacityControl element_count_control_;
692using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
true>>;
701using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
false>>;
709using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
true>>;
717using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
false>>;
731using UnfairUnboundedNonFifoMpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
true,
true>>;
741using UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
true,
false>>;
749using UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
false,
true>>;
757using UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
false,
false>>;
764using StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<
false,
false>>;