7#include <moodycamel/concurrentqueue.h>
9#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
10#include <userver/concurrent/queue_helpers.hpp>
11#include <userver/engine/deadline.hpp>
12#include <userver/engine/semaphore.hpp>
13#include <userver/engine/single_consumer_event.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/atomic.hpp>
18USERVER_NAMESPACE_BEGIN
24template <
bool MultipleProducer,
bool MultipleConsumer>
25struct SimpleQueuePolicy {
27 static constexpr std::size_t GetElementSize(
const T&) {
31 static constexpr bool kIsMultipleProducer{MultipleProducer};
32 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
35template <
bool MultipleProducer,
bool MultipleConsumer>
36struct ContainerQueuePolicy {
38 static std::size_t GetElementSize(
const T& value) {
39 return std::size(value);
42 static constexpr bool kIsMultipleProducer{MultipleProducer};
43 static constexpr bool kIsMultipleConsumer{MultipleConsumer};
51template <
typename T,
typename QueuePolicy>
52class GenericQueue
final
53 :
public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
54 struct EmplaceEnabler
final {
56 explicit EmplaceEnabler() =
default;
60 std::conditional_t<QueuePolicy::kIsMultipleProducer,
61 moodycamel::ProducerToken, impl::NoToken>;
63 std::conditional_t<QueuePolicy::kIsMultipleProducer,
64 moodycamel::ConsumerToken, impl::NoToken>;
65 using MultiProducerToken = impl::MultiToken;
66 using MultiConsumerToken =
67 std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken,
70 using SingleProducerToken =
71 std::conditional_t<!QueuePolicy::kIsMultipleProducer,
72 moodycamel::ProducerToken, impl::NoToken>;
74 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
75 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
76 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
77 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
83 concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
85 concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
87 concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
89 concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
91 static constexpr std::size_t kUnbounded =
92 std::numeric_limits<std::size_t>::max() / 4;
96 explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
98 single_producer_token_(queue_),
99 producer_side_(*
this, std::min(max_size, kUnbounded)),
100 consumer_side_(*
this) {}
103 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
104 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
106 if (producers_count_ == kCreatedAndDead) {
108 consumer_side_.ResumeBlockingOnPop();
113 ConsumerToken token{queue_};
114 while (consumer_side_.PopNoblock(token, value)) {
118 GenericQueue(GenericQueue&&) =
delete;
119 GenericQueue(
const GenericQueue&) =
delete;
120 GenericQueue& operator=(GenericQueue&&) =
delete;
121 GenericQueue& operator=(
const GenericQueue&) =
delete;
125 static std::shared_ptr<GenericQueue>
Create(
126 std::size_t max_size = kUnbounded) {
127 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
138 return Producer(
this->shared_from_this(), EmplaceEnabler{});
151 static_assert(QueuePolicy::kIsMultipleProducer,
152 "Trying to obtain MultiProducer for a single-producer queue");
154 return MultiProducer(
this->shared_from_this(), EmplaceEnabler{});
165 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
178 static_assert(QueuePolicy::kIsMultipleConsumer,
179 "Trying to obtain MultiConsumer for a single-consumer queue");
181 return MultiConsumer(
this->shared_from_this(), EmplaceEnabler{});
187 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
191 std::size_t
GetSoftMaxSize()
const {
return producer_side_.GetSoftMaxSize(); }
195 return producer_side_.GetSizeApproximate();
199 class SingleProducerSide;
200 class MultiProducerSide;
201 class SingleConsumerSide;
202 class MultiConsumerSide;
207 std::conditional_t<QueuePolicy::kIsMultipleProducer, MultiProducerSide,
213 std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide,
216 template <
typename Token>
217 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline) {
218 return producer_side_.Push(token, std::move(value), deadline);
221 template <
typename Token>
222 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
223 return producer_side_.PushNoblock(token, std::move(value));
226 template <
typename Token>
227 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
228 return consumer_side_.Pop(token, value, deadline);
231 template <
typename Token>
232 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
233 return consumer_side_.PopNoblock(token, value);
236 void PrepareProducer() {
237 std::size_t old_producers_count{};
238 utils::AtomicUpdate(producers_count_, [&](
auto old_value) {
239 old_producers_count = old_value;
240 return old_value == kCreatedAndDead ? 1 : old_value + 1;
243 if (old_producers_count == kCreatedAndDead) {
244 consumer_side_.ResumeBlockingOnPop();
246 UASSERT(QueuePolicy::kIsMultipleProducer || old_producers_count != 1);
249 void PrepareConsumer() {
250 std::size_t old_consumers_count{};
251 utils::AtomicUpdate(consumers_count_, [&](
auto old_value) {
252 old_consumers_count = old_value;
253 return old_value == kCreatedAndDead ? 1 : old_value + 1;
256 if (old_consumers_count == kCreatedAndDead) {
257 producer_side_.ResumeBlockingOnPush();
259 UASSERT(QueuePolicy::kIsMultipleConsumer || old_consumers_count != 1);
262 void MarkConsumerIsDead() {
263 const auto new_consumers_count =
264 utils::AtomicUpdate(consumers_count_, [](
auto old_value) {
265 return old_value == 1 ? kCreatedAndDead : old_value - 1;
267 if (new_consumers_count == kCreatedAndDead) {
268 producer_side_.StopBlockingOnPush();
272 void MarkProducerIsDead() {
273 const auto new_producers_count =
274 utils::AtomicUpdate(producers_count_, [](
auto old_value) {
275 return old_value == 1 ? kCreatedAndDead : old_value - 1;
277 if (new_producers_count == kCreatedAndDead) {
278 consumer_side_.StopBlockingOnPop();
284 bool NoMoreConsumers()
const {
return consumers_count_ == kCreatedAndDead; }
286 bool NoMoreProducers()
const {
return producers_count_ == kCreatedAndDead; }
290 template <
typename Token>
291 void DoPush(Token& token, T&& value) {
292 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
293 static_assert(QueuePolicy::kIsMultipleProducer);
294 queue_.enqueue(token, std::move(value));
295 }
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
296 static_assert(QueuePolicy::kIsMultipleProducer);
297 queue_.enqueue(std::move(value));
299 static_assert(std::is_same_v<Token, impl::NoToken>);
300 static_assert(!QueuePolicy::kIsMultipleProducer);
301 queue_.enqueue(single_producer_token_, std::move(value));
304 consumer_side_.OnElementPushed();
307 template <
typename Token>
308 [[nodiscard]]
bool DoPop(Token& token, T& value) {
311 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
312 static_assert(QueuePolicy::kIsMultipleProducer);
313 success = queue_.try_dequeue(token, value);
314 }
else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
315 static_assert(QueuePolicy::kIsMultipleProducer);
316 success = queue_.try_dequeue(value);
318 static_assert(std::is_same_v<Token, impl::NoToken>);
319 static_assert(!QueuePolicy::kIsMultipleProducer);
320 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
324 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
331 moodycamel::ConcurrentQueue<T> queue_{1};
332 std::atomic<std::size_t> consumers_count_{0};
333 std::atomic<std::size_t> producers_count_{0};
335 SingleProducerToken single_producer_token_;
337 ProducerSide producer_side_;
338 ConsumerSide consumer_side_;
340 static constexpr std::size_t kCreatedAndDead =
341 std::numeric_limits<std::size_t>::max();
342 static constexpr std::size_t kSemaphoreUnlockValue =
343 std::numeric_limits<std::size_t>::max() / 2;
347template <
typename T,
typename QueuePolicy>
348class GenericQueue<T, QueuePolicy>::SingleProducerSide
final {
350 explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
351 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
355 template <
typename Token>
356 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline) {
357 if (DoPush(token, std::move(value))) {
361 return non_full_event_.WaitForEventUntil(deadline) &&
363 DoPush(token, std::move(value));
366 template <
typename Token>
367 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
368 return DoPush(token, std::move(value));
371 void OnElementPopped(std::size_t released_capacity) {
372 used_capacity_.fetch_sub(released_capacity);
373 non_full_event_.Send();
376 void StopBlockingOnPush() {
377 total_capacity_ += kSemaphoreUnlockValue;
378 non_full_event_.Send();
381 void ResumeBlockingOnPush() { total_capacity_ -= kSemaphoreUnlockValue; }
383 void SetSoftMaxSize(std::size_t new_capacity) {
384 const auto old_capacity = total_capacity_.exchange(new_capacity);
385 if (new_capacity > old_capacity) non_full_event_.Send();
388 std::size_t GetSoftMaxSize()
const noexcept {
return total_capacity_.load(); }
390 std::size_t GetSizeApproximate()
const noexcept {
391 return used_capacity_.load();
395 template <
typename Token>
396 [[nodiscard]]
bool DoPush(Token& token, T&& value) {
397 const std::size_t value_size = QueuePolicy::GetElementSize(value);
398 if (queue_.NoMoreConsumers() ||
399 used_capacity_.load() + value_size > total_capacity_.load()) {
403 used_capacity_.fetch_add(value_size);
404 queue_.DoPush(token, std::move(value));
405 non_full_event_.Reset();
409 GenericQueue& queue_;
410 engine::SingleConsumerEvent non_full_event_;
411 std::atomic<std::size_t> used_capacity_;
412 std::atomic<std::size_t> total_capacity_;
416template <
typename T,
typename QueuePolicy>
417class GenericQueue<T, QueuePolicy>::MultiProducerSide
final {
419 explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
421 remaining_capacity_(capacity),
422 remaining_capacity_control_(remaining_capacity_) {}
426 template <
typename Token>
427 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline) {
428 const std::size_t value_size = QueuePolicy::GetElementSize(value);
429 return remaining_capacity_.try_lock_shared_until_count(deadline,
431 DoPush(token, std::move(value));
434 template <
typename Token>
435 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
436 const std::size_t value_size = QueuePolicy::GetElementSize(value);
437 return remaining_capacity_.try_lock_shared_count(value_size) &&
438 DoPush(token, std::move(value));
441 void OnElementPopped(std::size_t value_size) {
442 remaining_capacity_.unlock_shared_count(value_size);
445 void StopBlockingOnPush() {
446 remaining_capacity_control_.SetCapacityOverride(0);
449 void ResumeBlockingOnPush() {
450 remaining_capacity_control_.RemoveCapacityOverride();
453 void SetSoftMaxSize(std::size_t count) {
454 remaining_capacity_control_.SetCapacity(count);
457 std::size_t GetSizeApproximate()
const noexcept {
458 return remaining_capacity_.UsedApprox();
461 std::size_t GetSoftMaxSize()
const noexcept {
462 return remaining_capacity_control_.GetCapacity();
466 template <
typename Token>
467 [[nodiscard]]
bool DoPush(Token& token, T&& value) {
468 const std::size_t value_size = QueuePolicy::GetElementSize(value);
470 if (queue_.NoMoreConsumers()) {
471 remaining_capacity_.unlock_shared_count(value_size);
475 queue_.DoPush(token, std::move(value));
479 GenericQueue& queue_;
480 engine::CancellableSemaphore remaining_capacity_;
481 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
485template <
typename T,
typename QueuePolicy>
486class GenericQueue<T, QueuePolicy>::SingleConsumerSide
final {
488 explicit SingleConsumerSide(GenericQueue& queue)
489 : queue_(queue), element_count_(0) {}
492 template <
typename Token>
493 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
494 while (!DoPop(token, value)) {
495 if (queue_.NoMoreProducers() ||
496 !nonempty_event_.WaitForEventUntil(deadline)) {
500 return DoPop(token, value);
506 template <
typename Token>
507 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
508 return DoPop(token, value);
511 void OnElementPushed() {
513 nonempty_event_.Send();
516 void StopBlockingOnPop() { nonempty_event_.Send(); }
518 void ResumeBlockingOnPop() {}
520 std::size_t GetElementCount()
const {
return element_count_; }
523 template <
typename Token>
524 [[nodiscard]]
bool DoPop(Token& token, T& value) {
525 if (queue_.DoPop(token, value)) {
527 nonempty_event_.Reset();
533 GenericQueue& queue_;
534 engine::SingleConsumerEvent nonempty_event_;
535 std::atomic<std::size_t> element_count_;
539template <
typename T,
typename QueuePolicy>
540class GenericQueue<T, QueuePolicy>::MultiConsumerSide
final {
542 explicit MultiConsumerSide(GenericQueue& queue)
544 element_count_(kUnbounded),
545 element_count_control_(element_count_) {
546 const bool success = element_count_.try_lock_shared_count(kUnbounded);
550 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
553 template <
typename Token>
554 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
555 return element_count_.try_lock_shared_until(deadline) &&
559 template <
typename Token>
560 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
561 return element_count_.try_lock_shared() && DoPop(token, value);
564 void OnElementPushed() { element_count_.unlock_shared(); }
566 void StopBlockingOnPop() {
567 element_count_control_.SetCapacityOverride(kUnbounded +
568 kSemaphoreUnlockValue);
571 void ResumeBlockingOnPop() {
572 element_count_control_.RemoveCapacityOverride();
575 std::size_t GetElementCount()
const {
576 const std::size_t cur_element_count = element_count_.RemainingApprox();
577 if (cur_element_count < kUnbounded) {
578 return cur_element_count;
579 }
else if (cur_element_count <= kSemaphoreUnlockValue) {
582 return cur_element_count - kSemaphoreUnlockValue;
586 template <
typename Token>
587 [[nodiscard]]
bool DoPop(Token& token, T& value) {
589 if (queue_.DoPop(token, value)) {
592 if (queue_.NoMoreProducers()) {
593 element_count_.unlock_shared();
601 GenericQueue& queue_;
602 engine::CancellableSemaphore element_count_;
603 concurrent::impl::SemaphoreCapacityControl element_count_control_;
621using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
true>>;
630using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true,
false>>;
638using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
true>>;
646using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false,
false>>;
654using StringStreamQueue =
655 GenericQueue<std::string, impl::ContainerQueuePolicy<
false,
false>>;