118 bool Push(ProducerToken&, T&&, engine::Deadline);
119 bool PushNoblock(ProducerToken&, T&&);
120 bool DoPush(ProducerToken&, T&&);
122 bool Pop(ConsumerToken&, T&, engine::Deadline);
123 bool PopNoblock(ConsumerToken&, T&);
124 bool DoPop(ConsumerToken&, T&);
126 void MarkConsumerIsDead();
127 void MarkProducerIsDead();
129 bool NoMoreProducers()
const {
return producer_is_created_ && producers_count_ == 0; }
130 bool NoMoreConsumers()
const {
return consumer_is_created_and_dead_; }
132 impl::IntrusiveMpscQueue<Node> queue_{};
133 engine::SingleConsumerEvent nonempty_event_
{};
134 engine::CancellableSemaphore remaining_capacity_;
135 impl::SemaphoreCapacityControl remaining_capacity_control_;
136 std::atomic<
bool> consumer_is_created_{
false};
137 std::atomic<
bool> consumer_is_created_and_dead_{
false};
138 std::atomic<
bool> producer_is_created_{
false};
139 std::atomic<size_t> producers_count_{0};
140 std::atomic<size_t> size_{0};
145MpscQueue<T>::~MpscQueue() {
146 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
149 while (
const auto node = std::unique_ptr<Node>{queue_.TryPopBlocking()}) {
158 producer_is_created_ =
true;
159 nonempty_event_.Send();
160 return Producer(
this->shared_from_this(), EmplaceEnabler{});
172 UINVARIANT(!consumer_is_created_,
"MpscQueue::Consumer must only be obtained a single time");
173 consumer_is_created_ =
true;
174 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
179 remaining_capacity_control_.SetCapacity(max_size);
184 return remaining_capacity_control_.GetCapacity();
193bool MpscQueue<T>::Push(ProducerToken& token, T&& value, engine::Deadline deadline) {
194 return remaining_capacity_.try_lock_shared_until(deadline) && DoPush(token, std::move(value));
198bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
199 return remaining_capacity_.try_lock_shared() && DoPush(token, std::move(value));
203bool MpscQueue<T>::DoPush(ProducerToken& , T&& value) {
204 if (NoMoreConsumers()) {
209 auto node = std::make_unique<Node>(std::move(value));
211 (
void)node.release();
214 nonempty_event_.Send();
220bool MpscQueue<T>::Pop(ConsumerToken& token, T& value, engine::Deadline deadline) {
221 bool no_more_producers =
false;
222 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
223 if (DoPop(token, value)) {
226 if (NoMoreProducers()) {
230 if (!DoPop(token, value)) {
231 no_more_producers =
true;
237 return success && !no_more_producers;
241bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
242 return DoPop(token, value);
246bool MpscQueue<T>::DoPop(ConsumerToken& , T& value) {
247 if (
const auto node = std::unique_ptr<Node>{queue_.TryPopWeak()}) {
248 value = std::move(node->value);
252 nonempty_event_.Reset();
259void MpscQueue<T>::MarkConsumerIsDead() {
260 consumer_is_created_and_dead_ =
true;
261 remaining_capacity_control_.SetCapacityOverride(0);
265void MpscQueue<T>::MarkProducerIsDead() {
266 if (--producers_count_ == 0) {
267 nonempty_event_.Send();