Github   Telegram
Loading...
Searching...
No Matches
queue.hpp
#pragma once

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <limits>
#include <memory>
#include <type_traits>
#include <utility>

#include <moodycamel/concurrentqueue.h>

#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
#include <userver/concurrent/queue_helpers.hpp>
#include <userver/engine/deadline.hpp>
#include <userver/engine/semaphore.hpp>
#include <userver/engine/single_consumer_event.hpp>
#include <userver/engine/task/cancel.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/atomic.hpp>

18USERVER_NAMESPACE_BEGIN
19
20namespace concurrent {
21
22namespace impl {
23
24class MultiProducerToken final {
25 public:
26 template <typename T, typename Traits>
27 explicit MultiProducerToken(moodycamel::ConcurrentQueue<T, Traits>&) {}
28};
29
30} // namespace impl
31
35template <typename T, bool MultipleProducer, bool MultipleConsumer>
36class GenericQueue final
37 : public std::enable_shared_from_this<
38 GenericQueue<T, MultipleProducer, MultipleConsumer>> {
39 struct EmplaceEnabler final {
40 // Disable {}-initialization in Queue's constructor
41 explicit EmplaceEnabler() = default;
42 };
43
44 using ProducerToken =
45 std::conditional_t<MultipleProducer, moodycamel::ProducerToken,
46 impl::NoToken>;
47 using ConsumerToken =
48 std::conditional_t<MultipleProducer, moodycamel::ConsumerToken,
49 impl::NoToken>;
50 using MultiProducerToken = impl::MultiProducerToken;
51
52 using SingleProducerToken =
53 std::conditional_t<!MultipleProducer, moodycamel::ProducerToken,
54 impl::NoToken>;
55
56 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
57 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
58 friend class Consumer<GenericQueue, EmplaceEnabler>;
59
60 public:
61 using ValueType = T;
62
63 using Producer =
66 using MultiProducer =
68
69 static constexpr std::size_t kUnbounded =
70 std::numeric_limits<std::size_t>::max() / 4;
71
73 // For internal use only
74 explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
75 : queue_(),
76 single_producer_token_(queue_),
77 producer_side_(*this, std::min(max_size, kUnbounded)),
78 consumer_side_(*this) {}
79
81 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
82 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
83
84 if (producers_count_ == kCreatedAndDead) {
85 // To allow reading the remaining items
86 consumer_side_.ResumeBlockingOnPop();
87 }
88
89 // Clear remaining items in queue
90 T value;
91 ConsumerToken token{queue_};
92 while (consumer_side_.PopNoblock(token, value)) {
93 }
94 }
95
96 GenericQueue(GenericQueue&&) = delete;
97 GenericQueue(const GenericQueue&) = delete;
98 GenericQueue& operator=(GenericQueue&&) = delete;
99 GenericQueue& operator=(const GenericQueue&) = delete;
101
103 static std::shared_ptr<GenericQueue> Create(
104 std::size_t max_size = kUnbounded) {
105 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
106 }
107
115 PrepareProducer();
116 return Producer(this->shared_from_this(), EmplaceEnabler{});
117 }
118
129 static_assert(MultipleProducer,
130 "Trying to obtain MultiProducer for a single-producer queue");
131 PrepareProducer();
132 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
133 }
134
142 std::size_t old_consumers_count{};
143 utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
144 old_consumers_count = old_value;
145 return old_value == kCreatedAndDead ? 1 : old_value + 1;
146 });
147
148 if (old_consumers_count == kCreatedAndDead) {
149 producer_side_.ResumeBlockingOnPush();
150 }
151 UASSERT(MultipleConsumer || old_consumers_count != 1);
152 return Consumer(this->shared_from_this(), EmplaceEnabler{});
153 }
154
157 void SetSoftMaxSize(std::size_t max_size) {
158 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
159 }
160
162 std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }
163
165 std::size_t GetSizeApproximate() const { return consumer_side_.GetSize(); }
166
167 private:
168 class SingleProducerSide;
169 class MultiProducerSide;
170 class SingleConsumerSide;
171 class MultiConsumerSide;
172
175 using ProducerSide = std::conditional_t<MultipleProducer, MultiProducerSide,
176 SingleProducerSide>;
177
180 using ConsumerSide = std::conditional_t<MultipleConsumer, MultiConsumerSide,
181 SingleConsumerSide>;
182
183 template <typename Token>
184 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
185 return producer_side_.Push(token, std::move(value), deadline);
186 }
187
188 template <typename Token>
189 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
190 return producer_side_.PushNoblock(token, std::move(value));
191 }
192
193 [[nodiscard]] bool Pop(ConsumerToken& token, T& value,
194 engine::Deadline deadline) {
195 return consumer_side_.Pop(token, value, deadline);
196 }
197
198 [[nodiscard]] bool PopNoblock(ConsumerToken& token, T& value) {
199 return consumer_side_.PopNoblock(token, value);
200 }
201
202 void PrepareProducer() {
203 std::size_t old_producers_count{};
204 utils::AtomicUpdate(producers_count_, [&](auto old_value) {
205 old_producers_count = old_value;
206 return old_value == kCreatedAndDead ? 1 : old_value + 1;
207 });
208
209 if (old_producers_count == kCreatedAndDead) {
210 consumer_side_.ResumeBlockingOnPop();
211 }
212 UASSERT(MultipleProducer || old_producers_count != 1);
213 }
214
215 void MarkConsumerIsDead() {
216 const auto new_consumers_count =
217 utils::AtomicUpdate(consumers_count_, [](auto old_value) {
218 return old_value == 1 ? kCreatedAndDead : old_value - 1;
219 });
220 if (new_consumers_count == kCreatedAndDead) {
221 producer_side_.StopBlockingOnPush();
222 }
223 }
224
225 void MarkProducerIsDead() {
226 const auto new_producers_count =
227 utils::AtomicUpdate(producers_count_, [](auto old_value) {
228 return old_value == 1 ? kCreatedAndDead : old_value - 1;
229 });
230 if (new_producers_count == kCreatedAndDead) {
231 consumer_side_.StopBlockingOnPop();
232 }
233 }
234
235 public: // TODO
236 bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }
237
238 bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
239
240 private:
241 template <typename Token>
242 void DoPush(Token& token, T&& value) {
243 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
244 static_assert(MultipleProducer);
245 queue_.enqueue(token, std::move(value));
246 } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
247 static_assert(MultipleProducer);
248 queue_.enqueue(std::move(value));
249 } else {
250 static_assert(std::is_same_v<Token, impl::NoToken>);
251 static_assert(!MultipleProducer);
252 queue_.enqueue(single_producer_token_, std::move(value));
253 }
254
255 consumer_side_.OnElementPushed();
256 }
257
258 [[nodiscard]] bool DoPop(ConsumerToken& token, T& value) {
259 bool success = false;
260 if constexpr (MultipleProducer) {
261 success = queue_.try_dequeue(token, value);
262 } else {
263 // Substitute with our single producer token
264 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
265 }
266
267 if (success) {
268 producer_side_.OnElementPopped();
269 return true;
270 }
271
272 return false;
273 }
274
276 std::atomic<std::size_t> consumers_count_{0};
277 std::atomic<std::size_t> producers_count_{0};
278
279 SingleProducerToken single_producer_token_;
280
281 ProducerSide producer_side_;
282 ConsumerSide consumer_side_;
283
284 static constexpr std::size_t kCreatedAndDead =
285 std::numeric_limits<std::size_t>::max();
286 static constexpr std::size_t kSemaphoreUnlockValue =
287 std::numeric_limits<std::size_t>::max() / 2;
288};
289
290// Single-producer ProducerSide implementation
291template <typename T, bool MP, bool MC>
292class GenericQueue<T, MP, MC>::SingleProducerSide final {
293 public:
294 explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
295 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
296
297 // Blocks if there is a consumer to Pop the current value and task
298 // shouldn't cancel and queue if full
299 template <typename Token>
300 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
301 if (DoPush(token, std::move(value))) {
302 return true;
303 }
304
305 return non_full_event_.WaitForEventUntil(deadline) &&
306 // NOLINTNEXTLINE(bugprone-use-after-move)
307 DoPush(token, std::move(value));
308 }
309
310 template <typename Token>
311 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
312 return DoPush(token, std::move(value));
313 }
314
315 void OnElementPopped() {
316 --used_capacity_;
317 non_full_event_.Send();
318 }
319
320 void StopBlockingOnPush() {
321 total_capacity_ += kSemaphoreUnlockValue;
322 non_full_event_.Send();
323 }
324
325 void ResumeBlockingOnPush() { total_capacity_ -= kSemaphoreUnlockValue; }
326
327 void SetSoftMaxSize(std::size_t new_capacity) {
328 const auto old_capacity = total_capacity_.exchange(new_capacity);
329 if (new_capacity > old_capacity) non_full_event_.Send();
330 }
331
332 std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }
333
334 private:
335 template <typename Token>
336 [[nodiscard]] bool DoPush(Token& token, T&& value) {
337 if (queue_.NoMoreConsumers() ||
338 used_capacity_.load() >= total_capacity_.load()) {
339 return false;
340 }
341
342 ++used_capacity_;
343 queue_.DoPush(token, std::move(value));
344 non_full_event_.Reset();
345 return true;
346 }
347
348 GenericQueue& queue_;
349 engine::SingleConsumerEvent non_full_event_;
350 std::atomic<std::size_t> used_capacity_;
351 std::atomic<std::size_t> total_capacity_;
352};
353
354// Multi producer ProducerSide implementation
355template <typename T, bool MP, bool MC>
356class GenericQueue<T, MP, MC>::MultiProducerSide final {
357 public:
358 explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
359 : queue_(queue),
360 remaining_capacity_(capacity),
361 remaining_capacity_control_(remaining_capacity_) {}
362
363 // Blocks if there is a consumer to Pop the current value and task
364 // shouldn't cancel and queue if full
365 template <typename Token>
366 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
367 return !engine::current_task::ShouldCancel() &&
368 remaining_capacity_.try_lock_shared_until(deadline) &&
369 DoPush(token, std::move(value));
370 }
371
372 template <typename Token>
373 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
374 return remaining_capacity_.try_lock_shared() &&
375 DoPush(token, std::move(value));
376 }
377
378 void OnElementPopped() { remaining_capacity_.unlock_shared(); }
379
380 void StopBlockingOnPush() {
381 remaining_capacity_control_.SetCapacityOverride(0);
382 }
383
384 void ResumeBlockingOnPush() {
385 remaining_capacity_control_.RemoveCapacityOverride();
386 }
387
388 void SetSoftMaxSize(std::size_t count) {
389 remaining_capacity_control_.SetCapacity(count);
390 }
391
392 std::size_t GetSoftMaxSize() const noexcept {
393 return remaining_capacity_control_.GetCapacity();
394 }
395
396 private:
397 template <typename Token>
398 [[nodiscard]] bool DoPush(Token& token, T&& value) {
399 if (queue_.NoMoreConsumers()) {
400 remaining_capacity_.unlock_shared();
401 return false;
402 }
403
404 queue_.DoPush(token, std::move(value));
405 return true;
406 }
407
408 GenericQueue& queue_;
409 engine::Semaphore remaining_capacity_;
410 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
411};
412
413// Single consumer ConsumerSide implementation
414template <typename T, bool MP, bool MC>
415class GenericQueue<T, MP, MC>::SingleConsumerSide final {
416 public:
417 explicit SingleConsumerSide(GenericQueue& queue) : queue_(queue), size_(0) {}
418
419 // Blocks only if queue is empty
420 [[nodiscard]] bool Pop(ConsumerToken& token, T& value,
421 engine::Deadline deadline) {
422 while (!DoPop(token, value)) {
423 if (queue_.NoMoreProducers() ||
424 !nonempty_event_.WaitForEventUntil(deadline)) {
425 // Producer might have pushed something in queue between .pop()
426 // and !producer_is_created_and_dead_ check. Check twice to avoid
427 // TOCTOU.
428 return DoPop(token, value);
429 }
430 }
431 return true;
432 }
433
434 [[nodiscard]] bool PopNoblock(ConsumerToken& token, T& value) {
435 return DoPop(token, value);
436 }
437
438 void OnElementPushed() {
439 ++size_;
440 nonempty_event_.Send();
441 }
442
443 void StopBlockingOnPop() { nonempty_event_.Send(); }
444
445 void ResumeBlockingOnPop() {}
446
447 std::size_t GetSize() const { return size_; }
448
449 private:
450 [[nodiscard]] bool DoPop(ConsumerToken& token, T& value) {
451 if (queue_.DoPop(token, value)) {
452 --size_;
453 nonempty_event_.Reset();
454 return true;
455 }
456 return false;
457 }
458
459 GenericQueue& queue_;
460 engine::SingleConsumerEvent nonempty_event_;
461 std::atomic<std::size_t> size_;
462};
463
464// Multi consumer ConsumerSide implementation
465template <typename T, bool MP, bool MC>
466class GenericQueue<T, MP, MC>::MultiConsumerSide final {
467 public:
468 explicit MultiConsumerSide(GenericQueue& queue)
469 : queue_(queue), size_(kUnbounded), size_control_(size_) {
470 const bool success = size_.try_lock_shared_count(kUnbounded);
471 UASSERT(success);
472 }
473
474 ~MultiConsumerSide() { size_.unlock_shared_count(kUnbounded); }
475
476 // Blocks only if queue is empty
477 [[nodiscard]] bool Pop(ConsumerToken& token, T& value,
478 engine::Deadline deadline) {
479 return size_.try_lock_shared_until(deadline) && DoPop(token, value);
480 }
481
482 [[nodiscard]] bool PopNoblock(ConsumerToken& token, T& value) {
483 return size_.try_lock_shared() && DoPop(token, value);
484 }
485
486 void OnElementPushed() { size_.unlock_shared(); }
487
488 void StopBlockingOnPop() {
489 size_control_.SetCapacityOverride(kUnbounded + kSemaphoreUnlockValue);
490 }
491
492 void ResumeBlockingOnPop() { size_control_.RemoveCapacityOverride(); }
493
494 std::size_t GetSize() const {
495 std::size_t cur_size = size_.RemainingApprox();
496 if (cur_size < kUnbounded) {
497 return cur_size;
498 } else if (cur_size <= kSemaphoreUnlockValue) {
499 return 0;
500 }
501 return cur_size - kSemaphoreUnlockValue;
502 }
503
504 private:
505 [[nodiscard]] bool DoPop(ConsumerToken& token, T& value) {
506 while (true) {
507 if (queue_.DoPop(token, value)) return true;
508 if (queue_.NoMoreProducers()) {
509 size_.unlock_shared();
510 return false;
511 }
512 // We can get here if another consumer steals our element, leaving another
513 // element in a Moodycamel sub-queue that we have already passed.
514 }
515 }
516
517 GenericQueue& queue_;
518 engine::Semaphore size_;
519 concurrent::impl::SemaphoreCapacityControl size_control_;
520};
521
536template <typename T>
538
545template <typename T>
547
553template <typename T>
555
561template <typename T>
563
564} // namespace concurrent
565
566USERVER_NAMESPACE_END