10#include <boost/lockfree/queue.hpp>
12#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
13#include <userver/concurrent/queue_helpers.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/semaphore.hpp>
16#include <userver/engine/single_consumer_event.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/utils/assert.hpp>
20USERVER_NAMESPACE_BEGIN
29 using LockFreeQueue = boost::lockfree::queue<T>;
31 static void Push(LockFreeQueue& queue, T&& value) {
32 [[maybe_unused]]
bool push_result = queue.push(std::move(value));
36 [[nodiscard]]
static bool Pop(LockFreeQueue& queue, T& value) {
37 return queue.pop(value);
40 static_assert(std::is_trivially_destructible_v<T>,
41 "T has non-trivial destructor. Use "
42 "MpscQueue<std::unique_ptr<T>> instead of MpscQueue<T>");
47struct QueueHelper<std::unique_ptr<T>> {
48 using LockFreeQueue = boost::lockfree::queue<T*>;
50 static void Push(LockFreeQueue& queue, std::unique_ptr<T>&& value) {
51 QueueHelper<T*>::Push(queue, value.release());
54 [[nodiscard]]
static bool Pop(LockFreeQueue& queue,
55 std::unique_ptr<T>& value) {
57 if (!QueueHelper<T*>::Pop(queue, ptr))
return false;
75class MpscQueue
final :
public std::enable_shared_from_this<MpscQueue<T>> {
76 struct EmplaceEnabler
final {
78 explicit EmplaceEnabler() =
default;
81 using QueueHelper = impl::QueueHelper<T>;
83 using ProducerToken = impl::NoToken;
84 using ConsumerToken = impl::NoToken;
86 friend class Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
87 friend class Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
90 static constexpr std::size_t kUnbounded =
91 std::numeric_limits<std::size_t>::max();
96 concurrent::Producer<MpscQueue, ProducerToken, EmplaceEnabler>;
98 concurrent::Consumer<MpscQueue, ConsumerToken, EmplaceEnabler>;
100 concurrent::Producer<MpscQueue, impl::NoToken, EmplaceEnabler>;
104 explicit MpscQueue(std::size_t max_size, EmplaceEnabler )
105 : remaining_capacity_(max_size),
106 remaining_capacity_control_(remaining_capacity_) {}
108 MpscQueue(MpscQueue&&) =
delete;
109 MpscQueue(
const MpscQueue&) =
delete;
110 MpscQueue& operator=(MpscQueue&&) =
delete;
111 MpscQueue& operator=(
const MpscQueue&) =
delete;
116 static std::shared_ptr<MpscQueue>
Create(std::size_t max_size = kUnbounded) {
117 return std::make_shared<MpscQueue>(max_size, EmplaceEnabler{});
154 bool Push(ProducerToken&, T&&, engine::Deadline);
155 bool PushNoblock(ProducerToken&, T&&);
156 bool DoPush(ProducerToken&, T&&);
158 bool Pop(ConsumerToken&, T&, engine::Deadline);
159 bool PopNoblock(ConsumerToken&, T&);
160 bool DoPop(ConsumerToken&, T&);
162 void MarkConsumerIsDead();
163 void MarkProducerIsDead();
167 typename QueueHelper::LockFreeQueue queue_{1};
168 engine::SingleConsumerEvent nonempty_event_;
169 engine::CancellableSemaphore remaining_capacity_;
170 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
171 std::atomic<
bool> consumer_is_created_{
false};
172 std::atomic<
bool> consumer_is_created_and_dead_{
false};
173 std::atomic<
bool> producer_is_created_and_dead_{
false};
174 std::atomic<size_t> producers_count_{0};
175 std::atomic<size_t> size_{0};
179MpscQueue<T>::~MpscQueue() {
180 UASSERT(consumer_is_created_and_dead_ || !consumer_is_created_);
184 ConsumerToken temp_token{queue_};
185 while (PopNoblock(temp_token, value)) {
192 producer_is_created_and_dead_ =
false;
193 nonempty_event_.Send();
194 return Producer(
this->shared_from_this(), EmplaceEnabler{});
207 "MpscQueue::Consumer must only be obtained a single time");
208 consumer_is_created_ =
true;
209 return Consumer(
this->shared_from_this(), EmplaceEnabler{});
214 remaining_capacity_control_.SetCapacity(max_size);
219 return remaining_capacity_control_.GetCapacity();
228bool MpscQueue<T>::Push(ProducerToken& token, T&& value,
229 engine::Deadline deadline) {
230 return remaining_capacity_.try_lock_shared_until(deadline) &&
231 DoPush(token, std::move(value));
235bool MpscQueue<T>::PushNoblock(ProducerToken& token, T&& value) {
236 return remaining_capacity_.try_lock_shared() &&
237 DoPush(token, std::move(value));
241bool MpscQueue<T>::DoPush(ProducerToken& , T&& value) {
242 if (consumer_is_created_and_dead_) {
243 remaining_capacity_.unlock_shared();
247 QueueHelper::Push(queue_, std::move(value));
249 nonempty_event_.Send();
255bool MpscQueue<T>::Pop(ConsumerToken& token, T& value,
256 engine::Deadline deadline) {
257 while (!DoPop(token, value)) {
258 if (producer_is_created_and_dead_ ||
259 !nonempty_event_.WaitForEventUntil(deadline)) {
263 return DoPop(token, value);
270bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
271 return DoPop(token, value);
275bool MpscQueue<T>::DoPop(ConsumerToken& , T& value) {
276 if (QueueHelper::Pop(queue_, value)) {
278 remaining_capacity_.unlock_shared();
279 nonempty_event_.Reset();
286void MpscQueue<T>::MarkConsumerIsDead() {
287 consumer_is_created_and_dead_ =
true;
288 remaining_capacity_control_.SetCapacityOverride(0);
292void MpscQueue<T>::MarkProducerIsDead() {
293 producer_is_created_and_dead_ = (--producers_count_ == 0);
294 nonempty_event_.Send();