7#include <moodycamel/concurrentqueue.h> 
    9#include <userver/concurrent/impl/semaphore_capacity_control.hpp> 
   10#include <userver/concurrent/queue_helpers.hpp> 
   11#include <userver/engine/deadline.hpp> 
   12#include <userver/engine/semaphore.hpp> 
   13#include <userver/engine/single_consumer_event.hpp> 
   14#include <userver/engine/task/cancel.hpp> 
   15#include <userver/utils/assert.hpp> 
   16#include <userver/utils/atomic.hpp> 
   18USERVER_NAMESPACE_BEGIN
 
   // Queue policy that republishes its two template flags as the compile-time
   // constants kIsMultipleProducer / kIsMultipleConsumer, which GenericQueue
   // reads through QueuePolicy::.
   // NOTE(review): this listing has the original line numbers fused into the
   // text and omits some lines (e.g. the body of GetElementSize) — confirm
   // against the real header.
   24template <
bool MultipleProducer, 
bool MultipleConsumer>
 
   25struct SimpleQueuePolicy {
 
   // The element parameter is unnamed, so the result cannot depend on the
   // element's value; the returned constant itself is not visible here.
   27  static constexpr std::size_t GetElementSize(
const T&) {
 
   31  static constexpr bool kIsMultipleProducer{MultipleProducer};
 
   32  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
 
   // Queue policy for container-like elements: an element's size is whatever
   // std::size(value) reports, so capacity is accounted in container items
   // rather than in number of elements.
   35template <
bool MultipleProducer, 
bool MultipleConsumer>
 
   36struct ContainerQueuePolicy {
 
   // Not constexpr (unlike SimpleQueuePolicy::GetElementSize): the result
   // depends on the run-time contents of `value`.
   38  static std::size_t GetElementSize(
const T& value) {
 
   39    return std::size(value);
 
   42  static constexpr bool kIsMultipleProducer{MultipleProducer};
 
   43  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
 
   // Queue class template parameterised by the element type T and by a
   // QueuePolicy that supplies kIsMultipleProducer, kIsMultipleConsumer and
   // GetElementSize. It derives from enable_shared_from_this because the
   // producer/consumer handles are constructed from this->shared_from_this().
   // NOTE(review): several lines of this section are not visible in the
   // listing (alias names, closing braces) — confirm against the real header.
   51template <
typename T, 
typename QueuePolicy>
 
   52class GenericQueue 
final 
   53    : 
public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
 
   // Tag type taken by the constructor and passed by Create() and by the
   // handle factories. Presumably it exists so that only code with access to
   // this nested type can construct the queue/handles — TODO confirm.
   54  struct EmplaceEnabler 
final {
 
   56    explicit EmplaceEnabler() = 
default;
 
   // Alias whose name is not visible here (presumably ProducerToken, which is
   // used in the friend declarations below): a moodycamel producer token for
   // multi-producer policies, impl::NoToken otherwise.
   60      std::conditional_t<QueuePolicy::kIsMultipleProducer,
 
   61                         moodycamel::ProducerToken, impl::NoToken>;
 
   // Alias whose name is not visible here (presumably ConsumerToken). Note
   // that the selector is kIsMultipleProducer, not kIsMultipleConsumer.
   63      std::conditional_t<QueuePolicy::kIsMultipleProducer,
 
   64                         moodycamel::ConsumerToken, impl::NoToken>;
 
   65  using MultiProducerToken = impl::MultiToken;
 
   // impl::MultiToken for multi-producer policies; the alternative type is on
   // a line that is not visible in this listing.
   66  using MultiConsumerToken =
 
   67      std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken,
 
   // Type of the single_producer_token_ member: a real moodycamel producer
   // token only when the policy is NOT multi-producer, impl::NoToken otherwise.
   70  using SingleProducerToken =
 
   71      std::conditional_t<!QueuePolicy::kIsMultipleProducer,
 
   72                         moodycamel::ProducerToken, impl::NoToken>;
 
   // The handle templates are befriended for every token flavour.
   74  friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
 
   75  friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
 
   76  friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
 
   77  friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
 
   // Right-hand sides of four handle aliases; the alias names are on lines not
   // visible here (presumably Producer, Consumer, MultiProducer and
   // MultiConsumer, the names the factory methods return — TODO confirm).
   83      concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
 
   85      concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
 
   87      concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
 
   89      concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
 
   // Upper bound for any requested size: the constructor and SetSoftMaxSize
   // clamp to it with std::min, and it is the default argument of Create().
   91  static constexpr std::size_t kUnbounded =
 
   92      std::numeric_limits<std::size_t>::max() / 4;
 
   // Constructor. The EmplaceEnabler argument is an unnamed tag and carries no
   // data. The requested max_size is clamped to kUnbounded before being handed
   // to the producer side; both sides receive a reference to this queue.
   // NOTE(review): the first mem-initializer line is not visible in this
   // listing.
   96  explicit GenericQueue(std::size_t max_size, EmplaceEnabler )
 
   98        single_producer_token_(queue_),
 
   99        producer_side_(*
this, std::min(max_size, kUnbounded)),
 
  100        consumer_side_(*
this) {}
 
  // NOTE(review): the enclosing signature is not visible in this listing;
  // presumably this is the body of the destructor — TODO confirm.
  // Each counter must be either the kCreatedAndDead sentinel or zero, i.e. no
  // producer or consumer handle may still be alive at this point.
  103    UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
 
  104    UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
 
  // If producers existed and have all died, StopBlockingOnPop() was called by
  // MarkProducerIsDead(); it is reverted here before the drain below.
  106    if (producers_count_ == kCreatedAndDead) {
 
  108      consumer_side_.ResumeBlockingOnPop();
 
  // Drain loop: pops without blocking, using a fresh consumer token, until
  // PopNoblock reports failure (the loop body is not visible here).
  113    ConsumerToken token{queue_};
 
  114    while (consumer_side_.PopNoblock(token, value)) {
 
  // The queue is neither copyable nor movable: all four copy/move operations
  // are deleted. (The producer/consumer sides store a GenericQueue& back to
  // this object, see their queue_ members.)
  118  GenericQueue(GenericQueue&&) = 
delete;
 
  119  GenericQueue(
const GenericQueue&) = 
delete;
 
  120  GenericQueue& operator=(GenericQueue&&) = 
delete;
 
  121  GenericQueue& operator=(
const GenericQueue&) = 
delete;
 
  // Factory: builds the queue inside a std::shared_ptr via std::make_shared,
  // passing the EmplaceEnabler tag the constructor requires.
  // max_size defaults to kUnbounded.
  125  static std::shared_ptr<GenericQueue> 
Create(
 
  126      std::size_t max_size = kUnbounded) {
 
  127    return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
 
  // NOTE(review): the signatures of the two producer-handle factories are not
  // visible in this listing; only their tail lines are shown.
  // Builds a Producer handle that shares ownership of this queue
  // (shared_from_this) and is tagged with EmplaceEnabler.
  138    return Producer(
this->shared_from_this(), EmplaceEnabler{});
 
  // MultiProducer handles exist only for multi-producer policies; requesting
  // one for any other policy is a compile-time error.
  151    static_assert(QueuePolicy::kIsMultipleProducer,
 
  152                  "Trying to obtain MultiProducer for a single-producer queue");
 
  154    return MultiProducer(
this->shared_from_this(), EmplaceEnabler{});
 
  // NOTE(review): the signatures of the two consumer-handle factories are not
  // visible in this listing; only their tail lines are shown.
  // Builds a Consumer handle that shares ownership of this queue
  // (shared_from_this) and is tagged with EmplaceEnabler.
  165    return Consumer(
this->shared_from_this(), EmplaceEnabler{});
 
  // MultiConsumer handles exist only for multi-consumer policies; requesting
  // one for any other policy is a compile-time error.
  178    static_assert(QueuePolicy::kIsMultipleConsumer,
 
  179                  "Trying to obtain MultiConsumer for a single-consumer queue");
 
  181    return MultiConsumer(
this->shared_from_this(), EmplaceEnabler{});
 
  // Size controls: all three calls are forwarded to the producer side.
  // NOTE(review): the signatures around the first and last statement are not
  // visible in this listing.
  // A new limit is clamped to kUnbounded, as in the constructor.
  187    producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
 
  // Returns the limit currently stored by the producer side.
  191  std::size_t 
GetSoftMaxSize() 
const { 
return producer_side_.GetSoftMaxSize(); }
 
  // Approximate amount of capacity in use, as reported by the producer side.
  195    return producer_side_.GetSizeApproximate();
 
  // Forward declarations of the four nested side implementations; they are
  // defined out of line after the class.
  199  class SingleProducerSide;
 
  200  class MultiProducerSide;
 
  201  class SingleConsumerSide;
 
  202  class MultiConsumerSide;
 
  // Selection of the side implementations from the policy flags. The alias
  // names and the single-side alternatives are on lines not visible here
  // (presumably ProducerSide / ConsumerSide, the types of the producer_side_
  // and consumer_side_ members — TODO confirm).
  207      std::conditional_t<QueuePolicy::kIsMultipleProducer, MultiProducerSide,
 
  213      std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide,
 
  // Push with a deadline: forwards to the producer side, moving `value` on.
  // The bool result is whatever the producer side returns and must not be
  // ignored ([[nodiscard]]).
  216  template <
typename Token>
 
  217  [[nodiscard]] 
bool Push(Token& token, T&& value, engine::Deadline deadline) {
 
  218    return producer_side_.Push(token, std::move(value), deadline);
 
  // Push variant without a deadline argument: forwards to the producer side's
  // PushNoblock, moving `value` on, and returns its bool result.
  221  template <
typename Token>
 
  222  [[nodiscard]] 
bool PushNoblock(Token& token, T&& value) {
 
  223    return producer_side_.PushNoblock(token, std::move(value));
 
  // Pop with a deadline: forwards to the consumer side. `value` is an output
  // parameter; the bool result is whatever the consumer side returns.
  226  template <
typename Token>
 
  227  [[nodiscard]] 
bool Pop(Token& token, T& value, engine::Deadline deadline) {
 
  228    return consumer_side_.Pop(token, value, deadline);
 
  // Pop variant without a deadline argument: forwards to the consumer side's
  // PopNoblock. `value` is an output parameter.
  231  template <
typename Token>
 
  232  [[nodiscard]] 
bool PopNoblock(Token& token, T& value) {
 
  233    return consumer_side_.PopNoblock(token, value);
 
  // Registers one more live producer in producers_count_.
  236  void PrepareProducer() {
 
  237    std::size_t old_producers_count{};
 
  // The update lambda records the previous value and computes the next one:
  // the kCreatedAndDead sentinel restarts the count at 1, any other value is
  // incremented.
  238    utils::AtomicUpdate(producers_count_, [&](
auto old_value) {
 
  239      old_producers_count = old_value;
 
  240      return old_value == kCreatedAndDead ? 1 : old_value + 1;
 
  // Producers had all died earlier (MarkProducerIsDead called
  // StopBlockingOnPop); now that one exists again, revert that.
  243    if (old_producers_count == kCreatedAndDead) {
 
  244      consumer_side_.ResumeBlockingOnPop();
 
  // For a non-multi-producer policy the previous count must not have been 1,
  // i.e. a second producer must not be registered while one is alive.
  246    UASSERT(QueuePolicy::kIsMultipleProducer || old_producers_count != 1);
 
  // Registers one more live consumer in consumers_count_.
  249  void PrepareConsumer() {
 
  250    std::size_t old_consumers_count{};
 
  // The update lambda records the previous value and computes the next one:
  // the kCreatedAndDead sentinel restarts the count at 1, any other value is
  // incremented.
  251    utils::AtomicUpdate(consumers_count_, [&](
auto old_value) {
 
  252      old_consumers_count = old_value;
 
  253      return old_value == kCreatedAndDead ? 1 : old_value + 1;
 
  // Consumers had all died earlier (MarkConsumerIsDead called
  // StopBlockingOnPush); now that one exists again, revert that.
  256    if (old_consumers_count == kCreatedAndDead) {
 
  257      producer_side_.ResumeBlockingOnPush();
 
  // For a non-multi-consumer policy the previous count must not have been 1,
  // i.e. a second consumer must not be registered while one is alive.
  259    UASSERT(QueuePolicy::kIsMultipleConsumer || old_consumers_count != 1);
 
  // Unregisters one consumer. When the count would drop from 1, the
  // kCreatedAndDead sentinel is stored instead of 0, so "never had a consumer"
  // (0) stays distinguishable from "all consumers are gone".
  262  void MarkConsumerIsDead() {
 
  // Assumes utils::AtomicUpdate returns the updated value, as the variable
  // name suggests — TODO confirm.
  263    const auto new_consumers_count =
 
  264        utils::AtomicUpdate(consumers_count_, [](
auto old_value) {
 
  265          return old_value == 1 ? kCreatedAndDead : old_value - 1;
 
  // Last consumer is gone: tell the producer side to stop blocking on push.
  267    if (new_consumers_count == kCreatedAndDead) {
 
  268      producer_side_.StopBlockingOnPush();
 
  // Unregisters one producer. When the count would drop from 1, the
  // kCreatedAndDead sentinel is stored instead of 0, so "never had a producer"
  // (0) stays distinguishable from "all producers are gone".
  272  void MarkProducerIsDead() {
 
  // Assumes utils::AtomicUpdate returns the updated value, as the variable
  // name suggests — TODO confirm.
  273    const auto new_producers_count =
 
  274        utils::AtomicUpdate(producers_count_, [](
auto old_value) {
 
  275          return old_value == 1 ? kCreatedAndDead : old_value - 1;
 
  // Last producer is gone: tell the consumer side to stop blocking on pop.
  277    if (new_producers_count == kCreatedAndDead) {
 
  278      consumer_side_.StopBlockingOnPop();
 
  // True only once consumers existed and every one of them has been marked
  // dead (counter equals the kCreatedAndDead sentinel); false for the initial
  // count of 0.
  284  bool NoMoreConsumers() 
const { 
return consumers_count_ == kCreatedAndDead; }
 
  // Same check for producers.
  286  bool NoMoreProducers() 
const { 
return producers_count_ == kCreatedAndDead; }
 
  // Enqueues `value` into the underlying moodycamel queue, choosing the
  // enqueue overload at compile time from the token type, then notifies the
  // consumer side that an element is available.
  290  template <
typename Token>
 
  291  void DoPush(Token& token, T&& value) {
 
  // Per-producer moodycamel token: enqueue through that token.
  292    if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
 
  293      static_assert(QueuePolicy::kIsMultipleProducer);
 
  294      queue_.enqueue(token, std::move(value));
 
  // Multi token: token-less enqueue.
  295    } 
else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
 
  296      static_assert(QueuePolicy::kIsMultipleProducer);
 
  297      queue_.enqueue(std::move(value));
 
  // Remaining case (the `else` line is not visible here): the caller holds
  // impl::NoToken and the queue's own single_producer_token_ is used.
  299      static_assert(std::is_same_v<Token, impl::NoToken>);
 
  300      static_assert(!QueuePolicy::kIsMultipleProducer);
 
  301      queue_.enqueue(single_producer_token_, std::move(value));
 
  304    consumer_side_.OnElementPushed();
 
  // Tries to dequeue one element into `value`, choosing the dequeue overload
  // at compile time from the token type.
  // NOTE(review): the declaration of `success`, the `else` line, the guard
  // around OnElementPopped and the return statement are not visible here.
  307  template <
typename Token>
 
  308  [[nodiscard]] 
bool DoPop(Token& token, T& value) {
 
  // Per-consumer moodycamel token. The static_assert is on
  // kIsMultipleProducer (not ...Consumer), matching how the consumer token
  // types are selected in the class.
  311    if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
 
  312      static_assert(QueuePolicy::kIsMultipleProducer);
 
  313      success = queue_.try_dequeue(token, value);
 
  // Multi token: token-less dequeue.
  314    } 
else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
 
  315      static_assert(QueuePolicy::kIsMultipleProducer);
 
  316      success = queue_.try_dequeue(value);
 
  // Remaining case: the caller holds impl::NoToken and elements are dequeued
  // from the queue's own single_producer_token_.
  318      static_assert(std::is_same_v<Token, impl::NoToken>);
 
  319      static_assert(!QueuePolicy::kIsMultipleProducer);
 
  320      success = queue_.try_dequeue_from_producer(single_producer_token_, value);
 
  // Hands the popped element's size back to the producer side (presumably
  // only when `success` is true — TODO confirm).
  324      producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
 
  // Underlying lock-free queue, constructed with the argument 1.
  331  moodycamel::ConcurrentQueue<T> queue_{1};
 
  // Live-handle counters. 0 is the initial value; kCreatedAndDead is stored
  // when the last handle is marked dead.
  332  std::atomic<std::size_t> consumers_count_{0};
 
  333  std::atomic<std::size_t> producers_count_{0};
 
  // A real moodycamel::ProducerToken only for non-multi-producer policies
  // (see SingleProducerToken); initialised from queue_ in the constructor.
  335  SingleProducerToken single_producer_token_;
 
  337  ProducerSide producer_side_;
 
  338  ConsumerSide consumer_side_;
 
  // Sentinel counter value meaning "handles existed and all of them are dead".
  340  static constexpr std::size_t kCreatedAndDead =
 
  341      std::numeric_limits<std::size_t>::max();
 
  // Offset added to a capacity to lift its limit: used by
  // SingleProducerSide::StopBlockingOnPush and
  // MultiConsumerSide::StopBlockingOnPop. It is twice kUnbounded.
  342  static constexpr std::size_t kSemaphoreUnlockValue =
 
  343      std::numeric_limits<std::size_t>::max() / 2;
 
  // Producer side for non-multi-producer policies. Capacity is tracked with
  // two atomics (used_capacity_ / total_capacity_), and the producer waits on
  // non_full_event_ until space is freed.
  // NOTE(review): access specifiers, closing braces and several return lines
  // are not visible in this listing.
  347template <
typename T, 
typename QueuePolicy>
 
  348class GenericQueue<T, QueuePolicy>::SingleProducerSide 
final {
 
  // Starts with nothing used and `capacity` as the limit.
  350  explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
 
  351      : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
 
  // Push with a deadline: tries DoPush immediately; otherwise waits for
  // non_full_event_ until `deadline` and tries DoPush again.
  // NOTE(review): `value` is passed through std::move on both attempts; the
  // visible part of DoPush only forwards it to queue_.DoPush after its
  // capacity check, so the retry appears to see an intact value — confirm.
  355  template <
typename Token>
 
  356  [[nodiscard]] 
bool Push(Token& token, T&& value, engine::Deadline deadline) {
 
  357    if (DoPush(token, std::move(value))) {
 
  361    return non_full_event_.WaitForEventUntil(deadline) &&
 
  363           DoPush(token, std::move(value));
 
  // Non-blocking push: a single DoPush attempt.
  366  template <
typename Token>
 
  367  [[nodiscard]] 
bool PushNoblock(Token& token, T&& value) {
 
  368    return DoPush(token, std::move(value));
 
  // Called from GenericQueue::DoPop: returns the popped element's size to the
  // pool and signals the producer that space may be available.
  371  void OnElementPopped(std::size_t released_capacity) {
 
  372    used_capacity_.fetch_sub(released_capacity);
 
  373    non_full_event_.Send();
 
  // Called when the last consumer dies: raises the limit by
  // kSemaphoreUnlockValue and wakes a waiting producer.
  376  void StopBlockingOnPush() {
 
  377    total_capacity_ += kSemaphoreUnlockValue;
 
  378    non_full_event_.Send();
 
  // Exact inverse of the capacity change made by StopBlockingOnPush.
  381  void ResumeBlockingOnPush() { total_capacity_ -= kSemaphoreUnlockValue; }
 
  // Replaces the limit; the producer is signalled only if the limit grew.
  383  void SetSoftMaxSize(std::size_t new_capacity) {
 
  384    const auto old_capacity = total_capacity_.exchange(new_capacity);
 
  385    if (new_capacity > old_capacity) non_full_event_.Send();
 
  388  std::size_t GetSoftMaxSize() 
const noexcept { 
return total_capacity_.load(); }
 
  // "Size" here is the used capacity, i.e. the sum of element sizes as
  // defined by QueuePolicy::GetElementSize.
  390  std::size_t GetSizeApproximate() 
const noexcept {
 
  391    return used_capacity_.load();
 
  // Single push attempt. The push is rejected when all consumers are dead or
  // when the element would exceed the limit (the body of that branch is not
  // visible here; presumably `return false`). Otherwise the capacity is
  // reserved, the element is enqueued and the event is reset.
  395  template <
typename Token>
 
  396  [[nodiscard]] 
bool DoPush(Token& token, T&& value) {
 
  397    const std::size_t value_size = QueuePolicy::GetElementSize(value);
 
  398    if (queue_.NoMoreConsumers() ||
 
  399        used_capacity_.load() + value_size > total_capacity_.load()) {
 
  403    used_capacity_.fetch_add(value_size);
 
  404    queue_.DoPush(token, std::move(value));
 
  405    non_full_event_.Reset();
 
  // Back-reference to the owning queue.
  409  GenericQueue& queue_;
 
  410  engine::SingleConsumerEvent non_full_event_;
 
  411  std::atomic<std::size_t> used_capacity_;
 
  412  std::atomic<std::size_t> total_capacity_;
 
  // Producer side for multi-producer policies. Free capacity is modelled as a
  // CancellableSemaphore (remaining_capacity_): a push acquires the element's
  // size from it and a pop releases it again.
  // NOTE(review): access specifiers, closing braces and several return lines
  // are not visible in this listing.
  416template <
typename T, 
typename QueuePolicy>
 
  417class GenericQueue<T, QueuePolicy>::MultiProducerSide 
final {
 
  // The semaphore starts with `capacity`; the control object wraps that same
  // semaphore (the first mem-initializer line is not visible here).
  419  explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
 
  421        remaining_capacity_(capacity),
 
  422        remaining_capacity_control_(remaining_capacity_) {}
 
  // Push with a deadline: acquires capacity from the semaphore until
  // `deadline` (the count argument is on a line not visible here, presumably
  // value_size), then enqueues via DoPush.
  426  template <
typename Token>
 
  427  [[nodiscard]] 
bool Push(Token& token, T&& value, engine::Deadline deadline) {
 
  428    const std::size_t value_size = QueuePolicy::GetElementSize(value);
 
  429    return remaining_capacity_.try_lock_shared_until_count(deadline,
 
  431           DoPush(token, std::move(value));
 
  // Non-blocking push: enqueues only if value_size units can be acquired
  // right away.
  434  template <
typename Token>
 
  435  [[nodiscard]] 
bool PushNoblock(Token& token, T&& value) {
 
  436    const std::size_t value_size = QueuePolicy::GetElementSize(value);
 
  437    return remaining_capacity_.try_lock_shared_count(value_size) &&
 
  438           DoPush(token, std::move(value));
 
  // Called from GenericQueue::DoPop: releases the popped element's size.
  441  void OnElementPopped(std::size_t value_size) {
 
  442    remaining_capacity_.unlock_shared_count(value_size);
 
  // Called when the last consumer dies: installs a capacity override of 0.
  // NOTE(review): the effect of an override on waiters is defined by
  // SemaphoreCapacityControl, which is not shown here.
  445  void StopBlockingOnPush() {
 
  446    remaining_capacity_control_.SetCapacityOverride(0);
 
  // Removes the override installed by StopBlockingOnPush.
  449  void ResumeBlockingOnPush() {
 
  450    remaining_capacity_control_.RemoveCapacityOverride();
 
  453  void SetSoftMaxSize(std::size_t count) {
 
  454    remaining_capacity_control_.SetCapacity(count);
 
  // Approximate used capacity, as reported by the semaphore.
  457  std::size_t GetSizeApproximate() 
const noexcept {
 
  458    return remaining_capacity_.UsedApprox();
 
  461  std::size_t GetSoftMaxSize() 
const noexcept {
 
  462    return remaining_capacity_control_.GetCapacity();
 
  // Called after the capacity has already been acquired by Push/PushNoblock.
  // If all consumers are dead the acquired units are released again (the rest
  // of that branch is not visible here; presumably `return false`); otherwise
  // the element is enqueued.
  466  template <
typename Token>
 
  467  [[nodiscard]] 
bool DoPush(Token& token, T&& value) {
 
  468    const std::size_t value_size = QueuePolicy::GetElementSize(value);
 
  470    if (queue_.NoMoreConsumers()) {
 
  471      remaining_capacity_.unlock_shared_count(value_size);
 
  475    queue_.DoPush(token, std::move(value));
 
  // Back-reference to the owning queue.
  479  GenericQueue& queue_;
 
  // Declared before the control object, which is constructed from it.
  480  engine::CancellableSemaphore remaining_capacity_;
 
  481  concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
 
  // Consumer side for non-multi-consumer policies. The consumer waits on
  // nonempty_event_, which is signalled on every push and when producers die.
  // NOTE(review): access specifiers, closing braces, return lines and the
  // statements that modify element_count_ are not visible in this listing.
  485template <
typename T, 
typename QueuePolicy>
 
  486class GenericQueue<T, QueuePolicy>::SingleConsumerSide 
final {
 
  488  explicit SingleConsumerSide(GenericQueue& queue)
 
  489      : queue_(queue), element_count_(0) {}
 
  // Pop with a deadline: retries DoPop in a loop. When all producers are dead
  // or the wait on nonempty_event_ does not succeed by `deadline`, one final
  // DoPop attempt decides the result.
  492  template <
typename Token>
 
  493  [[nodiscard]] 
bool Pop(Token& token, T& value, engine::Deadline deadline) {
 
  494    while (!DoPop(token, value)) {
 
  495      if (queue_.NoMoreProducers() ||
 
  496          !nonempty_event_.WaitForEventUntil(deadline)) {
 
  500        return DoPop(token, value);
 
  // Non-blocking pop: a single DoPop attempt.
  506  template <
typename Token>
 
  507  [[nodiscard]] 
bool PopNoblock(Token& token, T& value) {
 
  508    return DoPop(token, value);
 
  // Called from GenericQueue::DoPush after an element was enqueued; wakes the
  // consumer. (One line before the Send is not visible here.)
  511  void OnElementPushed() {
 
  513    nonempty_event_.Send();
 
  // Called when the last producer dies: wakes the consumer so that Pop can
  // observe NoMoreProducers().
  516  void StopBlockingOnPop() { nonempty_event_.Send(); }
 
  // Nothing to undo: StopBlockingOnPop only sends an event.
  518  void ResumeBlockingOnPop() {}
 
  520  std::size_t GetElementCount() 
const { 
return element_count_; }
 
  // Single pop attempt via the owning queue; on success the event is reset
  // (the surrounding lines, including the return values, are not visible).
  523  template <
typename Token>
 
  524  [[nodiscard]] 
bool DoPop(Token& token, T& value) {
 
  525    if (queue_.DoPop(token, value)) {
 
  527      nonempty_event_.Reset();
 
  // Back-reference to the owning queue.
  533  GenericQueue& queue_;
 
  534  engine::SingleConsumerEvent nonempty_event_;
 
  535  std::atomic<std::size_t> element_count_;
 
  // Consumer side for multi-consumer policies. The number of available
  // elements is modelled as a CancellableSemaphore (element_count_): a push
  // releases one unit and a pop acquires one.
  // NOTE(review): access specifiers, closing braces and several return lines
  // are not visible in this listing.
  539template <
typename T, 
typename QueuePolicy>
 
  540class GenericQueue<T, QueuePolicy>::MultiConsumerSide 
final {
 
  // The semaphore is created with kUnbounded units and all of them are
  // acquired immediately, so it starts with nothing available. The `success`
  // result is presumably asserted on a line not visible here.
  542  explicit MultiConsumerSide(GenericQueue& queue)
 
  544        element_count_(kUnbounded),
 
  545        element_count_control_(element_count_) {
 
  546    const bool success = element_count_.try_lock_shared_count(kUnbounded);
 
  // Releases the kUnbounded units acquired in the constructor.
  550  ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
 
  // Pop with a deadline: acquires one unit until `deadline`; the second
  // operand of && is on a line not visible here (presumably DoPop, as in
  // PopNoblock below).
  553  template <
typename Token>
 
  554  [[nodiscard]] 
bool Pop(Token& token, T& value, engine::Deadline deadline) {
 
  555    return element_count_.try_lock_shared_until(deadline) &&
 
  // Non-blocking pop: dequeues only if one unit can be acquired right away.
  559  template <
typename Token>
 
  560  [[nodiscard]] 
bool PopNoblock(Token& token, T& value) {
 
  561    return element_count_.try_lock_shared() && DoPop(token, value);
 
  // Called from GenericQueue::DoPush: one more element is available.
  564  void OnElementPushed() { element_count_.unlock_shared(); }
 
  // Called when the last producer dies: installs a capacity override of
  // kUnbounded + kSemaphoreUnlockValue.
  // NOTE(review): the effect of an override on waiters is defined by
  // SemaphoreCapacityControl, which is not shown here.
  566  void StopBlockingOnPop() {
 
  567    element_count_control_.SetCapacityOverride(kUnbounded +
 
  568                                               kSemaphoreUnlockValue);
 
  // Removes the override installed by StopBlockingOnPop.
  571  void ResumeBlockingOnPop() {
 
  572    element_count_control_.RemoveCapacityOverride();
 
  // Derives an element count from the semaphore's approximate remaining
  // value: values below kUnbounded are returned as is; values above
  // kSemaphoreUnlockValue have that offset subtracted. The result for the
  // range in between is on a line not visible here.
  575  std::size_t GetElementCount() 
const {
 
  576    const std::size_t cur_element_count = element_count_.RemainingApprox();
 
  577    if (cur_element_count < kUnbounded) {
 
  578      return cur_element_count;
 
  579    } 
else if (cur_element_count <= kSemaphoreUnlockValue) {
 
  582    return cur_element_count - kSemaphoreUnlockValue;
 
  // Called after one unit has been acquired. Tries to dequeue via the owning
  // queue; when all producers are dead the acquired unit is released again.
  // The enclosing control flow and the return statements are not visible.
  586  template <
typename Token>
 
  587  [[nodiscard]] 
bool DoPop(Token& token, T& value) {
 
  589      if (queue_.DoPop(token, value)) {
 
  592      if (queue_.NoMoreProducers()) {
 
  593        element_count_.unlock_shared();
 
  // Back-reference to the owning queue.
  601  GenericQueue& queue_;
 
  // Declared before the control object, which is constructed from it.
  602  engine::CancellableSemaphore element_count_;
 
  603  concurrent::impl::SemaphoreCapacityControl element_count_control_;
 
  // Public queue aliases. The two booleans passed to the policy are, in order,
  // MultipleProducer and MultipleConsumer.
  // NOTE(review): the `template <typename T>` headers of the first four
  // aliases are not visible in this listing.
  // Multiple producers, multiple consumers.
  621using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true, 
true>>;
 
  // Multiple producers, single consumer.
  630using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
true, 
false>>;
 
  // Single producer, multiple consumers.
  638using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false, 
true>>;
 
  // Single producer, single consumer.
  646using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<
false, 
false>>;
 
  // Single-producer, single-consumer queue of std::string using
  // ContainerQueuePolicy, so each element's size is std::size of the string.
  654using StringStreamQueue =
 
  655    GenericQueue<std::string, impl::ContainerQueuePolicy<
false, 
false>>;