5#include <userver/engine/deadline.hpp>
6#include <userver/engine/task/current_task.hpp>
7#include <userver/utils/assert.hpp>
16 template <
typename LockFreeQueue>
17 explicit NoToken(LockFreeQueue& ) {}
20struct MultiToken
final {
21 template <
typename LockFreeQueue>
22 explicit MultiToken(LockFreeQueue& ) {}
29template <
typename QueueType,
typename ProducerToken,
typename EmplaceEnablerType>
32 std::is_same_v<EmplaceEnablerType,
typename QueueType::EmplaceEnabler>,
33 "Do not instantiate Producer on your own. Use Producer type alias "
37 using ValueType =
typename QueueType::ValueType;
40 Producer(
const Producer&) =
delete;
41 Producer(Producer&&)
noexcept =
default;
42 Producer& operator=(
const Producer&) =
delete;
43 Producer& operator=(Producer&& other)
noexcept {
44 queue_.swap(other.queue_);
45 std::swap(token_, other.token_);
50 if (queue_) queue_->MarkProducerIsDead();
57 [[nodiscard]]
bool Push(ValueType&& value, engine::Deadline deadline = {})
const {
58 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Producer");
60 return queue_->Push(token_, std::move(value), deadline);
67 [[nodiscard]]
bool PushNoblock(ValueType&& value)
const {
68 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Producer");
69 return queue_->PushNoblock(token_, std::move(value));
72 void Reset() &&
noexcept {
73 if (queue_) queue_->MarkProducerIsDead();
75 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
79 [[nodiscard]] std::shared_ptr<
const QueueType>
Queue()
const {
return {queue_}; }
83 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType )
84 : queue_(std::move(queue)), token_(queue_->queue_) {}
88 std::shared_ptr<QueueType> queue_;
89 mutable ProducerToken token_;
94template <
typename QueueType,
typename ConsumerToken,
typename EmplaceEnablerType>
97 std::is_same_v<EmplaceEnablerType,
typename QueueType::EmplaceEnabler>,
98 "Do not instantiate Consumer on your own. Use Consumer type alias "
102 using ValueType =
typename QueueType::ValueType;
105 Consumer(
const Consumer&) =
delete;
106 Consumer(Consumer&&)
noexcept =
default;
107 Consumer& operator=(
const Consumer&) =
delete;
108 Consumer& operator=(Consumer&& other)
noexcept {
109 queue_.swap(other.queue_);
110 std::swap(token_, other.token_);
115 if (queue_) queue_->MarkConsumerIsDead();
127 [[nodiscard]]
bool Pop(ValueType& value, engine::Deadline deadline = {})
const {
128 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Consumer");
130 return queue_->Pop(token_, value, deadline);
136 [[nodiscard]]
bool PopNoblock(ValueType& value)
const {
137 UASSERT_MSG(queue_,
"Trying to use a moved-from queue Consumer");
138 return queue_->PopNoblock(token_, value);
142 if (queue_) queue_->MarkConsumerIsDead();
144 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
148 [[nodiscard]] std::shared_ptr<
const QueueType>
Queue()
const {
return {queue_}; }
152 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType )
153 : queue_(std::move(queue)), token_(queue_->queue_) {}
157 std::shared_ptr<QueueType> queue_{};
158 mutable ConsumerToken token_;