7#include <moodycamel/concurrentqueue.h>
9#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
10#include <userver/concurrent/queue_helpers.hpp>
11#include <userver/engine/deadline.hpp>
12#include <userver/engine/semaphore.hpp>
13#include <userver/engine/single_consumer_event.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/atomic.hpp>
18USERVER_NAMESPACE_BEGIN
/// Queue policy that charges every element the same fixed cost.
///
/// @tparam MultipleProducer exposed as `kIsMultipleProducer`
/// @tparam MultipleConsumer exposed as `kIsMultipleConsumer`
template <bool MultipleProducer, bool MultipleConsumer>
struct SimpleQueuePolicy {
  /// Size charged against the queue capacity for one element.
  ///
  /// The argument is deliberately unnamed and unused: the cost does not
  /// depend on the value, so each element counts as exactly one unit.
  ///
  /// @returns always 1
  template <typename T>
  static constexpr std::size_t GetElementSize(const T&) {
    return 1;
  }

  static constexpr bool kIsMultipleProducer{MultipleProducer};
  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
};
/// Queue policy that charges each element by the number of items it holds.
///
/// @tparam MultipleProducer exposed as `kIsMultipleProducer`
/// @tparam MultipleConsumer exposed as `kIsMultipleConsumer`
template <bool MultipleProducer, bool MultipleConsumer>
struct ContainerQueuePolicy {
  /// Size charged against the queue capacity for one element.
  ///
  /// @param value any object accepted by `std::size` (containers, arrays,
  ///        strings)
  /// @returns `std::size(value)`; an empty container costs 0
  template <typename T>
  static std::size_t GetElementSize(const T& value) {
    return std::size(value);
  }

  static constexpr bool kIsMultipleProducer{MultipleProducer};
  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
};
// Queue wrapping a moodycamel::ConcurrentQueue<T> (`queue_`). QueuePolicy
// supplies kIsMultipleProducer / kIsMultipleConsumer and GetElementSize(),
// which select the token types, the producer/consumer side implementations
// and the per-element capacity cost. Instances are created through Create()
// as std::shared_ptr and handed out to Producer/Consumer handles via
// shared_from_this().
// NOTE(review): several lines of this class (access specifiers, some alias
// heads, method signatures, closing braces) are not present in this text.
64template <
typename T,
typename QueuePolicy>
65class GenericQueue
final
66 :
public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
// Tag type required by the constructor; Create() and the Get*() accessors
// below construct it explicitly as `EmplaceEnabler{}`.
67 struct EmplaceEnabler
final {
69 explicit EmplaceEnabler() =
default;
// Token type chosen by kIsMultipleProducer: moodycamel::ProducerToken when
// true, impl::NoToken otherwise.
// NOTE(review): presumably the right-hand side of `using ProducerToken =`.
73 std::conditional_t<QueuePolicy::kIsMultipleProducer,
74 moodycamel::ProducerToken, impl::NoToken>;
// Same selection for the consuming side; note it is also keyed on
// kIsMultipleProducer, not kIsMultipleConsumer.
// NOTE(review): presumably the right-hand side of `using ConsumerToken =`.
76 std::conditional_t<QueuePolicy::kIsMultipleProducer,
77 moodycamel::ConsumerToken, impl::NoToken>;
78 using MultiProducerToken = impl::MultiToken;
79 using MultiConsumerToken =
80 std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken,
// Inverse of the producer-token selection: a real moodycamel::ProducerToken
// only when the policy is NOT multi-producer (see single_producer_token_).
83 using SingleProducerToken =
84 std::conditional_t<!QueuePolicy::kIsMultipleProducer,
85 moodycamel::ProducerToken, impl::NoToken>;
// The handle classes are friends so they can reach the non-public
// Push/Pop/Prepare*/Mark* members below.
87 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
88 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
89 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
90 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
// Handle types returned by the Get*() accessors.
// NOTE(review): the `using Producer =`, `using Consumer =` and
// `using MultiProducer =` heads are not visible here.
96 concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
98 concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
100 concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
101 using MultiConsumer =
102 concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
// Largest accepted size: a quarter of the std::size_t range. Every size
// passed in by callers is clamped to this value with std::min.
104 static constexpr std::size_t kUnbounded =
105 std::numeric_limits<std::size_t>::max() / 4;
// Constructor; requires an EmplaceEnabler tag. `max_size` is clamped to
// kUnbounded before it reaches the producer side.
109 explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
111 single_producer_token_(queue_),
112 producer_side_(*
this, std::min(max_size, kUnbounded)),
113 consumer_side_(*
this) {}
// NOTE(review): presumably the destructor body; its signature line is not
// visible. Each counter must be either 0 (never used) or kCreatedAndDead
// (used and fully released).
116 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
117 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
// If producers existed and are all gone, call ResumeBlockingOnPop() on the
// consumer side before draining below.
119 if (producers_count_ == kCreatedAndDead) {
121 consumer_side_.ResumeBlockingOnPop();
// Drain whatever is still queued using non-blocking pops.
126 ConsumerToken token{queue_};
127 while (consumer_side_.PopNoblock(token, value)) {
// The queue is neither copyable nor movable.
131 GenericQueue(GenericQueue&&) =
delete;
132 GenericQueue(
const GenericQueue&) =
delete;
133 GenericQueue& operator=(GenericQueue&&) =
delete;
134 GenericQueue& operator=(
const GenericQueue&) =
delete;
// Factory: builds the queue inside a std::shared_ptr. `max_size` defaults to
// kUnbounded.
138 static std::shared_ptr<GenericQueue>
Create(
139 std::size_t max_size = kUnbounded) {
140 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
// Builds a Producer handle from shared_from_this().
151 return Producer(
this->shared_from_this(), EmplaceEnabler{});
// MultiProducer handles are rejected at compile time for policies with
// kIsMultipleProducer == false.
164 static_assert(QueuePolicy::kIsMultipleProducer,
165 "Trying to obtain MultiProducer for a single-producer queue");
167 return MultiProducer(
this->shared_from_this(), EmplaceEnabler{});
// Builds a Consumer handle from shared_from_this().
178 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
// MultiConsumer handles are rejected at compile time for policies with
// kIsMultipleConsumer == false.
191 static_assert(QueuePolicy::kIsMultipleConsumer,
192 "Trying to obtain MultiConsumer for a single-consumer queue");
194 return MultiConsumer(
this->shared_from_this(), EmplaceEnabler{});
// Forwards a new soft size limit, clamped to kUnbounded, to the producer side.
200 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
// Returns the soft size limit currently held by the producer side.
204 std::size_t
GetSoftMaxSize()
const {
return producer_side_.GetSoftMaxSize(); }
// Returns the producer side's approximate size figure.
208 return producer_side_.GetSizeApproximate();
// Side implementations, defined out of class further down in this file.
212 class SingleProducerSide;
213 class MultiProducerSide;
214 class SingleConsumerSide;
215 class MultiConsumerSide;
// Side selection keyed on the policy flags.
// NOTE(review): the alias heads and the single-side alternatives are not
// visible here; presumably these define ProducerSide and ConsumerSide.
220 std::conditional_t<QueuePolicy::kIsMultipleProducer, MultiProducerSide,
226 std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide,
// Deadline-bounded push. The element size is taken from the policy BEFORE the
// value is moved into the producer side.
229 template <
typename Token>
230 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline) {
231 const std::size_t value_size = QueuePolicy::GetElementSize(value);
233 return producer_side_.Push(token, std::move(value), deadline, value_size);
// Same as Push() but delegates to the producer side's PushNoblock().
236 template <
typename Token>
237 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
238 const std::size_t value_size = QueuePolicy::GetElementSize(value);
240 return producer_side_.PushNoblock(token, std::move(value), value_size);
// Deadline-bounded pop, delegated to the consumer side.
243 template <
typename Token>
244 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
245 return consumer_side_.Pop(token, value, deadline);
// Pop without a deadline, delegated to the consumer side's PopNoblock().
248 template <
typename Token>
249 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
250 return consumer_side_.PopNoblock(token, value);
// Registers one more producer. The UINVARIANT rejects a second producer when
// the policy is not multi-producer (old_value == 1). A counter equal to
// kCreatedAndDead restarts at 1.
253 void PrepareProducer() {
254 std::size_t old_producers_count{};
255 utils::AtomicUpdate(producers_count_, [&](
auto old_value) {
256 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
257 "Incorrect usage of queue producers");
258 old_producers_count = old_value;
259 return old_value == kCreatedAndDead ? 1 : old_value + 1;
// Producers are back after all previous ones had gone: let the consumer side
// resume blocking on pop.
262 if (old_producers_count == kCreatedAndDead) {
263 consumer_side_.ResumeBlockingOnPop();
// Registers one more consumer; mirror image of PrepareProducer().
267 void PrepareConsumer() {
268 std::size_t old_consumers_count{};
269 utils::AtomicUpdate(consumers_count_, [&](
auto old_value) {
270 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
271 "Incorrect usage of queue consumers");
272 old_consumers_count = old_value;
273 return old_value == kCreatedAndDead ? 1 : old_value + 1;
276 if (old_consumers_count == kCreatedAndDead) {
277 producer_side_.ResumeBlockingOnPush();
// Unregisters a consumer: 1 becomes kCreatedAndDead, anything else is
// decremented. When the result is kCreatedAndDead the producer side is told
// to stop blocking on push.
// NOTE(review): relies on utils::AtomicUpdate returning the updated value.
281 void MarkConsumerIsDead() {
282 const auto new_consumers_count =
283 utils::AtomicUpdate(consumers_count_, [](
auto old_value) {
284 return old_value == 1 ? kCreatedAndDead : old_value - 1;
286 if (new_consumers_count == kCreatedAndDead) {
287 producer_side_.StopBlockingOnPush();
// Unregisters a producer; mirror image of MarkConsumerIsDead().
291 void MarkProducerIsDead() {
292 const auto new_producers_count =
293 utils::AtomicUpdate(producers_count_, [](
auto old_value) {
294 return old_value == 1 ? kCreatedAndDead : old_value - 1;
296 if (new_producers_count == kCreatedAndDead) {
297 consumer_side_.StopBlockingOnPop();
// True once the respective counter holds the kCreatedAndDead sentinel.
303 bool NoMoreConsumers()
const {
return consumers_count_ == kCreatedAndDead; }
305 bool NoMoreProducers()
const {
return producers_count_ == kCreatedAndDead; }
// Enqueues into the underlying queue. The enqueue overload is picked at
// compile time from the Token type: explicit producer token, token-less
// (MultiProducerToken), or the queue-owned single_producer_token_.
309 template <
typename Token>
310 void DoPush(Token& token, T&& value) {
311 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
312 static_assert(QueuePolicy::kIsMultipleProducer);
313 queue_.enqueue(token, std::move(value));
314 }
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
315 static_assert(QueuePolicy::kIsMultipleProducer);
316 queue_.enqueue(std::move(value));
318 static_assert(std::is_same_v<Token, impl::NoToken>);
319 static_assert(!QueuePolicy::kIsMultipleProducer);
320 queue_.enqueue(single_producer_token_, std::move(value));
// Notify the consumer side that an element is available.
323 consumer_side_.OnElementPushed();
// Dequeues from the underlying queue, again dispatching on the Token type.
// With impl::NoToken the element is taken from the single producer's
// sub-queue via try_dequeue_from_producer().
326 template <
typename Token>
327 [[nodiscard]]
bool DoPop(Token& token, T& value) {
330 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
331 static_assert(QueuePolicy::kIsMultipleProducer);
332 success = queue_.try_dequeue(token, value);
333 }
else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
334 static_assert(QueuePolicy::kIsMultipleProducer);
335 success = queue_.try_dequeue(value);
337 static_assert(std::is_same_v<Token, impl::NoToken>);
338 static_assert(!QueuePolicy::kIsMultipleProducer);
339 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
// Report the popped element's size (per the policy) to the producer side.
343 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
// Underlying lock-free queue, constructed with an initial capacity argument
// of 1.
350 moodycamel::ConcurrentQueue<T> queue_{1};
// Live handle counters: 0 = never created, kCreatedAndDead = all released.
351 std::atomic<std::size_t> consumers_count_{0};
352 std::atomic<std::size_t> producers_count_{0};
// Token bound to queue_ in the constructor; used by DoPush/DoPop when the
// caller passes impl::NoToken.
354 SingleProducerToken single_producer_token_;
356 ProducerSide producer_side_;
357 ConsumerSide consumer_side_;
// Sentinel counter value meaning "handles existed and all are gone".
359 static constexpr std::size_t kCreatedAndDead =
360 std::numeric_limits<std::size_t>::max();
// Offset added to kUnbounded in MultiConsumerSide::StopBlockingOnPop() and
// subtracted again in MultiConsumerSide::GetElementCount().
361 static constexpr std::size_t kSemaphoreUnlockValue =
362 std::numeric_limits<std::size_t>::max() / 2;
// Producer-side capacity accounting built from two atomics (used vs. total
// capacity) and a SingleConsumerEvent that is signalled whenever room may
// have appeared.
// NOTE(review): closing braces and some return statements of this class are
// not present in this text.
366template <
typename T,
typename QueuePolicy>
367class GenericQueue<T, QueuePolicy>::SingleProducerSide
final {
// Starts with zero used capacity and the given total capacity.
369 explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
370 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
// Push bounded by `deadline`: waits on non_full_event_ with a predicate that
// checks for vanished consumers and attempts DoPush(). The result is true
// only if the wait succeeded AND consumers were still present.
374 template <
typename Token>
375 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline,
376 std::size_t value_size) {
377 bool no_more_consumers =
false;
378 const bool success = non_full_event_.WaitUntil(deadline, [&] {
379 if (queue_.NoMoreConsumers()) {
380 no_more_consumers =
true;
383 if (DoPush(token, std::move(value), value_size)) {
388 return success && !no_more_consumers;
// Push without waiting: refused outright when there are no consumers left,
// otherwise a single DoPush() attempt.
391 template <
typename Token>
392 [[nodiscard]]
bool PushNoblock(Token& token, T&& value,
393 std::size_t value_size) {
394 return !queue_.NoMoreConsumers() &&
395 DoPush(token, std::move(value), value_size);
// Gives back the capacity of a popped element and signals non_full_event_.
398 void OnElementPopped(std::size_t released_capacity) {
399 used_capacity_.fetch_sub(released_capacity);
400 non_full_event_.Send();
// Signals the event so that a waiting Push() re-evaluates its predicate,
// which checks NoMoreConsumers().
403 void StopBlockingOnPush() { non_full_event_.Send(); }
// Nothing to undo for this implementation.
405 void ResumeBlockingOnPush() {}
// Replaces the capacity limit; the event is signalled only when the limit
// grew.
407 void SetSoftMaxSize(std::size_t new_capacity) {
408 const auto old_capacity = total_capacity_.exchange(new_capacity);
409 if (new_capacity > old_capacity) non_full_event_.Send();
412 std::size_t GetSoftMaxSize()
const noexcept {
return total_capacity_.load(); }
// Reports the currently used capacity (sum of element sizes, not a count).
414 std::size_t GetSizeApproximate()
const noexcept {
415 return used_capacity_.load();
// Single push attempt: compares used + value_size against the current limit,
// then accounts for the element and enqueues it.
// NOTE(review): the body of the capacity check (presumably an early
// `return false`) and the trailing statements are not visible here.
419 template <
typename Token>
420 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
421 if (used_capacity_.load() + value_size > total_capacity_.load()) {
425 used_capacity_.fetch_add(value_size);
426 queue_.DoPush(token, std::move(value));
430 GenericQueue& queue_;
431 engine::SingleConsumerEvent non_full_event_;
432 std::atomic<std::size_t> used_capacity_;
433 std::atomic<std::size_t> total_capacity_;
// Producer-side capacity accounting backed by a CancellableSemaphore
// (`remaining_capacity_`): pushing acquires `value_size` units, popping
// releases them. A SemaphoreCapacityControl adjusts the semaphore's capacity.
// NOTE(review): closing braces and a few continuation lines of this class are
// not present in this text.
437template <
typename T,
typename QueuePolicy>
438class GenericQueue<T, QueuePolicy>::MultiProducerSide
final {
// The semaphore starts with `capacity` units.
440 explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
442 remaining_capacity_(capacity),
443 remaining_capacity_control_(remaining_capacity_) {}
// Push bounded by `deadline`: first tries to acquire capacity from the
// semaphore until the deadline, then performs DoPush().
// NOTE(review): the line carrying the acquired count is not visible here.
447 template <
typename Token>
448 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline,
449 std::size_t value_size) {
450 return remaining_capacity_.try_lock_shared_until_count(deadline,
452 DoPush(token, std::move(value), value_size);
// Push without waiting: a single attempt to acquire `value_size` units,
// followed by DoPush() on success.
455 template <
typename Token>
456 [[nodiscard]]
bool PushNoblock(Token& token, T&& value,
457 std::size_t value_size) {
458 return remaining_capacity_.try_lock_shared_count(value_size) &&
459 DoPush(token, std::move(value), value_size);
// Gives back the capacity that the popped element occupied.
462 void OnElementPopped(std::size_t value_size) {
463 remaining_capacity_.unlock_shared_count(value_size);
// Installs a capacity override of 0 on the semaphore control.
466 void StopBlockingOnPush() {
467 remaining_capacity_control_.SetCapacityOverride(0);
// Removes the override installed by StopBlockingOnPush().
470 void ResumeBlockingOnPush() {
471 remaining_capacity_control_.RemoveCapacityOverride();
474 void SetSoftMaxSize(std::size_t count) {
475 remaining_capacity_control_.SetCapacity(count);
// Reports the semaphore's approximate number of units in use.
478 std::size_t GetSizeApproximate()
const noexcept {
479 return remaining_capacity_.UsedApprox();
482 std::size_t GetSoftMaxSize()
const noexcept {
483 return remaining_capacity_control_.GetCapacity();
// Called with `value_size` units already acquired. If all consumers are gone
// the units are handed back; otherwise the value is enqueued.
// NOTE(review): the return statements are not visible here.
487 template <
typename Token>
488 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
489 if (queue_.NoMoreConsumers()) {
490 remaining_capacity_.unlock_shared_count(value_size);
494 queue_.DoPush(token, std::move(value));
498 GenericQueue& queue_;
499 engine::CancellableSemaphore remaining_capacity_;
500 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
// Consumer-side implementation built on a SingleConsumerEvent
// (`nonempty_event_`) that producers signal after each push, plus an atomic
// element counter.
// NOTE(review): closing braces, return statements and the counter updates of
// this class are not present in this text.
504template <
typename T,
typename QueuePolicy>
505class GenericQueue<T, QueuePolicy>::SingleConsumerSide
final {
507 explicit SingleConsumerSide(GenericQueue& queue)
508 : queue_(queue), element_count_(0) {}
// Pop bounded by `deadline`: waits on nonempty_event_ with a predicate that
// tries DoPop(). When all producers are gone it tries DoPop() once more and,
// if that also fails, records `no_more_producers`. The result is true only
// if the wait succeeded and that flag stayed false.
511 template <
typename Token>
512 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
513 bool no_more_producers =
false;
514 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
515 if (DoPop(token, value)) {
518 if (queue_.NoMoreProducers()) {
522 if (!DoPop(token, value)) {
523 no_more_producers =
true;
529 return success && !no_more_producers;
// Pop without waiting: a single DoPop() attempt.
532 template <
typename Token>
533 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
534 return DoPop(token, value);
// Signals nonempty_event_ after an element has been pushed.
537 void OnElementPushed() {
539 nonempty_event_.Send();
// Signals the event so that a waiting Pop() re-evaluates its predicate,
// which checks NoMoreProducers().
542 void StopBlockingOnPop() { nonempty_event_.Send(); }
// Nothing to undo for this implementation.
544 void ResumeBlockingOnPop() {}
546 std::size_t GetElementCount()
const {
return element_count_; }
// Single pop attempt via the owning queue; nonempty_event_.Reset() follows
// the successful-pop check.
549 template <
typename Token>
550 [[nodiscard]]
bool DoPop(Token& token, T& value) {
551 if (queue_.DoPop(token, value)) {
553 nonempty_event_.Reset();
559 GenericQueue& queue_;
560 engine::SingleConsumerEvent nonempty_event_;
561 std::atomic<std::size_t> element_count_;
// Consumer-side implementation backed by a CancellableSemaphore
// (`element_count_`): each push releases one unit, each pop acquires one.
// NOTE(review): closing braces and some return statements of this class are
// not present in this text.
565template <
typename T,
typename QueuePolicy>
566class GenericQueue<T, QueuePolicy>::MultiConsumerSide
final {
// The semaphore is created with kUnbounded units and all of them are
// acquired immediately; the destructor releases them again.
568 explicit MultiConsumerSide(GenericQueue& queue)
570 element_count_(kUnbounded),
571 element_count_control_(element_count_) {
572 const bool success = element_count_.try_lock_shared_count(kUnbounded);
576 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
// Pop bounded by `deadline`: tries to acquire one unit until the deadline.
// NOTE(review): the right-hand operand of `&&` is not visible here.
579 template <
typename Token>
580 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
581 return element_count_.try_lock_shared_until(deadline) &&
// Pop without waiting: a single attempt to acquire one unit, then DoPop().
585 template <
typename Token>
586 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
587 return element_count_.try_lock_shared() && DoPop(token, value);
// One pushed element releases one unit.
590 void OnElementPushed() { element_count_.unlock_shared(); }
// Installs a capacity override of kUnbounded + kSemaphoreUnlockValue.
592 void StopBlockingOnPop() {
593 element_count_control_.SetCapacityOverride(kUnbounded +
594 kSemaphoreUnlockValue);
// Removes the override installed by StopBlockingOnPop().
597 void ResumeBlockingOnPop() {
598 element_count_control_.RemoveCapacityOverride();
// Derives an element count from the semaphore's approximate remaining units:
// values below kUnbounded are returned as-is; values above
// kSemaphoreUnlockValue have that offset subtracted.
// NOTE(review): the result for the middle range (the `else if` branch) is not
// visible here.
601 std::size_t GetElementCount()
const {
602 const std::size_t cur_element_count = element_count_.RemainingApprox();
603 if (cur_element_count < kUnbounded) {
604 return cur_element_count;
605 }
else if (cur_element_count <= kSemaphoreUnlockValue) {
608 return cur_element_count - kSemaphoreUnlockValue;
// Called with one unit already acquired. After the successful-pop check, if
// all producers are gone, the unit is handed back.
// NOTE(review): the return statements are not visible here.
612 template <
typename Token>
613 [[nodiscard]]
bool DoPop(Token& token, T& value) {
615 if (queue_.DoPop(token, value)) {
618 if (queue_.NoMoreProducers()) {
619 element_count_.unlock_shared();
627 GenericQueue& queue_;
628 engine::CancellableSemaphore element_count_;
629 concurrent::impl::SemaphoreCapacityControl element_count_control_;
// Ready-made GenericQueue aliases. The two booleans are the policy's
// <MultipleProducer, MultipleConsumer> flags.
// NOTE(review): the `template <typename T>` heads of the alias templates are
// not present in this text.
// Multiple producers, multiple consumers.
647using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
true>>;
// Multiple producers, single consumer.
656using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
false>>;
// Single producer, multiple consumers.
664using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
true>>;
// Single producer, single consumer.
672using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
false>>;
// Single-producer, single-consumer queue of std::string whose elements are
// sized by ContainerQueuePolicy, i.e. by std::size of each string.
680using StringStreamQueue =
681 GenericQueue<std::string, impl::ContainerQueuePolicy<
false,
false>>;