10#include <moodycamel/concurrentqueue.h>
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/atomic.hpp>
21USERVER_NAMESPACE_BEGIN
// Queue policy parameterized by producer/consumer multiplicity.
// NOTE(review): the struct header line is not present in this view. The aliases at the
// end of the file reference impl::SimpleQueuePolicy and impl::NoMaxSizeQueuePolicy;
// confirm which of the two this definition is.
77template <
bool MultipleProducer,
bool MultipleConsumer>
// The template arguments are re-exported as compile-time flags; GenericQueue reads them
// as QueuePolicy::kIsMultipleProducer / QueuePolicy::kIsMultipleConsumer.
79 static constexpr bool kIsMultipleProducer{MultipleProducer};
80 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
// Second queue policy with the same two multiplicity flags as the policy defined before it.
// NOTE(review): the struct header line is not present in this view; presumably this is the
// other one of impl::SimpleQueuePolicy / impl::NoMaxSizeQueuePolicy — confirm.
84template <
bool MultipleProducer,
bool MultipleConsumer>
// Compile-time flags read by GenericQueue through its QueuePolicy parameter.
86 static constexpr bool kIsMultipleProducer{MultipleProducer};
87 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
// Queue policy whose element size is the size of the stored container.
// NOTE(review): the struct header and the `template <typename T>` line of GetElementSize
// are not present in this view; presumably impl::ContainerQueuePolicy (the only policy
// used with a std::string element in the aliases at the end of the file) — confirm.
91template <
bool MultipleProducer,
bool MultipleConsumer>
// Returns std::size(value), i.e. the number of elements the container reports.
94 static std::size_t GetElementSize(
const T& value) {
95 return std::size(value);
// Compile-time multiplicity flags read by GenericQueue.
98 static constexpr bool kIsMultipleProducer{MultipleProducer};
99 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
// Queue of T over moodycamel::ConcurrentQueue whose producer/consumer multiplicity and
// size accounting are selected at compile time by QueuePolicy.
// Derives from enable_shared_from_this because the handle factories below build
// Producer/Consumer objects from this->shared_from_this().
121template <
typename T,
typename QueuePolicy>
122class GenericQueue
final :
public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
// Tag type required by the constructor and passed to Producer/Consumer constructors.
// NOTE(review): presumably declared in a private section so that only Create() and
// friends can name it — access specifiers are not present in this view.
123 struct EmplaceEnabler
final {
125 explicit EmplaceEnabler() =
default;
// Message of a static_assert whose condition line is not present in this view.
130 "QueuePolicy must inherit from concurrent::DefaultQueuePolicy"
// Token types. Note that the *consumer* token types are also keyed on
// kIsMultipleProducer: DoPop() static_asserts the same flag for each token kind, and the
// single-producer case dequeues through single_producer_token_ instead of a consumer token.
135 using ProducerToken =
136 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
137 using ConsumerToken =
138 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ConsumerToken, impl::NoToken>;
139 using MultiProducerToken = impl::MultiToken;
140 using MultiConsumerToken = std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken, impl::NoToken>;
// Type of the queue-owned token used when there is exactly one producer; NoToken otherwise.
142 using SingleProducerToken =
143 std::conditional_t<!QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
// Handle classes are friends so they can reach the non-public Push/Pop/Prepare*/Mark* members.
145 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
146 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
147 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
148 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
// Handle types returned by the factory methods of this class.
153 using Producer =
concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
154 using Consumer =
concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
155 using MultiProducer =
concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
156 using MultiConsumer =
concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
// Value meaning "no size limit". It is max()/4 rather than max(): MultiConsumerSide adds
// kSemaphoreUnlockValue (max()/2) to it, which must not overflow std::size_t.
158 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max() / 4;
// Constructs the queue with a soft size limit of min(max_size, kUnbounded).
// The unnamed EmplaceEnabler parameter is a tag; Create() supplies it.
// NOTE(review): part of the member-initializer list and the constructor body are not
// present in this view.
162 explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
// The single-producer token is built from queue_.
164 single_producer_token_(queue_),
// Both sides receive a reference to this queue.
165 producer_side_(*
this, std::min(max_size, kUnbounded)),
166 consumer_side_(*
this)
// NOTE(review): the enclosing function header and the declaration of `value` are not
// present in this view; presumably this is the destructor body — confirm.
// Each counter must be either zero (no handle was ever registered) or kCreatedAndDead
// (every registered handle has already been released).
170 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
171 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
// MarkProducerIsDead() called StopBlockingOnPop() when the last producer went away;
// undo that before draining (presumably so PopNoblock sees the real element count).
173 if (producers_count_ == kCreatedAndDead) {
175 consumer_side_.ResumeBlockingOnPop();
// Pop without blocking until the queue reports no more elements.
180 ConsumerToken token{queue_};
181 while (consumer_side_.PopNoblock(token, value)) {
185 GenericQueue(GenericQueue&&) =
delete;
186 GenericQueue(
const GenericQueue&) =
delete;
187 GenericQueue& operator=(GenericQueue&&) =
delete;
188 GenericQueue& operator=(
const GenericQueue&) =
delete;
// Factory: allocates the queue with std::make_shared so that shared_from_this() works in
// the handle factories.
// max_size is the soft size limit; it defaults to kUnbounded.
192 static std::shared_ptr<GenericQueue>
Create(std::size_t max_size = kUnbounded) {
193 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
// Returns a Producer handle that shares ownership of this queue.
// NOTE(review): the enclosing function header and any preceding statements are not
// present in this view.
204 return Producer(
this->shared_from_this(), EmplaceEnabler{});
// Returns a MultiProducer handle sharing ownership of this queue; rejected at compile
// time for single-producer policies.
// NOTE(review): the enclosing function header and any statements between the
// static_assert and the return are not present in this view.
217 static_assert(QueuePolicy::kIsMultipleProducer,
"Trying to obtain MultiProducer for a single-producer queue");
219 return MultiProducer(
this->shared_from_this(), EmplaceEnabler{});
// Returns a Consumer handle that shares ownership of this queue.
// NOTE(review): the enclosing function header and any preceding statements are not
// present in this view.
230 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
// Returns a MultiConsumer handle sharing ownership of this queue; rejected at compile
// time for single-consumer policies.
// NOTE(review): the enclosing function header and any statements between the
// static_assert and the return are not present in this view.
243 static_assert(QueuePolicy::kIsMultipleConsumer,
"Trying to obtain MultiConsumer for a single-consumer queue");
245 return MultiConsumer(
this->shared_from_this(), EmplaceEnabler{});
250 void SetSoftMaxSize(std::size_t max_size) { producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded)); }
// Forward declarations of the side implementations defined after the class body.
259 class SingleProducerSide;
260 class MultiProducerSide;
261 class NoMaxSizeProducerSide;
262 class SingleConsumerSide;
263 class MultiConsumerSide;
// Compile-time choice of the producer-side implementation.
// NOTE(review): the first condition and the nested conditional's opening/middle lines are
// not present in this view; the visible pieces show NoMaxSizeProducerSide as the first
// alternative and a nested choice keyed on kIsMultipleProducer ending in SingleProducerSide.
267 using ProducerSide = std::conditional_t<
269 NoMaxSizeProducerSide,
271 QueuePolicy::kIsMultipleProducer,
273 SingleProducerSide>>;
// Consumer side: MultiConsumerSide for multi-consumer policies, SingleConsumerSide otherwise.
277 using ConsumerSide = std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide, SingleConsumerSide>;
// Forwards a push with a deadline to the producer side.
// Returns the producer side's result; the result must not be ignored.
279 template <
typename Token>
280 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline deadline) {
// The size is taken before `value` is moved from on the next line.
281 const std::size_t value_size = QueuePolicy::GetElementSize(value);
282 return producer_side_.Push(token, std::move(value), deadline, value_size);
// Forwards a push without a deadline to the producer side's PushNoblock.
// Returns the producer side's result; the result must not be ignored.
285 template <
typename Token>
286 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
// The size is taken before `value` is moved from on the next line.
287 const std::size_t value_size = QueuePolicy::GetElementSize(value);
288 return producer_side_.PushNoblock(token, std::move(value), value_size);
// Forwards a pop with a deadline to the consumer side; `value` is the output parameter.
// Returns the consumer side's result; the result must not be ignored.
291 template <
typename Token>
292 [[nodiscard]]
bool Pop(Token& token, T& value,
engine::Deadline deadline) {
293 return consumer_side_.Pop(token, value, deadline);
// Forwards a pop without a deadline to the consumer side; `value` is the output parameter.
// Returns the consumer side's result; the result must not be ignored.
296 template <
typename Token>
297 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
298 return consumer_side_.PopNoblock(token, value);
// Registers one more producer in producers_count_.
301 void PrepareProducer() {
302 std::size_t old_producers_count{};
303 utils::AtomicUpdate(producers_count_, [&](
auto old_value) {
// For single-producer policies a second registration while one producer is alive
// (old_value == 1) is a usage error.
304 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
"Incorrect usage of queue producers");
// Remember the value the update started from so it can be inspected after the update.
305 old_producers_count = old_value;
// kCreatedAndDead is a sentinel, not a count: restart from 1 instead of incrementing it.
306 return old_value == kCreatedAndDead ? 1 : old_value + 1;
// A producer exists again after all previous ones were gone: undo the
// StopBlockingOnPop() issued by MarkProducerIsDead().
309 if (old_producers_count == kCreatedAndDead) {
310 consumer_side_.ResumeBlockingOnPop();
// Registers one more consumer in consumers_count_.
314 void PrepareConsumer() {
315 std::size_t old_consumers_count{};
316 utils::AtomicUpdate(consumers_count_, [&](
auto old_value) {
// For single-consumer policies a second registration while one consumer is alive
// (old_value == 1) is a usage error.
317 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
"Incorrect usage of queue consumers");
// Remember the value the update started from so it can be inspected after the update.
318 old_consumers_count = old_value;
// kCreatedAndDead is a sentinel, not a count: restart from 1 instead of incrementing it.
319 return old_value == kCreatedAndDead ? 1 : old_value + 1;
// A consumer exists again after all previous ones were gone: undo the
// StopBlockingOnPush() issued by MarkConsumerIsDead().
322 if (old_consumers_count == kCreatedAndDead) {
323 producer_side_.ResumeBlockingOnPush();
// Unregisters one consumer from consumers_count_.
327 void MarkConsumerIsDead() {
328 const auto new_consumers_count = utils::AtomicUpdate(consumers_count_, [](
auto old_value) {
// Releasing the last consumer stores the kCreatedAndDead sentinel rather than 0, which
// distinguishes "all consumers gone" from "no consumer was ever created".
329 return old_value == 1 ? kCreatedAndDead : old_value - 1;
// With no consumers left, tell the producer side to stop blocking in Push.
331 if (new_consumers_count == kCreatedAndDead) {
332 producer_side_.StopBlockingOnPush();
// Unregisters one producer from producers_count_.
336 void MarkProducerIsDead() {
337 const auto new_producers_count = utils::AtomicUpdate(producers_count_, [](
auto old_value) {
// Releasing the last producer stores the kCreatedAndDead sentinel rather than 0, which
// distinguishes "all producers gone" from "no producer was ever created".
338 return old_value == 1 ? kCreatedAndDead : old_value - 1;
// With no producers left, tell the consumer side to stop blocking in Pop.
340 if (new_producers_count == kCreatedAndDead) {
341 consumer_side_.StopBlockingOnPop();
347 bool NoMoreConsumers()
const {
return consumers_count_ == kCreatedAndDead; }
349 bool NoMoreProducers()
const {
return producers_count_ == kCreatedAndDead; }
// Enqueues `value` into queue_ using the enqueue overload that matches the token type,
// then notifies the consumer side.
353 template <
typename Token>
354 void DoPush(Token& token, T&& value) {
// Explicit moodycamel producer token supplied by the caller.
355 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
356 static_assert(QueuePolicy::kIsMultipleProducer);
357 queue_.enqueue(token, std::move(value));
358 }
// MultiToken: enqueue without any token.
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
359 static_assert(QueuePolicy::kIsMultipleProducer);
360 queue_.enqueue(std::move(value));
// Remaining case (the `else` line is not present in this view): single-producer policy,
// enqueue through the queue-owned single_producer_token_.
362 static_assert(std::is_same_v<Token, impl::NoToken>);
363 static_assert(!QueuePolicy::kIsMultipleProducer);
364 queue_.enqueue(single_producer_token_, std::move(value));
367 consumer_side_.OnElementPushed();
// Tries to dequeue one element into `value` using the dequeue call that matches the token type.
// NOTE(review): the declaration of `success`, the condition guarding OnElementPopped and
// the return statement are not present in this view; presumably the producer side is
// notified only when the dequeue succeeded and `success` is returned — confirm.
370 template <
typename Token>
371 [[nodiscard]]
bool DoPop(Token& token, T& value) {
// Explicit moodycamel consumer token; only used with multi-producer policies.
374 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
375 static_assert(QueuePolicy::kIsMultipleProducer);
376 success = queue_.try_dequeue(token, value);
377 }
// MultiToken: dequeue without any token.
else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
378 static_assert(QueuePolicy::kIsMultipleProducer);
379 success = queue_.try_dequeue(value);
// Remaining case: single-producer policy, dequeue directly from the one producer
// identified by single_producer_token_.
381 static_assert(std::is_same_v<Token, impl::NoToken>);
382 static_assert(!QueuePolicy::kIsMultipleProducer);
383 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
// Reports the popped element's size (per QueuePolicy) back to the producer side.
387 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
// Underlying storage. NOTE(review): the argument 1 is presumably moodycamel's initial
// capacity hint — confirm against the library documentation.
394 moodycamel::ConcurrentQueue<T> queue_{1};
// Live handle counters: 0 = none created yet, kCreatedAndDead = all created handles released.
395 std::atomic<std::size_t> consumers_count_{0};
396 std::atomic<std::size_t> producers_count_{0};
// Declared after queue_ because the constructor initializes it from queue_ and members
// are initialized in declaration order.
398 SingleProducerToken single_producer_token_;
// Both sides are constructed with a reference to this queue.
400 ProducerSide producer_side_;
401 ConsumerSide consumer_side_;
// Sentinel stored in the counters by MarkConsumerIsDead()/MarkProducerIsDead().
403 static constexpr std::size_t kCreatedAndDead = std::numeric_limits<std::size_t>::max();
// Offset added to kUnbounded by MultiConsumerSide::StopBlockingOnPop() and subtracted
// again in MultiConsumerSide::GetElementCount().
404 static constexpr std::size_t kSemaphoreUnlockValue = std::numeric_limits<std::size_t>::max() / 2;
// Producer side selected (via ProducerSide) for single-producer queues with a size limit.
// Tracks used and total capacity in two atomics and waits on a SingleConsumerEvent
// while the queue is full.
// NOTE(review): several lines of this class (parts of the constructor initializer list,
// lambda return statements, closing braces) are not present in this view.
407template <
typename T,
typename QueuePolicy>
408class GenericQueue<T, QueuePolicy>::SingleProducerSide
final {
// `capacity` becomes the initial total_capacity_.
410 SingleProducerSide(GenericQueue& queue, std::size_t capacity)
413 total_capacity_(capacity)
// Waits on non_full_event_ until `deadline`, re-evaluating the predicate below.
// Returns true only if the wait succeeded and consumers still exist.
418 template <
typename Token>
419 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline deadline, std::size_t value_size) {
420 bool no_more_consumers =
false;
421 const bool success = non_full_event_.WaitUntil(deadline, [&] {
// Record that all consumers are gone so the final result becomes false.
// NOTE(review): presumably the predicate also returns true here to end the wait — the
// return line is not present in this view.
422 if (queue_.NoMoreConsumers()) {
423 no_more_consumers =
true;
// Try to enqueue within the capacity limit.
426 if (DoPush(token, std::move(value), value_size)) {
431 return success && !no_more_consumers;
// Non-waiting push: fails immediately when no consumers remain or capacity is exhausted.
434 template <
typename Token>
435 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
436 return !queue_.NoMoreConsumers() && DoPush(token, std::move(value), value_size);
// Called by GenericQueue::DoPop with the size of the popped element; gives that capacity back.
// NOTE(review): any statements after the fetch_sub are not present in this view.
439 void OnElementPopped(std::size_t released_capacity) {
440 used_capacity_.fetch_sub(released_capacity);
// Wakes a producer waiting in Push; its predicate then observes NoMoreConsumers().
444 void StopBlockingOnPush() { non_full_event_
.Send(); }
// Nothing to undo for this implementation.
446 void ResumeBlockingOnPush() {}
// Replaces the total capacity.
// NOTE(review): the body of the `new_capacity > old_capacity` branch is not present in
// this view; presumably it wakes a waiting producer because more room became available.
448 void SetSoftMaxSize(std::size_t new_capacity) {
449 const auto old_capacity = total_capacity_.exchange(new_capacity);
450 if (new_capacity > old_capacity) {
// Current total capacity.
455 std::size_t GetSoftMaxSize()
const noexcept {
return total_capacity_.load(); }
// Currently used capacity, in the units returned by QueuePolicy::GetElementSize.
457 std::size_t GetSizeApproximate()
const noexcept {
return used_capacity_.load(); }
// Enqueues if the element fits into the remaining capacity.
// The check and the fetch_add are two separate atomic operations.
// NOTE(review): presumably safe only because there is a single producer — confirm.
460 template <
typename Token>
461 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
// Not enough room for this element (the rejecting return is not present in this view).
462 if (used_capacity_.load() + value_size > total_capacity_.load()) {
466 used_capacity_.fetch_add(value_size);
467 queue_.DoPush(token, std::move(value));
471 GenericQueue& queue_;
// Signalled via Send() in StopBlockingOnPush(); waited on in Push().
472 engine::SingleConsumerEvent non_full_event_;
473 std::atomic<std::size_t> used_capacity_;
474 std::atomic<std::size_t> total_capacity_;
// Producer side selected (via ProducerSide) for multi-producer queues with a size limit.
// Free capacity is modelled by a CancellableSemaphore: a push acquires `value_size`
// units, a pop releases them.
// NOTE(review): some lines of this class (queue_ initializer, return statements, closing
// braces) are not present in this view.
477template <
typename T,
typename QueuePolicy>
478class GenericQueue<T, QueuePolicy>::MultiProducerSide
final {
// The semaphore starts with `capacity` units; the control object wraps the same semaphore.
480 MultiProducerSide(GenericQueue& queue, std::size_t capacity)
482 remaining_capacity_
(capacity
),
483 remaining_capacity_control_(remaining_capacity_)
// Acquires `value_size` units of capacity, waiting until `deadline`, then enqueues.
// Returns false if the capacity could not be acquired or DoPush refused the element.
488 template <
typename Token>
489 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline deadline, std::size_t value_size) {
490 return remaining_capacity_.try_lock_shared_until_count(deadline, value_size) &&
491 DoPush(token, std::move(value), value_size);
// Same as Push but acquires capacity without a deadline-based wait.
494 template <
typename Token>
495 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
496 return remaining_capacity_.try_lock_shared_count(value_size) && DoPush(token, std::move(value), value_size);
// Called by GenericQueue::DoPop: returns the popped element's units to the semaphore.
499 void OnElementPopped(std::size_t value_size) { remaining_capacity_.unlock_shared_count(value_size); }
// Called when the last consumer is released: overrides the capacity with 0.
// NOTE(review): presumably this makes pending and future acquisitions fail instead of
// waiting — confirm against SemaphoreCapacityControl.
501 void StopBlockingOnPush() { remaining_capacity_control_.SetCapacityOverride(0); }
// Called when a consumer appears again: removes the override set above.
503 void ResumeBlockingOnPush() { remaining_capacity_control_.RemoveCapacityOverride(); }
505 void SetSoftMaxSize(std::size_t count) { remaining_capacity_control_.SetCapacity(count); }
// Units currently acquired from the semaphore, as reported by UsedApprox().
507 std::size_t GetSizeApproximate()
const noexcept {
return remaining_capacity_
.UsedApprox(); }
509 std::size_t GetSoftMaxSize()
const noexcept {
return remaining_capacity_control_.GetCapacity(); }
// Runs after capacity has been acquired by Push/PushNoblock.
512 template <
typename Token>
513 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
// Nobody will read the element: hand the acquired capacity back
// (the failing return is not present in this view).
514 if (queue_.NoMoreConsumers()) {
515 remaining_capacity_.unlock_shared_count(value_size);
519 queue_.DoPush(token, std::move(value));
523 GenericQueue& queue_;
// Declared before the control object, which is constructed from it.
524 engine::CancellableSemaphore remaining_capacity_;
525 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
// Producer side without any size accounting: pushes never wait for capacity.
// NOTE(review): some lines of this class (queue_ initializer, return statements, closing
// braces) are not present in this view.
528template <
typename T,
typename QueuePolicy>
529class GenericQueue<T, QueuePolicy>::NoMaxSizeProducerSide
final {
// Validates max_size through SetSoftMaxSize, which accepts only kUnbounded.
531 NoMaxSizeProducerSide(GenericQueue& queue, std::size_t max_size)
534 SetSoftMaxSize(max_size);
// Enqueues unconditionally unless all consumers are gone. The deadline and the element
// size are ignored (unnamed parameters).
537 template <
typename Token>
538 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline , std::size_t ) {
// No consumers left (the failing return is not present in this view).
539 if (queue_.NoMoreConsumers()) {
543 queue_.DoPush(token, std::move(value));
// Push never waits here, so the non-waiting variant delegates to it with a
// default-constructed deadline.
547 template <
typename Token>
548 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
549 return Push(token, std::move(value),
engine::Deadline
{}, value_size);
// No capacity bookkeeping, so the following three hooks are no-ops.
552 void OnElementPopped(std::size_t ) {}
554 void StopBlockingOnPush() {}
556 void ResumeBlockingOnPush() {}
// Any limit other than kUnbounded is a usage error for this producer side.
558 void SetSoftMaxSize(std::size_t new_capacity) {
559 UINVARIANT(new_capacity == kUnbounded,
"Cannot set max size for a queue with QueueMaxSizeMode::kNone");
562 std::size_t GetSoftMaxSize()
const noexcept {
return kUnbounded; }
// No own counter: asks the underlying moodycamel queue for its approximate size.
564 std::size_t GetSizeApproximate()
const noexcept {
return queue_.queue_.size_approx(); }
567 GenericQueue& queue_;
// Consumer side selected (via ConsumerSide) for single-consumer queues: waits on a
// SingleConsumerEvent while the queue is empty and keeps an element counter.
// NOTE(review): many lines of this class (constructor initializers, lambda returns, the
// bodies of OnElementPushed and DoPop, closing braces) are not present in this view.
570template <
typename T,
typename QueuePolicy>
571class GenericQueue<T, QueuePolicy>::SingleConsumerSide
final {
573 explicit SingleConsumerSide(GenericQueue& queue)
// Waits on nonempty_event_ until `deadline`, re-evaluating the predicate below.
// Returns true only if the wait succeeded and an element was obtained.
579 template <
typename Token>
580 [[nodiscard]]
bool Pop(Token& token, T& value,
engine::Deadline deadline) {
581 bool no_more_producers =
false;
582 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
// Fast path: an element is available.
583 if (DoPop(token, value)) {
586 if (queue_.NoMoreProducers()) {
// Second attempt after observing that producers are gone.
// NOTE(review): presumably this covers an element pushed between the first DoPop and the
// NoMoreProducers check — confirm.
590 if (!DoPop(token, value)) {
591 no_more_producers =
true;
597 return success && !no_more_producers;
// Non-waiting pop: a single dequeue attempt.
600 template <
typename Token>
601 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
602 return DoPop(token, value);
// Called by GenericQueue::DoPush after an element has been enqueued.
// NOTE(review): the body is not present in this view.
605 void OnElementPushed() {
// Wakes the consumer waiting in Pop; its predicate then observes NoMoreProducers().
610 void StopBlockingOnPop() { nonempty_event_
.Send(); }
// Nothing to undo for this implementation.
612 void ResumeBlockingOnPop() {}
614 std::size_t GetElementCount()
const {
return element_count_; }
// Dequeues through the owning queue.
// NOTE(review): the statements executed on success are not present in this view.
617 template <
typename Token>
618 [[nodiscard]]
bool DoPop(Token& token, T& value) {
619 if (queue_.DoPop(token, value)) {
627 GenericQueue& queue_;
// Signalled via Send() in StopBlockingOnPop(); waited on in Pop().
628 engine::SingleConsumerEvent nonempty_event_;
629 std::atomic<std::size_t> element_count_;
// Consumer side selected (via ConsumerSide) for multi-consumer queues. The number of
// available elements is modelled by a CancellableSemaphore: a pop acquires one unit.
// NOTE(review): many lines of this class (queue_ initializer, OnElementPushed, the
// definition of cur_element_count, return statements, closing braces) are not present
// in this view.
632template <
typename T,
typename QueuePolicy>
633class GenericQueue<T, QueuePolicy>::MultiConsumerSide
final {
635 explicit MultiConsumerSide(GenericQueue& queue)
// The semaphore is created with kUnbounded units ...
637 element_count_
(kUnbounded
),
638 element_count_control_(element_count_)
// ... and all of them are acquired immediately, so none are available for Pop initially.
640 const bool success = element_count_.try_lock_shared_count(kUnbounded);
// Releases the kUnbounded units acquired in the constructor.
644 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
// Acquires one unit, waiting until `deadline`, then dequeues.
647 template <
typename Token>
648 [[nodiscard]]
bool Pop(Token& token, T& value,
engine::Deadline deadline) {
649 return element_count_.try_lock_shared_until(deadline) && DoPop(token, value);
// Same as Pop but acquires the unit without a deadline-based wait.
652 template <
typename Token>
653 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
654 return element_count_.try_lock_shared() && DoPop(token, value);
// Called when the last producer is released: raises the capacity by kSemaphoreUnlockValue
// on top of kUnbounded.
// NOTE(review): presumably this makes units available so that waiting consumers wake up
// and notice NoMoreProducers() in DoPop — confirm.
659 void StopBlockingOnPop() { element_count_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue); }
// Called when a producer appears again: removes the override set above.
661 void ResumeBlockingOnPop() { element_count_control_.RemoveCapacityOverride(); }
// Translates the semaphore state into an element count.
663 std::size_t GetElementCount()
const {
// Below kUnbounded: the value is used directly as the count.
665 if (cur_element_count < kUnbounded) {
666 return cur_element_count;
667 }
// Between kUnbounded and kSemaphoreUnlockValue (the value returned by this branch is not
// present in this view).
else if (cur_element_count <= kSemaphoreUnlockValue) {
// Above kSemaphoreUnlockValue: strip the offset added by StopBlockingOnPop().
670 return cur_element_count - kSemaphoreUnlockValue;
// Runs after one unit has been acquired by Pop/PopNoblock.
674 template <
typename Token>
675 [[nodiscard]]
bool DoPop(Token& token, T& value) {
677 if (queue_.DoPop(token, value)) {
// Dequeue failed; the handling for "no producers left" is not present in this view.
680 if (queue_.NoMoreProducers()) {
689 GenericQueue& queue_;
// Declared before the control object, which is constructed from it.
690 engine::CancellableSemaphore element_count_;
691 concurrent::impl::SemaphoreCapacityControl element_count_control_;
// Public queue aliases. In every policy the first template argument becomes
// kIsMultipleProducer and the second kIsMultipleConsumer.
// NOTE(review): the `template <typename T>` line preceding each alias that uses T is not
// present in this view.

// Size-limited policy, multiple producers, multiple consumers.
709using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
true>>;
// Size-limited policy, multiple producers, single consumer.
718using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
false>>;
// Size-limited policy, single producer, multiple consumers.
726using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
true>>;
// Size-limited policy, single producer, single consumer.
734using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
false>>;
// NoMaxSizeQueuePolicy, multiple producers, multiple consumers.
748using UnfairUnboundedNonFifoMpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
true,
true>>;
// NoMaxSizeQueuePolicy, multiple producers, single consumer.
758using UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
true,
false>>;
// NoMaxSizeQueuePolicy, single producer, multiple consumers.
766using UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
false,
true>>;
// NoMaxSizeQueuePolicy, single producer, single consumer.
774using UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<
false,
false>>;
// Single-producer, single-consumer queue of std::string using the container policy.
781using StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<
false,
false>>;