7#include <moodycamel/concurrentqueue.h>
9#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
10#include <userver/concurrent/queue_helpers.hpp>
18USERVER_NAMESPACE_BEGIN
24class MultiProducerToken final {
26 template <
typename T,
typename Traits>
35template <
typename T,
bool MultipleProducer,
bool MultipleConsumer>
37 :
public std::enable_shared_from_this<
38 GenericQueue<T, MultipleProducer, MultipleConsumer>> {
39 struct EmplaceEnabler final {
41 explicit EmplaceEnabler() =
default;
45 std::conditional_t<MultipleProducer, moodycamel::ProducerToken,
48 std::conditional_t<MultipleProducer, moodycamel::ConsumerToken,
50 using MultiProducerToken = impl::MultiProducerToken;
52 using SingleProducerToken =
53 std::conditional_t<!MultipleProducer, moodycamel::ProducerToken,
69 static constexpr std::size_t kUnbounded =
70 std::numeric_limits<std::size_t>::max() / 4;
74 explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
76 single_producer_token_(queue_),
77 producer_side_(*
this, std::min(max_size, kUnbounded)),
78 consumer_side_(*
this) {}
81 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
82 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
84 if (producers_count_ == kCreatedAndDead) {
86 consumer_side_.ResumeBlockingOnPop();
91 ConsumerToken token{queue_};
92 while (consumer_side_.PopNoblock(token, value)) {
103 static std::shared_ptr<GenericQueue>
Create(
104 std::size_t max_size = kUnbounded) {
105 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
116 return Producer(this->shared_from_this(), EmplaceEnabler{});
129 static_assert(MultipleProducer,
130 "Trying to obtain MultiProducer for a single-producer queue");
132 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
142 std::size_t old_consumers_count{};
144 old_consumers_count = old_value;
145 return old_value == kCreatedAndDead ? 1 : old_value + 1;
148 if (old_consumers_count == kCreatedAndDead) {
149 producer_side_.ResumeBlockingOnPush();
151 UASSERT(MultipleConsumer || old_consumers_count != 1);
152 return Consumer(this->shared_from_this(), EmplaceEnabler{});
158 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
168 class SingleProducerSide;
169 class MultiProducerSide;
170 class SingleConsumerSide;
171 class MultiConsumerSide;
175 using ProducerSide = std::conditional_t<MultipleProducer, MultiProducerSide,
180 using ConsumerSide = std::conditional_t<MultipleConsumer, MultiConsumerSide,
183 template <
typename Token>
184 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline deadline) {
185 return producer_side_.Push(token, std::move(value), deadline);
188 template <
typename Token>
189 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
190 return producer_side_.PushNoblock(token, std::move(value));
193 [[nodiscard]]
bool Pop(ConsumerToken& token, T& value,
195 return consumer_side_.Pop(token, value, deadline);
198 [[nodiscard]]
bool PopNoblock(ConsumerToken& token, T& value) {
199 return consumer_side_.PopNoblock(token, value);
202 void PrepareProducer() {
203 std::size_t old_producers_count{};
205 old_producers_count = old_value;
206 return old_value == kCreatedAndDead ? 1 : old_value + 1;
209 if (old_producers_count == kCreatedAndDead) {
210 consumer_side_.ResumeBlockingOnPop();
212 UASSERT(MultipleProducer || old_producers_count != 1);
215 void MarkConsumerIsDead() {
216 const auto new_consumers_count =
218 return old_value == 1 ? kCreatedAndDead : old_value - 1;
220 if (new_consumers_count == kCreatedAndDead) {
221 producer_side_.StopBlockingOnPush();
225 void MarkProducerIsDead() {
226 const auto new_producers_count =
228 return old_value == 1 ? kCreatedAndDead : old_value - 1;
230 if (new_producers_count == kCreatedAndDead) {
231 consumer_side_.StopBlockingOnPop();
236 bool NoMoreConsumers()
const {
return consumers_count_ == kCreatedAndDead; }
238 bool NoMoreProducers()
const {
return producers_count_ == kCreatedAndDead; }
241 template <
typename Token>
242 void DoPush(Token& token, T&& value) {
243 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
244 static_assert(MultipleProducer);
245 queue_.enqueue(token, std::move(value));
246 }
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
247 static_assert(MultipleProducer);
248 queue_.enqueue(std::move(value));
250 static_assert(std::is_same_v<Token, impl::NoToken>);
251 static_assert(!MultipleProducer);
252 queue_.enqueue(single_producer_token_, std::move(value));
255 consumer_side_.OnElementPushed();
258 [[nodiscard]]
bool DoPop(ConsumerToken& token, T& value) {
259 bool success =
false;
260 if constexpr (MultipleProducer) {
261 success = queue_.try_dequeue(token, value);
264 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
268 producer_side_.OnElementPopped();
276 std::atomic<std::size_t> consumers_count_{0};
277 std::atomic<std::size_t> producers_count_{0};
279 SingleProducerToken single_producer_token_;
281 ProducerSide producer_side_;
282 ConsumerSide consumer_side_;
284 static constexpr std::size_t kCreatedAndDead =
285 std::numeric_limits<std::size_t>::max();
286 static constexpr std::size_t kSemaphoreUnlockValue =
287 std::numeric_limits<std::size_t>::max() / 2;
291template <
typename T,
bool MP,
bool MC>
295 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
299 template <
typename Token>
300 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline deadline) {
301 if (DoPush(token, std::move(value))) {
305 return non_full_event_.WaitForEventUntil(deadline) &&
307 DoPush(token, std::move(value));
310 template <
typename Token>
311 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
312 return DoPush(token, std::move(value));
315 void OnElementPopped() {
317 non_full_event_.Send();
320 void StopBlockingOnPush() {
321 total_capacity_ += kSemaphoreUnlockValue;
322 non_full_event_.Send();
325 void ResumeBlockingOnPush() { total_capacity_ -= kSemaphoreUnlockValue; }
328 const auto old_capacity = total_capacity_.exchange(new_capacity);
329 if (new_capacity > old_capacity) non_full_event_.Send();
332 std::size_t
GetSoftMaxSize()
const noexcept {
return total_capacity_.load(); }
335 template <
typename Token>
336 [[nodiscard]]
bool DoPush(Token& token, T&& value) {
337 if (queue_.NoMoreConsumers() ||
338 used_capacity_.load() >= total_capacity_.load()) {
343 queue_.DoPush(token, std::move(value));
344 non_full_event_.Reset();
350 std::atomic<std::size_t> used_capacity_;
351 std::atomic<std::size_t> total_capacity_;
355template <
typename T,
bool MP,
bool MC>
360 remaining_capacity_(capacity),
361 remaining_capacity_control_(remaining_capacity_) {}
365 template <
typename Token>
366 [[nodiscard]]
bool Push(Token& token, T&& value,
engine::Deadline deadline) {
367 return !engine::current_task::ShouldCancel() &&
368 remaining_capacity_.try_lock_shared_until(deadline) &&
369 DoPush(token, std::move(value));
372 template <
typename Token>
373 [[nodiscard]]
bool PushNoblock(Token& token, T&& value) {
374 return remaining_capacity_.try_lock_shared() &&
375 DoPush(token, std::move(value));
378 void OnElementPopped() { remaining_capacity_.unlock_shared(); }
380 void StopBlockingOnPush() {
381 remaining_capacity_control_.SetCapacityOverride(0);
384 void ResumeBlockingOnPush() {
385 remaining_capacity_control_.RemoveCapacityOverride();
389 remaining_capacity_control_.SetCapacity(count);
393 return remaining_capacity_control_.GetCapacity();
397 template <
typename Token>
398 [[nodiscard]]
bool DoPush(Token& token, T&& value) {
399 if (queue_.NoMoreConsumers()) {
400 remaining_capacity_.unlock_shared();
404 queue_.DoPush(token, std::move(value));
410 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
414template <
typename T,
bool MP,
bool MC>
420 [[nodiscard]]
bool Pop(ConsumerToken& token, T& value,
422 while (!DoPop(token, value)) {
423 if (queue_.NoMoreProducers() ||
424 !nonempty_event_.WaitForEventUntil(deadline)) {
428 return DoPop(token, value);
434 [[nodiscard]]
bool PopNoblock(ConsumerToken& token, T& value) {
435 return DoPop(token, value);
438 void OnElementPushed() {
440 nonempty_event_.Send();
443 void StopBlockingOnPop() { nonempty_event_.Send(); }
445 void ResumeBlockingOnPop() {}
447 std::size_t GetSize()
const {
return size_; }
450 [[nodiscard]]
bool DoPop(ConsumerToken& token, T& value) {
451 if (queue_.DoPop(token, value)) {
453 nonempty_event_.Reset();
461 std::atomic<std::size_t> size_;
465template <
typename T,
bool MP,
bool MC>
469 : queue_(queue), size_(kUnbounded), size_control_(size_) {
470 const bool success = size_.try_lock_shared_count(kUnbounded);
477 [[nodiscard]]
bool Pop(ConsumerToken& token, T& value,
479 return size_.try_lock_shared_until(deadline) && DoPop(token, value);
482 [[nodiscard]]
bool PopNoblock(ConsumerToken& token, T& value) {
483 return size_.try_lock_shared() && DoPop(token, value);
486 void OnElementPushed() { size_.unlock_shared(); }
488 void StopBlockingOnPop() {
489 size_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue);
492 void ResumeBlockingOnPop() { size_control_.RemoveCapacityOverride(); }
494 std::size_t GetSize()
const {
495 std::size_t cur_size = size_.RemainingApprox();
496 if (cur_size < kUnbounded) {
498 }
else if (cur_size <= kSemaphoreUnlockValue) {
501 return cur_size - kSemaphoreUnlockValue;
505 [[nodiscard]]
bool DoPop(ConsumerToken& token, T& value) {
507 if (queue_.DoPop(token, value))
return true;
508 if (queue_.NoMoreProducers()) {
509 size_.unlock_shared();
519 concurrent::impl::SemaphoreCapacityControl size_control_;