10#include <boost/lockfree/queue.hpp> 
   12#include <userver/concurrent/impl/semaphore_capacity_control.hpp> 
   13#include <userver/concurrent/queue_helpers.hpp> 
   14#include <userver/engine/deadline.hpp> 
   15#include <userver/engine/semaphore.hpp> 
   16#include <userver/engine/single_consumer_event.hpp> 
   17#include <userver/engine/task/cancel.hpp> 
   18#include <userver/utils/assert.hpp> 
   20USERVER_NAMESPACE_BEGIN
 
   // Underlying container type: a boost::lockfree::queue that stores T by value.
   29  using LockFreeQueue = boost::lockfree::queue<T>;
 
   // Moves `value` into `queue` by calling queue.push().
   31  static void Push(LockFreeQueue& queue, T&& value) {
 
   // The result of push() is kept in a [[maybe_unused]] local; whatever
   // inspects it afterwards is not visible in this excerpt.
   32    [[maybe_unused]] 
bool push_result = queue.push(std::move(value));
 
   // Pops one element from `queue` into `value` and returns whatever
   // queue.pop() returns. Marked [[nodiscard]] so the result is not silently
   // dropped by callers.
   36  [[nodiscard]] 
static bool Pop(LockFreeQueue& queue, T& value) {
 
   37    return queue.pop(value);
 
   // Compile-time guard: this helper only accepts trivially destructible T.
   // The diagnostic points users with other types to the
   // std::unique_ptr<T> flavour of the queue.
   40  static_assert(std::is_trivially_destructible_v<T>,
 
   41                "T has non-trivial destructor. Use " 
   42                "MpscQueue<std::unique_ptr<T>> instead of MpscQueue<T>");
 
   // Specialization of QueueHelper for std::unique_ptr<T> elements.
   47struct QueueHelper<std::unique_ptr<T>> {
 
   // The lock-free container stores raw T* rather than the unique_ptr itself.
   48  using LockFreeQueue = boost::lockfree::queue<T*>;
 
   // Pushes `value` by releasing ownership from the unique_ptr and forwarding
   // the raw pointer to QueueHelper<T*>::Push.
   50  static void Push(LockFreeQueue& queue, std::unique_ptr<T>&& value) {
 
   51    QueueHelper<T*>::Push(queue, value.release());
 
   // Pops a raw pointer through QueueHelper<T*>::Pop and returns false when
   // that pop fails. The declaration of `ptr` and the code that runs after a
   // successful pop are not visible in this excerpt.
   54  [[nodiscard]] 
static bool Pop(LockFreeQueue& queue,
 
   55                                std::unique_ptr<T>& value) {
 
   57    if (!QueueHelper<T*>::Pop(queue, ptr)) 
return false;
 
   // Queue class. It is `final` and derives from std::enable_shared_from_this,
   // which lets member functions obtain a std::shared_ptr to the queue via
   // shared_from_this().
   75class MpscQueue 
final : 
public std::enable_shared_from_this<MpscQueue<T>> {
 
   // Tag type taken as the last argument of the MpscQueue constructor and
   // passed by Create(). It is declared before any access specifier in a
   // `class`, so it is private by default.
   76  struct EmplaceEnabler 
final {
 
   // The default constructor is explicit, so an EmplaceEnabler must be spelled
   // out as `EmplaceEnabler{}`; a bare `{}` argument will not convert to it.
   78    explicit EmplaceEnabler() = 
default;
 
   // Element-type-specific push/pop helper for the underlying lock-free queue.
   81  using QueueHelper = impl::QueueHelper<T>;
 
   // Both token types are impl::NoToken for this queue.
   83  using ProducerToken = impl::NoToken;
 
   84  using ConsumerToken = impl::NoToken;
 
   // The Producer/Consumer handle templates are friends of the queue.
   86  friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
 
   87  friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
 
   // Sentinel meaning "no size limit": the largest std::size_t value. Used as
   // the default max_size of Create().
   90  static constexpr std::size_t kUnbounded =
 
   91      std::numeric_limits<std::size_t>::max();
 
   // NOTE(review): the three lines below look like the right-hand sides of
   // alias declarations whose first lines are not part of this excerpt.
   96      concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
 
   98      concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
 
  100      concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
 
  // Constructs the queue with capacity `max_size`. The unnamed EmplaceEnabler
  // parameter is a tag argument; Create() is the visible caller that supplies
  // it.
  104  explicit MpscQueue(std::size_t max_size, EmplaceEnabler )
 
  // The capacity semaphore is initialised with max_size, and the capacity
  // control object is constructed from that semaphore.
  105      : remaining_capacity_(max_size),
 
  106        remaining_capacity_control_(remaining_capacity_) {}
 
  // The queue is neither movable nor copyable: all four copy/move operations
  // are deleted.
  108  MpscQueue(MpscQueue&&) = 
delete;
 
  109  MpscQueue(
const MpscQueue&) = 
delete;
 
  110  MpscQueue& operator=(MpscQueue&&) = 
delete;
 
  111  MpscQueue& operator=(
const MpscQueue&) = 
delete;
 
  // Factory: allocates the queue with std::make_shared and returns the
  // resulting shared_ptr. `max_size` defaults to kUnbounded.
  116  static std::shared_ptr<MpscQueue> 
Create(std::size_t max_size = kUnbounded) {
 
  117    return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
 
  // Push with a deadline: acquires a capacity slot via
  // try_lock_shared_until(deadline), then calls DoPush.
  154  bool Push(ProducerToken&, T&&, engine::Deadline);
 
  // Push without a deadline: acquires a capacity slot via try_lock_shared(),
  // then calls DoPush.
  155  bool PushNoblock(ProducerToken&, T&&);
 
  // Performs the actual enqueue and sends nonempty_event_.
  156  bool DoPush(ProducerToken&, T&&);
 
  // Pop that retries DoPop and waits on nonempty_event_ until the deadline.
  158  bool Pop(ConsumerToken&, T&, engine::Deadline);
 
  // Single pop attempt; forwards to DoPop.
  159  bool PopNoblock(ConsumerToken&, T&);
 
  // Performs the actual dequeue; on success releases one capacity slot and
  // resets nonempty_event_.
  160  bool DoPop(ConsumerToken&, T&);
 
  // Sets consumer_is_created_and_dead_ and overrides the capacity to 0.
  162  void MarkConsumerIsDead();
 
  // Decrements producers_count_, records whether it reached zero, and sends
  // nonempty_event_.
  163  void MarkProducerIsDead();
 
  // Underlying lock-free container, constructed with the argument 1.
  // NOTE(review): for boost::lockfree::queue this is presumably the initial
  // node count — confirm against the Boost documentation.
  167  typename QueueHelper::LockFreeQueue queue_{1};
 
  // Sent by DoPush and MarkProducerIsDead, reset by DoPop, waited on by Pop.
  168  engine::SingleConsumerEvent nonempty_event_;
 
  // Capacity accounting: Push/PushNoblock take a shared lock per element,
  // DoPop releases one with unlock_shared().
  169  engine::CancellableSemaphore remaining_capacity_;
 
  // Constructed from remaining_capacity_; used to set, read, and override the
  // capacity.
  170  concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
 
  // Set to true where the Consumer object is created; checked in the
  // destructor's assertion.
  171  std::atomic<
bool> consumer_is_created_{
false};
 
  // Set by MarkConsumerIsDead; when set, DoPush releases its capacity slot
  // before doing anything else.
  172  std::atomic<
bool> consumer_is_created_and_dead_{
false};
 
  // Set by MarkProducerIsDead when producers_count_ reaches zero; checked by
  // Pop to decide whether to stop waiting.
  173  std::atomic<
bool> producer_is_created_and_dead_{
false};
 
  // Producer counter; decremented in MarkProducerIsDead. The matching
  // increment is not visible in this excerpt.
  174  std::atomic<size_t> producers_count_{0};
 
  // Element counter, initially 0. Its updates are not visible in this excerpt.
  175  std::atomic<size_t> size_{0};
 
  // Destructor. Asserts that a consumer was either never created or has
  // already been marked dead, then drains the remaining elements by calling
  // PopNoblock in a loop. The declaration of `value` and the loop body are not
  // visible in this excerpt.
  179MpscQueue<T>::~MpscQueue() {
 
  180  UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
 
  // Temporary token: PopNoblock requires a ConsumerToken&, while DoPop leaves
  // its token parameter unnamed.
  184  ConsumerToken temp_token{queue_};
 
  185  while (PopNoblock(temp_token, value)) {
 
  // NOTE(review): tail of a function whose signature is not visible in this
  // excerpt; it returns a Producer handle. Clears the "all producers dead"
  // flag, sends nonempty_event_, and builds the Producer from a shared_ptr to
  // this queue. shared_from_this() requires the queue to be owned by a
  // std::shared_ptr, which Create() provides.
  192  producer_is_created_and_dead_ = 
false;
 
  193  nonempty_event_.Send();
 
  194  return Producer(
this->shared_from_this(), EmplaceEnabler{});
 
  // NOTE(review): tail of a function whose signature is not visible in this
  // excerpt; it returns a Consumer handle. The string below is the message
  // argument of a check whose opening line is missing; presumably it enforces
  // that only one Consumer is ever obtained — confirm against the full file.
  207             "MpscQueue::Consumer must only be obtained a single time");
 
  // Record that a consumer now exists (read by the destructor's assertion).
  208  consumer_is_created_ = 
true;
 
  209  return Consumer(
this->shared_from_this(), EmplaceEnabler{});
 
  // NOTE(review): single-statement bodies of two functions whose signatures
  // are not visible in this excerpt. The first forwards `max_size` to the
  // capacity control object; the second returns the capacity it reports.
  214  remaining_capacity_control_.SetCapacity(max_size);
 
  219  return remaining_capacity_control_.GetCapacity();
 
  // Pushes `value` with a deadline. First tries to acquire one capacity slot
  // via try_lock_shared_until(deadline); DoPush runs only if that succeeds
  // (short-circuit &&). Returns false if the slot was not acquired, otherwise
  // the result of DoPush.
  228bool MpscQueue<T>::Push(ProducerToken& token, T&& value,
 
  229                        engine::Deadline deadline) {
 
  230  return remaining_capacity_.try_lock_shared_until(deadline) &&
 
  231         DoPush(token, std::move(value));
 
  // Pushes `value` without a deadline. Tries to acquire one capacity slot via
  // try_lock_shared(); DoPush runs only if that succeeds. Returns false if the
  // slot was not acquired, otherwise the result of DoPush.
  235bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
 
  236  return remaining_capacity_.try_lock_shared() &&
 
  237         DoPush(token, std::move(value));
 
  // Enqueues `value`; the token parameter is unnamed and unused. Both visible
  // callers (Push and PushNoblock) acquire a capacity slot before calling.
  241bool MpscQueue<T>::DoPush(ProducerToken& , T&& value) {
 
  // If the consumer has been marked dead, give the capacity slot back. The
  // rest of this branch (presumably an early return) is not visible in this
  // excerpt.
  242  if (consumer_is_created_and_dead_) {
 
  243    remaining_capacity_.unlock_shared();
 
  247  QueueHelper::Push(queue_, std::move(value));
 
  // Signal the event that Pop waits on.
  249  nonempty_event_.Send();
 
  // Pop with a deadline. Repeats DoPop until it succeeds. After a failed
  // attempt, if all producers are marked dead or the wait on nonempty_event_
  // returns false, the visible code returns the result of one more DoPop
  // attempt. Some lines of the loop are not visible in this excerpt.
  255bool MpscQueue<T>::Pop(ConsumerToken& token, T& value,
 
  256                       engine::Deadline deadline) {
 
  257  while (!DoPop(token, value)) {
 
  // NOTE(review): a false result from WaitForEventUntil presumably means the
  // deadline expired or the wait was interrupted — confirm against the
  // SingleConsumerEvent documentation.
  258    if (producer_is_created_and_dead_ ||
 
  259        !nonempty_event_.WaitForEventUntil(deadline)) {
 
  263      return DoPop(token, value);
 
  // Single pop attempt with no waiting: forwards to DoPop and returns its
  // result.
  270bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
 
  271  return DoPop(token, value);
 
  // Dequeues one element into `value`; the token parameter is unnamed and
  // unused. The return statements are not visible in this excerpt.
  275bool MpscQueue<T>::DoPop(ConsumerToken& , T& value) {
 
  276  if (QueueHelper::Pop(queue_, value)) {
 
  // On a successful pop, release one capacity slot and reset the event.
  278    remaining_capacity_.unlock_shared();
 
  279    nonempty_event_.Reset();
 
  // Records that the consumer is gone and overrides the capacity to 0.
  // NOTE(review): the zero override presumably stops producers from acquiring
  // further capacity slots — confirm against SemaphoreCapacityControl.
  286void MpscQueue<T>::MarkConsumerIsDead() {
 
  287  consumer_is_created_and_dead_ = 
true;
 
  288  remaining_capacity_control_.SetCapacityOverride(0);
 
  // Decrements the producer counter and sets producer_is_created_and_dead_ to
  // whether the counter reached zero.
  292void MpscQueue<T>::MarkProducerIsDead() {
 
  293  producer_is_created_and_dead_ = (--producers_count_ == 0);
 
  // Send the event Pop waits on, so that a waiting Pop re-checks the flag.
  294  nonempty_event_.Send();