// Push with a deadline: tries to acquire a slot from remaining_capacity_ until
// the deadline and, only on success, forwards the value to DoPush().
122 bool Push(ProducerToken&, T&&,
engine::Deadline);
// Push without a deadline: uses remaining_capacity_.try_lock_shared() and,
// only on success, forwards the value to DoPush().
123 bool PushNoblock(ProducerToken&, T&&);
// Enqueue step used by Push() and PushNoblock() after capacity was acquired.
124 bool DoPush(ProducerToken&, T&&);
// Pop with a deadline: waits on nonempty_event_ while attempting DoPop().
// Returns false if the wait did not succeed or if no producers remain and
// nothing could be popped.
126 bool Pop(ConsumerToken&, T&,
engine::Deadline);
// Pop attempt that does not take a deadline; delegates to DoPop().
127 bool PopNoblock(ConsumerToken&, T&);
// Tries to pop one node from queue_ with the given PopMode and moves the
// node's value into the output argument.
128 bool DoPop(ConsumerToken&, T&, impl::IntrusiveMpscQueueImpl::PopMode);
// Sets consumer_is_created_and_dead_ and overrides the capacity to 0.
130 void MarkConsumerIsDead();
// Decrements producers_count_; has extra handling when the count reaches zero.
131 void MarkProducerIsDead();
// True once a producer has been handed out and the live producer count is
// back at zero. The flag is read first; the counter is only read if it is set.
133 bool NoMoreProducers() const {
  if (!producer_is_created_) {
    return false;
  }
  return producers_count_ == 0;
}
// True once the consumer has been marked dead (see MarkConsumerIsDead()).
134 bool NoMoreConsumers() const {
  const bool consumer_gone = consumer_is_created_and_dead_;
  return consumer_gone;
}
// Underlying intrusive queue of Node objects; nodes popped from it are wrapped
// into std::unique_ptr<Node> by DoPop() and the destructor.
136 impl::IntrusiveMpscQueue<Node> queue_{};
// Event that Pop() waits on (WaitUntil) while trying to pop an element.
137 engine::SingleConsumerEvent nonempty_event_
{};
// Semaphore acquired via try_lock_shared*/try_lock_shared_until by
// PushNoblock()/Push() before the value is enqueued.
138 engine::CancellableSemaphore remaining_capacity_;
// Capacity controller: SetCapacity()/GetCapacity() are called on it, and
// MarkConsumerIsDead() applies SetCapacityOverride(0).
// NOTE(review): presumably bound to remaining_capacity_ in the constructor — confirm.
139 impl::SemaphoreCapacityControl remaining_capacity_control_;
// Set to true when the Consumer is handed out; a UINVARIANT enforces that
// this happens at most once.
140 std::atomic<
bool> consumer_is_created_{
false};
// Set to true by MarkConsumerIsDead(); read by NoMoreConsumers() and by the
// destructor's assertion.
141 std::atomic<
bool> consumer_is_created_and_dead_{
false};
// Set to true when a Producer is handed out; read by NoMoreProducers().
142 std::atomic<
bool> producer_is_created_{
false};
// Decremented by MarkProducerIsDead(); NoMoreProducers() compares it with 0.
// The increment site is not visible in this part of the file.
143 std::atomic<size_t> producers_count_{0};
// NOTE(review): no reads or writes of size_ are visible here; presumably the
// current number of queued elements — confirm against the rest of the file.
144 std::atomic<size_t> size_{0};
// Destructor. Asserts that the consumer was either never created or has
// already been marked dead, then pops the nodes remaining in queue_.
149MpscQueue<T>::~MpscQueue() {
150 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
// Each popped node is held by a unique_ptr; the loop stops when
// TryPopBlocking() yields a null pointer.
153 while (
const auto node = std::unique_ptr<Node>{queue_.TryPopBlocking()}) {
162 producer_is_created_ =
true;
164 return Producer(
this->shared_from_this(), EmplaceEnabler{});
176 UINVARIANT(!consumer_is_created_,
"MpscQueue::Consumer must only be obtained a single time");
177 consumer_is_created_ =
true;
178 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
183 remaining_capacity_control_.SetCapacity(max_size);
188 return remaining_capacity_control_.GetCapacity();
// Pushes `value` with a deadline.
// Returns false if the capacity slot could not be acquired before `deadline`;
// otherwise returns the result of DoPush().
197bool MpscQueue<T>::Push(ProducerToken& token, T&& value,
engine::Deadline deadline) {
// Short-circuit: DoPush() runs only after the slot has been acquired.
198 return remaining_capacity_.try_lock_shared_until(deadline) && DoPush(token, std::move(value));
// Pushes `value` without a deadline.
// Returns false if try_lock_shared() on remaining_capacity_ fails; otherwise
// returns the result of DoPush().
202bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
// Short-circuit: DoPush() runs only after the slot has been acquired.
203 return remaining_capacity_.try_lock_shared() && DoPush(token, std::move(value));
// Enqueue step called by Push()/PushNoblock() once capacity is acquired.
// The producer token parameter is unnamed, i.e. unused in this body.
207bool MpscQueue<T>::DoPush(ProducerToken& , T&& value) {
// Special-cases a dead consumer; the handling itself is not visible here.
208 if (NoMoreConsumers()) {
// The value is moved into a freshly allocated Node.
213 auto node = std::make_unique<Node>(std::move(value));
// Ownership is deliberately dropped from the unique_ptr.
// NOTE(review): presumably the node has been handed to queue_ by this point — confirm.
215 (
void)node.release();
// Pops an element into `value`, waiting on nonempty_event_ until `deadline`.
// Returns true only if the wait succeeded and the "no more producers" case
// was not hit.
224bool MpscQueue<T>::Pop(ConsumerToken& token, T& value,
engine::Deadline deadline) {
// Set inside the wait predicate when producers are gone and the final pop
// attempt fails.
225 bool no_more_producers =
false;
226 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
// First attempt uses the kWeak pop mode.
229 if (DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kWeak)) {
232 if (NoMoreProducers()) {
// With no producers left, retry with kRarelyBlocking before reporting
// that nothing more will arrive.
236 if (!DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kRarelyBlocking)) {
237 no_more_producers =
true;
243 return success && !no_more_producers;
// Pop attempt without a deadline; the visible part returns the result of
// DoPop() in kRarelyBlocking mode.
247bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
254 return DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kRarelyBlocking);
// Tries to pop a single node from queue_ using `pop_mode`; on success the
// node's value is moved into `value`. The consumer token parameter is
// unnamed, i.e. unused in this body.
258bool MpscQueue<T>::DoPop(ConsumerToken& , T& value, impl::IntrusiveMpscQueueImpl::PopMode pop_mode) {
// The popped node is owned by a unique_ptr scoped to this `if`.
259 if (
const auto node = std::unique_ptr<Node>{queue_.TryPop(pop_mode)}) {
260 value = std::move(node->value);
// Records that the consumer is gone: sets consumer_is_created_and_dead_
// (observed by NoMoreConsumers()) and overrides the capacity to 0.
271void MpscQueue<T>::MarkConsumerIsDead() {
272 consumer_is_created_and_dead_ =
true;
273 remaining_capacity_control_.SetCapacityOverride(0);
// Records that one producer is gone by atomically decrementing
// producers_count_.
277void MpscQueue<T>::MarkProducerIsDead() {
// Taken when this was the last live producer; the branch body is not visible here.
278 if (--producers_count_ == 0) {