10#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>
11#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
12#include <userver/concurrent/queue_helpers.hpp>
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/semaphore.hpp>
15#include <userver/engine/single_consumer_event.hpp>
16#include <userver/engine/task/cancel.hpp>
17#include <userver/utils/assert.hpp>
19USERVER_NAMESPACE_BEGIN
26struct MpscQueueNode
final :
public SinglyLinkedBaseHook {
27 explicit MpscQueueNode(T&& value) : value(std::move(value)) {}
44class MpscQueue
final :
public std::enable_shared_from_this<MpscQueue<T>> {
45 struct EmplaceEnabler
final {
47 explicit EmplaceEnabler() =
default;
50 using Node = impl::MpscQueueNode<T>;
52 using ProducerToken = impl::NoToken;
53 using ConsumerToken = impl::NoToken;
55 friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
56 friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
59 static constexpr std::size_t kUnbounded = std::numeric_limits<std::size_t>::max();
63 using Producer =
concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
64 using Consumer =
concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
65 using MultiProducer =
concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
69 explicit MpscQueue(std::size_t max_size, EmplaceEnabler )
70 : remaining_capacity_(max_size), remaining_capacity_control_(remaining_capacity_) {}
72 MpscQueue(MpscQueue&&) =
delete;
73 MpscQueue(
const MpscQueue&) =
delete;
74 MpscQueue& operator=(MpscQueue&&) =
delete;
75 MpscQueue& operator=(
const MpscQueue&) =
delete;
80 static std::shared_ptr<MpscQueue>
Create(std::size_t max_size = kUnbounded) {
81 return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
118 bool Push(ProducerToken&, T&&, engine::Deadline);
119 bool PushNoblock(ProducerToken&, T&&);
120 bool DoPush(ProducerToken&, T&&);
122 bool Pop(ConsumerToken&, T&, engine::Deadline);
123 bool PopNoblock(ConsumerToken&, T&);
124 bool DoPop(ConsumerToken&, T&);
126 void MarkConsumerIsDead();
127 void MarkProducerIsDead();
129 bool NoMoreProducers()
const {
return producer_is_created_ && producers_count_ == 0; }
130 bool NoMoreConsumers()
const {
return consumer_is_created_and_dead_; }
132 impl::IntrusiveMpscQueue<Node> queue_{};
133 engine::SingleConsumerEvent nonempty_event_{};
134 engine::CancellableSemaphore remaining_capacity_;
135 impl::SemaphoreCapacityControl remaining_capacity_control_;
136 std::atomic<
bool> consumer_is_created_{
false};
137 std::atomic<
bool> consumer_is_created_and_dead_{
false};
138 std::atomic<
bool> producer_is_created_{
false};
139 std::atomic<size_t> producers_count_{0};
140 std::atomic<size_t> size_{0};
144MpscQueue<T>::~MpscQueue() {
145 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
148 while (
const auto node = std::unique_ptr<Node>{queue_.TryPopBlocking()}) {
149 remaining_capacity_.unlock_shared();
156 producer_is_created_ =
true;
157 nonempty_event_.Send();
158 return Producer(
this->shared_from_this(), EmplaceEnabler{});
170 UINVARIANT(!consumer_is_created_,
"MpscQueue::Consumer must only be obtained a single time");
171 consumer_is_created_ =
true;
172 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
177 remaining_capacity_control_.SetCapacity(max_size);
182 return remaining_capacity_control_.GetCapacity();
191bool MpscQueue<T>::Push(ProducerToken& token, T&& value, engine::Deadline deadline) {
192 return remaining_capacity_.try_lock_shared_until(deadline) && DoPush(token, std::move(value));
196bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
197 return remaining_capacity_.try_lock_shared() && DoPush(token, std::move(value));
201bool MpscQueue<T>::DoPush(ProducerToken& , T&& value) {
202 if (NoMoreConsumers()) {
203 remaining_capacity_.unlock_shared();
207 auto node = std::make_unique<Node>(std::move(value));
209 (
void)node.release();
212 nonempty_event_.Send();
218bool MpscQueue<T>::Pop(ConsumerToken& token, T& value, engine::Deadline deadline) {
219 bool no_more_producers =
false;
220 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
221 if (DoPop(token, value)) {
224 if (NoMoreProducers()) {
228 if (!DoPop(token, value)) {
229 no_more_producers =
true;
235 return success && !no_more_producers;
239bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
240 return DoPop(token, value);
244bool MpscQueue<T>::DoPop(ConsumerToken& , T& value) {
245 if (
const auto node = std::unique_ptr<Node>{queue_.TryPopWeak()}) {
246 value = std::move(node->value);
249 remaining_capacity_.unlock_shared();
250 nonempty_event_.Reset();
257void MpscQueue<T>::MarkConsumerIsDead() {
258 consumer_is_created_and_dead_ =
true;
259 remaining_capacity_control_.SetCapacityOverride(0);
263void MpscQueue<T>::MarkProducerIsDead() {
264 if (--producers_count_ == 0) {
265 nonempty_event_.Send();