5#include <userver/engine/deadline.hpp>
6#include <userver/engine/task/current_task.hpp>
7#include <userver/utils/assert.hpp>
16 template <
typename LockFreeQueue>
17 explicit NoToken(LockFreeQueue& ) {}
20struct MultiToken
final {
21 template <
typename LockFreeQueue>
22 explicit MultiToken(LockFreeQueue& ) {}
32template <
typename QueueType,
typename ProducerToken,
typename EmplaceEnablerType>
35 std::is_same_v<EmplaceEnablerType,
typename QueueType::EmplaceEnabler>,
36 "Do not instantiate Producer on your own. Use Producer type alias "
40 using ValueType =
typename QueueType::ValueType;
43 Producer(
const Producer&) =
delete;
44 Producer(Producer&&)
noexcept =
default;
45 Producer& operator=(
const Producer&) =
delete;
46 Producer& operator=(Producer&& other)
noexcept {
47 queue_.swap(other.queue_);
48 std::swap(token_, other.token_);
54 queue_->MarkProducerIsDead();
62 [[nodiscard]]
bool Push(ValueType&& value,
engine::Deadline deadline
= {})
const {
63 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Producer");
65 return queue_->Push(token_, std::move(value), deadline);
72 [[nodiscard]]
bool PushNoblock(ValueType&& value)
const {
73 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Producer");
74 return queue_->PushNoblock(token_, std::move(value));
77 void Reset() &&
noexcept {
79 queue_->MarkProducerIsDead();
82 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
86 [[nodiscard]] std::shared_ptr<
const QueueType>
Queue()
const {
return {queue_}; }
90 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType )
91 : queue_(std::move(queue)),
92 token_(queue_->queue_)
97 std::shared_ptr<QueueType> queue_;
98 mutable ProducerToken token_;
106template <
typename QueueType,
typename ConsumerToken,
typename EmplaceEnablerType>
107class Consumer
final {
109 std::is_same_v<EmplaceEnablerType,
typename QueueType::EmplaceEnabler>,
110 "Do not instantiate Consumer on your own. Use Consumer type alias "
114 using ValueType =
typename QueueType::ValueType;
117 Consumer(
const Consumer&) =
delete;
118 Consumer(Consumer&&)
noexcept =
default;
119 Consumer& operator=(
const Consumer&) =
delete;
120 Consumer& operator=(Consumer&& other)
noexcept {
121 queue_.swap(other.queue_);
122 std::swap(token_, other.token_);
128 queue_->MarkConsumerIsDead();
140 [[nodiscard]]
bool Pop(ValueType& value,
engine::Deadline deadline
= {})
const {
141 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Consumer");
143 return queue_->Pop(token_, value, deadline);
150 [[nodiscard]]
bool PopNoblock(ValueType& value)
const {
151 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Consumer");
152 return queue_->PopNoblock(token_, value);
157 queue_->MarkConsumerIsDead();
160 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
164 [[nodiscard]] std::shared_ptr<
const QueueType>
Queue()
const {
return {queue_}; }
168 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType )
169 : queue_(std::move(queue)),
170 token_(queue_->queue_)
175 std::shared_ptr<QueueType> queue_{};
176 mutable ConsumerToken token_;