5#include <userver/engine/deadline.hpp>
6#include <userver/engine/task/current_task.hpp>
7#include <userver/utils/assert.hpp>
/// Constructor template that accepts a reference to any queue type and
/// ignores it: the parameter is unnamed and the body is empty.
/// `explicit` prevents implicit conversion from a queue reference.
// NOTE(review): the enclosing NoToken type declaration is not visible in this
// excerpt.
16 template <
typename LockFreeQueue>
17 explicit NoToken(LockFreeQueue& ) {}
/// Token type whose only visible member is a constructor template that
/// accepts a reference to any queue type and ignores it.
// NOTE(review): the closing brace of this struct is not visible in this
// excerpt.
20struct MultiToken
final {
// The queue argument is unnamed and unused; the body is empty.
21 template <
typename LockFreeQueue>
22 explicit MultiToken(LockFreeQueue& ) {}
/// Template head of the producer-side handle. Parameters: the queue type, the
/// token type handed to the queue on every push, and a type that is checked
/// against `QueueType::EmplaceEnabler` below.
// NOTE(review): the class head and the opening of the static_assert are not
// visible in this excerpt.
32template <
typename QueueType,
typename ProducerToken,
typename EmplaceEnablerType>
// Compile-time condition: EmplaceEnablerType must be exactly
// QueueType::EmplaceEnabler.
35 std::is_same_v<EmplaceEnablerType,
typename QueueType::EmplaceEnabler>,
// Diagnostic text shown when the condition above does not hold.
36 "Do not instantiate Producer on your own. Use Producer type alias "
/// Element type accepted by the push methods, taken from the queue type.
40 using ValueType =
typename QueueType::ValueType;
// Copy construction is disabled.
43 Producer(
const Producer&) =
delete;
// Move construction is defaulted and declared noexcept.
44 Producer(Producer&&)
noexcept =
default;
// Copy assignment is disabled.
45 Producer& operator=(
const Producer&) =
delete;
/// Move assignment: exchanges both the queue pointer and the token with
/// `other` rather than overwriting them, so `other` ends up holding this
/// object's previous state.
// NOTE(review): the remainder of this operator (return statement, closing
// brace) is not visible in this excerpt.
46 Producer& operator=(Producer&& other)
noexcept {
47 queue_.swap(other.queue_);
48 std::swap(token_, other.token_);
// When a queue is held (queue_ is non-null), call MarkProducerIsDead() on it.
// NOTE(review): presumably the destructor body - the enclosing signature is
// not visible in this excerpt; confirm against the full file.
53 if (queue_) queue_->MarkProducerIsDead();
/// Forwards the producer token, `value` (as an rvalue) and `deadline` to the
/// queue's Push() and returns whatever that call returns.
/// @param value element to push; passed on via std::move
/// @param deadline forwarded unchanged; default-constructed when omitted
/// @returns the result of the queue's Push(); [[nodiscard]], so callers are
/// expected to check it
// Declared const; the token can still be passed because token_ is a mutable
// member.
// NOTE(review): the end of this function is not visible in this excerpt.
60 [[nodiscard]]
bool Push(ValueType&& value, engine::Deadline deadline
= {})
const {
// Asserts that a queue is held; the message attributes a null queue_ to a
// moved-from producer.
61 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Producer");
63 return queue_->Push(token_, std::move(value), deadline);
/// Forwards the producer token and `value` (as an rvalue) to the queue's
/// PushNoblock() and returns whatever that call returns. Takes no deadline.
/// @param value element to push; passed on via std::move
/// @returns the result of the queue's PushNoblock(); [[nodiscard]]
// NOTE(review): the end of this function is not visible in this excerpt.
70 [[nodiscard]]
bool PushNoblock(ValueType&& value)
const {
// Asserts that a queue is held; the message attributes a null queue_ to a
// moved-from producer.
71 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Producer");
72 return queue_->PushNoblock(token_, std::move(value));
/// Callable only on rvalues (&&-qualified). When a queue is held, calls
/// MarkProducerIsDead() on it, then moves the token out into a local object.
// NOTE(review): the end of this function is not visible in this excerpt, so
// any further cleanup of queue_ cannot be confirmed from here.
75 void Reset() &&
noexcept {
76 if (queue_) queue_->MarkProducerIsDead();
// Moving token_ into a local means the moved-into object is destroyed when
// this scope ends; [[maybe_unused]] silences the unused-variable warning.
78 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
/// Returns a shared pointer to the held queue typed as pointer-to-const
/// (`std::shared_ptr<const QueueType>`), copy-initialized from queue_.
/// The result is null when queue_ is null.
82 [[nodiscard]] std::shared_ptr<
const QueueType>
Queue()
const {
return {queue_}; }
/// Takes ownership of `queue` and constructs the token from the queue
/// object's own `queue_` member. The second parameter is unnamed and unused;
/// only its type takes part in the signature.
// token_ is built through the member queue_ (not the moved-from parameter),
// which relies on queue_ being declared before token_ in the class.
// `queue` is dereferenced unconditionally, so a null pointer is not handled
// here.
86 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType )
87 : queue_(std::move(queue)), token_(queue_->queue_) {}
// Shared-ownership pointer to the queue; member functions test it for null
// before use.
91 std::shared_ptr<QueueType> queue_;
// mutable so that the const push methods can hand the token to the queue.
92 mutable ProducerToken token_;
/// Consumer-side handle. Template parameters: the queue type, the token type
/// handed to the queue on every pop, and a type that is checked against
/// `QueueType::EmplaceEnabler` below.
// NOTE(review): the opening of the static_assert and the end of the class are
// not visible in this excerpt.
100template <
typename QueueType,
typename ConsumerToken,
typename EmplaceEnablerType>
101class Consumer
final {
// Compile-time condition: EmplaceEnablerType must be exactly
// QueueType::EmplaceEnabler.
103 std::is_same_v<EmplaceEnablerType,
typename QueueType::EmplaceEnabler>,
// Diagnostic text shown when the condition above does not hold.
104 "Do not instantiate Consumer on your own. Use Consumer type alias "
/// Element type written by the pop methods, taken from the queue type.
108 using ValueType =
typename QueueType::ValueType;
// Copy construction is disabled.
111 Consumer(
const Consumer&) =
delete;
// Move construction is defaulted and declared noexcept.
112 Consumer(Consumer&&)
noexcept =
default;
// Copy assignment is disabled.
113 Consumer& operator=(
const Consumer&) =
delete;
/// Move assignment: exchanges both the queue pointer and the token with
/// `other` rather than overwriting them, so `other` ends up holding this
/// object's previous state.
// NOTE(review): the remainder of this operator (return statement, closing
// brace) is not visible in this excerpt.
114 Consumer& operator=(Consumer&& other)
noexcept {
115 queue_.swap(other.queue_);
116 std::swap(token_, other.token_);
// When a queue is held (queue_ is non-null), call MarkConsumerIsDead() on it.
// NOTE(review): presumably the destructor body - the enclosing signature is
// not visible in this excerpt; confirm against the full file.
121 if (queue_) queue_->MarkConsumerIsDead();
/// Forwards the consumer token, the output reference `value` and `deadline`
/// to the queue's Pop() and returns whatever that call returns.
/// @param value non-const reference handed to the queue's Pop()
/// @param deadline forwarded unchanged; default-constructed when omitted
/// @returns the result of the queue's Pop(); [[nodiscard]], so callers are
/// expected to check it
// Declared const; the token can still be passed because token_ is a mutable
// member.
// NOTE(review): the end of this function is not visible in this excerpt.
132 [[nodiscard]]
bool Pop(ValueType& value, engine::Deadline deadline
= {})
const {
// Asserts that a queue is held; the message attributes a null queue_ to a
// moved-from consumer.
133 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Consumer");
135 return queue_->Pop(token_, value, deadline);
/// Forwards the consumer token and the output reference `value` to the
/// queue's PopNoblock() and returns whatever that call returns. Takes no
/// deadline.
/// @param value non-const reference handed to the queue's PopNoblock()
/// @returns the result of the queue's PopNoblock(); [[nodiscard]]
// NOTE(review): the end of this function is not visible in this excerpt.
142 [[nodiscard]]
bool PopNoblock(ValueType& value)
const {
// Asserts that a queue is held; the message attributes a null queue_ to a
// moved-from consumer.
143 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Consumer");
144 return queue_->PopNoblock(token_, value);
// When a queue is held, call MarkConsumerIsDead() on it, then move the token
// out into a local object so the moved-into object is destroyed when the
// enclosing scope ends.
// NOTE(review): the enclosing function signature and its end are not visible
// in this excerpt; the statements parallel those in Producer's Reset().
148 if (queue_) queue_->MarkConsumerIsDead();
150 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
/// Returns a shared pointer to the held queue typed as pointer-to-const
/// (`std::shared_ptr<const QueueType>`), copy-initialized from queue_.
/// The result is null when queue_ is null.
154 [[nodiscard]] std::shared_ptr<
const QueueType>
Queue()
const {
return {queue_}; }
/// Takes ownership of `queue` and constructs the token from the queue
/// object's own `queue_` member. The second parameter is unnamed and unused;
/// only its type takes part in the signature.
// token_ is built through the member queue_ (not the moved-from parameter),
// which relies on queue_ being declared before token_ in the class.
// `queue` is dereferenced unconditionally, so a null pointer is not handled
// here.
158 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType )
159 : queue_(std::move(queue)), token_(queue_->queue_) {}
// Shared-ownership pointer to the queue, value-initialized (null) by default;
// member functions test it for null before use.
163 std::shared_ptr<QueueType> queue_{};
// mutable so that the const pop methods can hand the token to the queue.
164 mutable ConsumerToken token_;