122class GenericQueue
final :
public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
123 struct EmplaceEnabler
final {
125 explicit EmplaceEnabler() =
default;
130 "QueuePolicy must inherit from concurrent::DefaultQueuePolicy"
135 using ProducerToken =
136 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
137 using ConsumerToken =
138 std::conditional_t<QueuePolicy::kIsMultipleProducer, moodycamel::ConsumerToken, impl::NoToken>;
139 using MultiProducerToken = impl::MultiToken;
140 using MultiConsumerToken = std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken, impl::NoToken>;
142 using SingleProducerToken =
143 std::conditional_t<!QueuePolicy::kIsMultipleProducer, moodycamel::ProducerToken, impl::NoToken>;
145 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
146 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
147 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
148 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
153 using Producer =
concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
154 using Consumer =
concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
155 using MultiProducer =
concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
156 using MultiConsumer =
concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
158 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max() / 4;
162 explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
164 single_producer_token_(queue_),
165 producer_side_(*
this, std::min(max_size, kUnbounded)),
166 consumer_side_(*
this) {}
169 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
170 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
172 if (producers_count_ == kCreatedAndDead) {
174 consumer_side_.ResumeBlockingOnPop();
179 ConsumerToken token{queue_};
180 while (consumer_side_.PopNoblock(token, value)) {
184 GenericQueue(GenericQueue&&) =
delete;
185 GenericQueue(
const GenericQueue&) =
delete;
186 GenericQueue& operator=(GenericQueue&&) =
delete;
187 GenericQueue& operator=(
const GenericQueue&) =
delete;
191 static std::shared_ptr<GenericQueue>
Create(std::size_t max_size = kUnbounded) {
192 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
191 static std::shared_ptr<GenericQueue>
Create(std::size_t max_size = kUnbounded) {
…}
203 return Producer(
this->shared_from_this(), EmplaceEnabler{});
216 static_assert(QueuePolicy::kIsMultipleProducer,
"Trying to obtain MultiProducer for a single-producer queue");
218 return MultiProducer(
this->shared_from_this(), EmplaceEnabler{});
229 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
242 static_assert(QueuePolicy::kIsMultipleConsumer,
"Trying to obtain MultiConsumer for a single-consumer queue");
244 return MultiConsumer(
this->shared_from_this(), EmplaceEnabler{});
249 void SetSoftMaxSize(std::size_t max_size) { producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded)); }
258 class SingleProducerSide;
259 class MultiProducerSide;
260 class NoMaxSizeProducerSide;
261 class SingleConsumerSide;
262 class MultiConsumerSide;
266 using ProducerSide = std::conditional_t<
268 NoMaxSizeProducerSide,
270 QueuePolicy::kIsMultipleProducer,
272 SingleProducerSide>>;
276 using ConsumerSide = std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide, SingleConsumerSide>;
278 template <
typename Token>
279 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline) {
280 const std::size_t value_size = QueuePolicy::GetElementSize(value);
281 return producer_side_.Push(token, std::move(value), deadline, value_size);
284 template <
typename Token>
285 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
286 const std::size_t value_size = QueuePolicy::GetElementSize(value);
287 return producer_side_.PushNoblock(token, std::move(value), value_size);
290 template <
typename Token>
291 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
292 return consumer_side_.Pop(token, value, deadline);
295 template <
typename Token>
296 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
297 return consumer_side_.PopNoblock(token, value);
300 void PrepareProducer() {
301 std::size_t old_producers_count{};
302 utils::AtomicUpdate(producers_count_, [&](
auto old_value) {
303 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
"Incorrect usage of queue producers");
304 old_producers_count = old_value;
305 return old_value == kCreatedAndDead ? 1 : old_value + 1;
308 if (old_producers_count == kCreatedAndDead) {
309 consumer_side_.ResumeBlockingOnPop();
313 void PrepareConsumer() {
314 std::size_t old_consumers_count{};
315 utils::AtomicUpdate(consumers_count_, [&](
auto old_value) {
316 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
"Incorrect usage of queue consumers");
317 old_consumers_count = old_value;
318 return old_value == kCreatedAndDead ? 1 : old_value + 1;
321 if (old_consumers_count == kCreatedAndDead) {
322 producer_side_.ResumeBlockingOnPush();
326 void MarkConsumerIsDead() {
327 const auto new_consumers_count =
utils::AtomicUpdate(consumers_count_, [](
auto old_value) {
328 return old_value == 1 ? kCreatedAndDead : old_value - 1;
330 if (new_consumers_count == kCreatedAndDead) {
331 producer_side_.StopBlockingOnPush();
335 void MarkProducerIsDead() {
336 const auto new_producers_count =
utils::AtomicUpdate(producers_count_, [](
auto old_value) {
337 return old_value == 1 ? kCreatedAndDead : old_value - 1;
339 if (new_producers_count == kCreatedAndDead) {
340 consumer_side_.StopBlockingOnPop();
346 bool NoMoreConsumers()
const {
return consumers_count_ == kCreatedAndDead; }
348 bool NoMoreProducers()
const {
return producers_count_ == kCreatedAndDead; }
352 template <
typename Token>
353 void DoPush(Token& token, T&& value) {
354 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
355 static_assert(QueuePolicy::kIsMultipleProducer);
356 queue_.enqueue(token, std::move(value));
357 }
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
358 static_assert(QueuePolicy::kIsMultipleProducer);
359 queue_.enqueue(std::move(value));
361 static_assert(std::is_same_v<Token, impl::NoToken>);
362 static_assert(!QueuePolicy::kIsMultipleProducer);
363 queue_.enqueue(single_producer_token_, std::move(value));
366 consumer_side_.OnElementPushed();
369 template <
typename Token>
370 [[nodiscard]]
bool DoPop(Token& token, T& value) {
373 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
374 static_assert(QueuePolicy::kIsMultipleProducer);
375 success = queue_.try_dequeue(token, value);
376 }
else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
377 static_assert(QueuePolicy::kIsMultipleProducer);
378 success = queue_.try_dequeue(value);
380 static_assert(std::is_same_v<Token, impl::NoToken>);
381 static_assert(!QueuePolicy::kIsMultipleProducer);
382 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
386 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
393 moodycamel::ConcurrentQueue<T> queue_{1};
394 std::atomic<std::size_t> consumers_count_{0};
395 std::atomic<std::size_t> producers_count_{0};
397 SingleProducerToken single_producer_token_;
399 ProducerSide producer_side_;
400 ConsumerSide consumer_side_;
402 static constexpr std::size_t kCreatedAndDead = std::numeric_limits<std::size_t>::max();
403 static constexpr std::size_t kSemaphoreUnlockValue = std::numeric_limits<std::size_t>::max() / 2;
407class GenericQueue<T, QueuePolicy>::SingleProducerSide
final {
409 SingleProducerSide(GenericQueue& queue, std::size_t capacity)
410 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
414 template <
typename Token>
415 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
416 bool no_more_consumers =
false;
417 const bool success = non_full_event_.WaitUntil(deadline, [&] {
418 if (queue_.NoMoreConsumers()) {
419 no_more_consumers =
true;
422 if (DoPush(token, std::move(value), value_size)) {
427 return success && !no_more_consumers;
430 template <
typename Token>
431 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
432 return !queue_.NoMoreConsumers() && DoPush(token, std::move(value), value_size);
435 void OnElementPopped(std::size_t released_capacity) {
436 used_capacity_.fetch_sub(released_capacity);
440 void StopBlockingOnPush() { non_full_event_
.Send(); }
442 void ResumeBlockingOnPush() {}
444 void SetSoftMaxSize(std::size_t new_capacity) {
445 const auto old_capacity = total_capacity_.exchange(new_capacity);
446 if (new_capacity > old_capacity) non_full_event_
.Send();
449 std::size_t GetSoftMaxSize()
const noexcept {
return total_capacity_.load(); }
451 std::size_t GetSizeApproximate()
const noexcept {
return used_capacity_.load(); }
454 template <
typename Token>
455 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
456 if (used_capacity_.load() + value_size > total_capacity_.load()) {
460 used_capacity_.fetch_add(value_size);
461 queue_.DoPush(token, std::move(value));
465 GenericQueue& queue_;
466 engine::SingleConsumerEvent non_full_event_;
467 std::atomic<std::size_t> used_capacity_;
468 std::atomic<std::size_t> total_capacity_;
472class GenericQueue<T, QueuePolicy>::MultiProducerSide
final {
474 MultiProducerSide(GenericQueue& queue, std::size_t capacity)
475 : queue_(queue), remaining_capacity_
(capacity
), remaining_capacity_control_(remaining_capacity_) {}
479 template <
typename Token>
480 [[nodiscard]]
bool Push(Token& token, T&& value, engine::Deadline deadline, std::size_t value_size) {
481 return remaining_capacity_.try_lock_shared_until_count(deadline, value_size) &&
482 DoPush(token, std::move(value), value_size);
485 template <
typename Token>
486 [[nodiscard]]
bool PushNoblock(Token& token, T&& value, std::size_t value_size) {
487 return remaining_capacity_.try_lock_shared_count(value_size) && DoPush(token, std::move(value), value_size);
490 void OnElementPopped(std::size_t value_size) { remaining_capacity_.unlock_shared_count(value_size); }
492 void StopBlockingOnPush() { remaining_capacity_control_.SetCapacityOverride(0); }
494 void ResumeBlockingOnPush() { remaining_capacity_control_.RemoveCapacityOverride(); }
496 void SetSoftMaxSize(std::size_t count) { remaining_capacity_control_.SetCapacity(count); }
498 std::size_t GetSizeApproximate()
const noexcept {
return remaining_capacity_
.UsedApprox(); }
500 std::size_t GetSoftMaxSize()
const noexcept {
return remaining_capacity_control_.GetCapacity(); }
503 template <
typename Token>
504 [[nodiscard]]
bool DoPush(Token& token, T&& value, std::size_t value_size) {
505 if (queue_.NoMoreConsumers()) {
506 remaining_capacity_.unlock_shared_count(value_size);
510 queue_.DoPush(token, std::move(value));
514 GenericQueue& queue_;
515 engine::CancellableSemaphore remaining_capacity_;
516 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
617class GenericQueue<T, QueuePolicy>::MultiConsumerSide
final {
619 explicit MultiConsumerSide(GenericQueue& queue)
620 : queue_(queue), element_count_
(kUnbounded
), element_count_control_(element_count_) {
621 const bool success = element_count_.try_lock_shared_count(kUnbounded);
625 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
628 template <
typename Token>
629 [[nodiscard]]
bool Pop(Token& token, T& value, engine::Deadline deadline) {
630 return element_count_.try_lock_shared_until(deadline) && DoPop(token, value);
633 template <
typename Token>
634 [[nodiscard]]
bool PopNoblock(Token& token, T& value) {
635 return element_count_.try_lock_shared() && DoPop(token, value);
640 void StopBlockingOnPop() { element_count_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue); }
642 void ResumeBlockingOnPop() { element_count_control_.RemoveCapacityOverride(); }
644 std::size_t GetElementCount()
const {
646 if (cur_element_count < kUnbounded) {
647 return cur_element_count;
648 }
else if (cur_element_count <= kSemaphoreUnlockValue) {
651 return cur_element_count - kSemaphoreUnlockValue;
655 template <
typename Token>
656 [[nodiscard]]
bool DoPop(Token& token, T& value) {
658 if (queue_.DoPop(token, value)) {
661 if (queue_.NoMoreProducers()) {
670 GenericQueue& queue_;
671 engine::CancellableSemaphore element_count_;
672 concurrent::impl::SemaphoreCapacityControl element_count_control_;