userver: userver/concurrent/queue.hpp Source File
Loading...
Searching...
No Matches
queue.hpp
1#pragma once
2
3#include <atomic>
4#include <limits>
5#include <memory>
6
7#include <moodycamel/concurrentqueue.h>
8
9#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
10#include <userver/concurrent/queue_helpers.hpp>
11#include <userver/engine/deadline.hpp>
12#include <userver/engine/semaphore.hpp>
13#include <userver/engine/single_consumer_event.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/atomic.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace concurrent {
21
22namespace impl {
23
/// Queue policy under which every element has the same weight.
///
/// `GetElementSize` ignores its argument and always reports 1, so a queue
/// parameterized with this policy measures its size limit in elements.
/// The two template flags are re-exported as the compile-time constants
/// that `GenericQueue` reads.
template <bool MultipleProducer, bool MultipleConsumer>
struct SimpleQueuePolicy {
  static constexpr bool kIsMultipleProducer = MultipleProducer;
  static constexpr bool kIsMultipleConsumer = MultipleConsumer;

  /// @returns 1 for any element, regardless of its type or contents.
  template <typename Element>
  static constexpr std::size_t GetElementSize(const Element& /*element*/) {
    return std::size_t{1};
  }
};
34
/// Queue policy under which an element weighs as much as it contains.
///
/// `GetElementSize` reports `std::size(value)`, so a queue parameterized with
/// this policy measures its size limit in contained items (e.g. characters
/// of a `std::string`) rather than in queue elements.
template <bool MultipleProducer, bool MultipleConsumer>
struct ContainerQueuePolicy {
  /// @returns the number of items held by `value`, as given by `std::size`.
  ///
  /// Declared `constexpr` for consistency with
  /// `SimpleQueuePolicy::GetElementSize`: `std::size` is itself `constexpr`,
  /// so the result is usable in constant expressions whenever the
  /// container's own `size()` is.
  template <typename T>
  static constexpr std::size_t GetElementSize(const T& value) {
    return std::size(value);
  }

  static constexpr bool kIsMultipleProducer{MultipleProducer};
  static constexpr bool kIsMultipleConsumer{MultipleConsumer};
};
45
46} // namespace impl
47
48/// @brief Queue with single and multi producer/consumer options.
49///
50/// @tparam T element type
51/// @tparam QueuePolicy policy type, it should have:
52/// 1. `static constexpr bool kIsMultipleProducer`
53/// 2. `static constexpr bool kIsMultipleConsumer`
54/// 3. `static std::size_t GetElementSize(const T& value)`
55///
56/// In practice, instead of using `GenericQueue` directly, use an alias:
57///
58/// * concurrent::NonFifoMpmcQueue
59/// * concurrent::NonFifoMpscQueue
60/// * concurrent::SpmcQueue
61/// * concurrent::SpscQueue
62///
63/// @see @ref scripts/docs/en/userver/synchronization.md
64template <typename T, typename QueuePolicy>
65class GenericQueue final
66 : public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
67 struct EmplaceEnabler final {
68 // Disable {}-initialization in Queue's constructor
69 explicit EmplaceEnabler() = default;
70 };
71
72 using ProducerToken =
73 std::conditional_t<QueuePolicy::kIsMultipleProducer,
74 moodycamel::ProducerToken, impl::NoToken>;
75 using ConsumerToken =
76 std::conditional_t<QueuePolicy::kIsMultipleProducer,
77 moodycamel::ConsumerToken, impl::NoToken>;
78 using MultiProducerToken = impl::MultiToken;
79 using MultiConsumerToken =
80 std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken,
81 impl::NoToken>;
82
83 using SingleProducerToken =
84 std::conditional_t<!QueuePolicy::kIsMultipleProducer,
85 moodycamel::ProducerToken, impl::NoToken>;
86
87 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
88 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
89 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
90 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
91
92 public:
93 using ValueType = T;
94
95 using Producer =
96 concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
97 using Consumer =
98 concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
99 using MultiProducer =
100 concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
101 using MultiConsumer =
102 concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
103
104 static constexpr std::size_t kUnbounded =
105 std::numeric_limits<std::size_t>::max() / 4;
106
107 /// @cond
108 // For internal use only
109 explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
110 : queue_(),
111 single_producer_token_(queue_),
112 producer_side_(*this, std::min(max_size, kUnbounded)),
113 consumer_side_(*this) {}
114
115 ~GenericQueue() {
116 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
117 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
118
119 if (producers_count_ == kCreatedAndDead) {
120 // To allow reading the remaining items
121 consumer_side_.ResumeBlockingOnPop();
122 }
123
124 // Clear remaining items in queue
125 T value;
126 ConsumerToken token{queue_};
127 while (consumer_side_.PopNoblock(token, value)) {
128 }
129 }
130
131 GenericQueue(GenericQueue&&) = delete;
132 GenericQueue(const GenericQueue&) = delete;
133 GenericQueue& operator=(GenericQueue&&) = delete;
134 GenericQueue& operator=(const GenericQueue&) = delete;
135 /// @endcond
136
137 /// Create a new queue
138 static std::shared_ptr<GenericQueue> Create(
139 std::size_t max_size = kUnbounded) {
140 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
141 }
142
143 /// Get a `Producer` which makes it possible to push items into the queue.
144 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
145 /// so you have to use multiple Producers of the same queue to simultaneously
146 /// write from multiple coroutines/threads.
147 ///
148 /// @note `Producer` may outlive the queue and consumers.
150 PrepareProducer();
151 return Producer(this->shared_from_this(), EmplaceEnabler{});
152 }
153
154 /// Get a `MultiProducer` which makes it possible to push items into the
155 /// queue. Can be called multiple times. The resulting `MultiProducer` is
156 /// thread-safe, so it can be used simultaneously from multiple
157 /// coroutines/threads.
158 ///
159 /// @note `MultiProducer` may outlive the queue and consumers.
160 ///
161 /// @note Prefer `Producer` tokens when possible, because `MultiProducer`
162 /// token incurs some overhead.
163 MultiProducer GetMultiProducer() {
164 static_assert(QueuePolicy::kIsMultipleProducer,
165 "Trying to obtain MultiProducer for a single-producer queue");
166 PrepareProducer();
167 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
168 }
169
170 /// Get a `Consumer` which makes it possible to read items from the queue.
171 /// Can be called multiple times. The resulting `Consumer` is not thread-safe,
172 /// so you have to use multiple `Consumer`s of the same queue to
173 /// simultaneously write from multiple coroutines/threads.
174 ///
175 /// @note `Consumer` may outlive the queue and producers.
177 PrepareConsumer();
178 return Consumer(this->shared_from_this(), EmplaceEnabler{});
179 }
180
181 /// Get a `MultiConsumer` which makes it possible to read items from the
182 /// queue. Can be called multiple times. The resulting `MultiConsumer` is
183 /// thread-safe, so it can be used simultaneously from multiple
184 /// coroutines/threads.
185 ///
186 /// @note `MultiConsumer` may outlive the queue and producers.
187 ///
188 /// @note Prefer `Consumer` tokens when possible, because `MultiConsumer`
189 /// token incurs some overhead.
191 static_assert(QueuePolicy::kIsMultipleConsumer,
192 "Trying to obtain MultiConsumer for a single-consumer queue");
193 PrepareConsumer();
194 return MultiConsumer(this->shared_from_this(), EmplaceEnabler{});
195 }
196
197 /// @brief Sets the limit on the queue size, pushes over this limit will block
198 /// @note This is a soft limit and may be slightly overrun under load.
199 void SetSoftMaxSize(std::size_t max_size) {
200 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
201 }
202
203 /// @brief Gets the limit on the queue size
204 std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }
205
206 /// @brief Gets the approximate size of queue
207 std::size_t GetSizeApproximate() const {
208 return producer_side_.GetSizeApproximate();
209 }
210
211 private:
212 class SingleProducerSide;
213 class MultiProducerSide;
214 class SingleConsumerSide;
215 class MultiConsumerSide;
216
217 /// Proxy-class makes synchronization of Push operations in multi or single
218 /// producer cases
219 using ProducerSide =
220 std::conditional_t<QueuePolicy::kIsMultipleProducer, MultiProducerSide,
221 SingleProducerSide>;
222
223 /// Proxy-class makes synchronization of Pop operations in multi or single
224 /// consumer cases
225 using ConsumerSide =
226 std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide,
227 SingleConsumerSide>;
228
229 template <typename Token>
230 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
231 const std::size_t value_size = QueuePolicy::GetElementSize(value);
232 UASSERT(value_size > 0);
233 return producer_side_.Push(token, std::move(value), deadline, value_size);
234 }
235
236 template <typename Token>
237 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
238 const std::size_t value_size = QueuePolicy::GetElementSize(value);
239 UASSERT(value_size > 0);
240 return producer_side_.PushNoblock(token, std::move(value), value_size);
241 }
242
243 template <typename Token>
244 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
245 return consumer_side_.Pop(token, value, deadline);
246 }
247
248 template <typename Token>
249 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
250 return consumer_side_.PopNoblock(token, value);
251 }
252
253 void PrepareProducer() {
254 std::size_t old_producers_count{};
255 utils::AtomicUpdate(producers_count_, [&](auto old_value) {
256 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
257 "Incorrect usage of queue producers");
258 old_producers_count = old_value;
259 return old_value == kCreatedAndDead ? 1 : old_value + 1;
260 });
261
262 if (old_producers_count == kCreatedAndDead) {
263 consumer_side_.ResumeBlockingOnPop();
264 }
265 }
266
267 void PrepareConsumer() {
268 std::size_t old_consumers_count{};
269 utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
270 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
271 "Incorrect usage of queue consumers");
272 old_consumers_count = old_value;
273 return old_value == kCreatedAndDead ? 1 : old_value + 1;
274 });
275
276 if (old_consumers_count == kCreatedAndDead) {
277 producer_side_.ResumeBlockingOnPush();
278 }
279 }
280
281 void MarkConsumerIsDead() {
282 const auto new_consumers_count =
283 utils::AtomicUpdate(consumers_count_, [](auto old_value) {
284 return old_value == 1 ? kCreatedAndDead : old_value - 1;
285 });
286 if (new_consumers_count == kCreatedAndDead) {
287 producer_side_.StopBlockingOnPush();
288 }
289 }
290
291 void MarkProducerIsDead() {
292 const auto new_producers_count =
293 utils::AtomicUpdate(producers_count_, [](auto old_value) {
294 return old_value == 1 ? kCreatedAndDead : old_value - 1;
295 });
296 if (new_producers_count == kCreatedAndDead) {
297 consumer_side_.StopBlockingOnPop();
298 }
299 }
300
301 public: // TODO
302 /// @cond
303 bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }
304
305 bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
306 /// @endcond
307
308 private:
309 template <typename Token>
310 void DoPush(Token& token, T&& value) {
311 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
312 static_assert(QueuePolicy::kIsMultipleProducer);
313 queue_.enqueue(token, std::move(value));
314 } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
315 static_assert(QueuePolicy::kIsMultipleProducer);
316 queue_.enqueue(std::move(value));
317 } else {
318 static_assert(std::is_same_v<Token, impl::NoToken>);
319 static_assert(!QueuePolicy::kIsMultipleProducer);
320 queue_.enqueue(single_producer_token_, std::move(value));
321 }
322
323 consumer_side_.OnElementPushed();
324 }
325
326 template <typename Token>
327 [[nodiscard]] bool DoPop(Token& token, T& value) {
328 bool success{};
329
330 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
331 static_assert(QueuePolicy::kIsMultipleProducer);
332 success = queue_.try_dequeue(token, value);
333 } else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
334 static_assert(QueuePolicy::kIsMultipleProducer);
335 success = queue_.try_dequeue(value);
336 } else {
337 static_assert(std::is_same_v<Token, impl::NoToken>);
338 static_assert(!QueuePolicy::kIsMultipleProducer);
339 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
340 }
341
342 if (success) {
343 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
344 return true;
345 }
346
347 return false;
348 }
349
350 moodycamel::ConcurrentQueue<T> queue_{1};
351 std::atomic<std::size_t> consumers_count_{0};
352 std::atomic<std::size_t> producers_count_{0};
353
354 SingleProducerToken single_producer_token_;
355
356 ProducerSide producer_side_;
357 ConsumerSide consumer_side_;
358
359 static constexpr std::size_t kCreatedAndDead =
360 std::numeric_limits<std::size_t>::max();
361 static constexpr std::size_t kSemaphoreUnlockValue =
362 std::numeric_limits<std::size_t>::max() / 2;
363};
364
365// Single-producer ProducerSide implementation
366template <typename T, typename QueuePolicy>
367class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
368 public:
369 explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
370 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
371
372 // Blocks if there is a consumer to Pop the current value and task
373 // shouldn't cancel and queue if full
374 template <typename Token>
375 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline,
376 std::size_t value_size) {
377 bool no_more_consumers = false;
378 const bool success = non_full_event_.WaitUntil(deadline, [&] {
379 if (queue_.NoMoreConsumers()) {
380 no_more_consumers = true;
381 return true;
382 }
383 if (DoPush(token, std::move(value), value_size)) {
384 return true;
385 }
386 return false;
387 });
388 return success && !no_more_consumers;
389 }
390
391 template <typename Token>
392 [[nodiscard]] bool PushNoblock(Token& token, T&& value,
393 std::size_t value_size) {
394 return !queue_.NoMoreConsumers() &&
395 DoPush(token, std::move(value), value_size);
396 }
397
398 void OnElementPopped(std::size_t released_capacity) {
399 used_capacity_.fetch_sub(released_capacity);
400 non_full_event_.Send();
401 }
402
403 void StopBlockingOnPush() { non_full_event_.Send(); }
404
405 void ResumeBlockingOnPush() {}
406
407 void SetSoftMaxSize(std::size_t new_capacity) {
408 const auto old_capacity = total_capacity_.exchange(new_capacity);
409 if (new_capacity > old_capacity) non_full_event_.Send();
410 }
411
412 std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }
413
414 std::size_t GetSizeApproximate() const noexcept {
415 return used_capacity_.load();
416 }
417
418 private:
419 template <typename Token>
420 [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
421 if (used_capacity_.load() + value_size > total_capacity_.load()) {
422 return false;
423 }
424
425 used_capacity_.fetch_add(value_size);
426 queue_.DoPush(token, std::move(value));
427 return true;
428 }
429
430 GenericQueue& queue_;
431 engine::SingleConsumerEvent non_full_event_;
432 std::atomic<std::size_t> used_capacity_;
433 std::atomic<std::size_t> total_capacity_;
434};
435
436// Multi producer ProducerSide implementation
437template <typename T, typename QueuePolicy>
438class GenericQueue<T, QueuePolicy>::MultiProducerSide final {
439 public:
440 explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
441 : queue_(queue),
442 remaining_capacity_(capacity),
443 remaining_capacity_control_(remaining_capacity_) {}
444
445 // Blocks if there is a consumer to Pop the current value and task
446 // shouldn't cancel and queue if full
447 template <typename Token>
448 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline,
449 std::size_t value_size) {
450 return remaining_capacity_.try_lock_shared_until_count(deadline,
451 value_size) &&
452 DoPush(token, std::move(value), value_size);
453 }
454
455 template <typename Token>
456 [[nodiscard]] bool PushNoblock(Token& token, T&& value,
457 std::size_t value_size) {
458 return remaining_capacity_.try_lock_shared_count(value_size) &&
459 DoPush(token, std::move(value), value_size);
460 }
461
462 void OnElementPopped(std::size_t value_size) {
463 remaining_capacity_.unlock_shared_count(value_size);
464 }
465
466 void StopBlockingOnPush() {
467 remaining_capacity_control_.SetCapacityOverride(0);
468 }
469
470 void ResumeBlockingOnPush() {
471 remaining_capacity_control_.RemoveCapacityOverride();
472 }
473
474 void SetSoftMaxSize(std::size_t count) {
475 remaining_capacity_control_.SetCapacity(count);
476 }
477
478 std::size_t GetSizeApproximate() const noexcept {
479 return remaining_capacity_.UsedApprox();
480 }
481
482 std::size_t GetSoftMaxSize() const noexcept {
483 return remaining_capacity_control_.GetCapacity();
484 }
485
486 private:
487 template <typename Token>
488 [[nodiscard]] bool DoPush(Token& token, T&& value, std::size_t value_size) {
489 if (queue_.NoMoreConsumers()) {
490 remaining_capacity_.unlock_shared_count(value_size);
491 return false;
492 }
493
494 queue_.DoPush(token, std::move(value));
495 return true;
496 }
497
498 GenericQueue& queue_;
499 engine::CancellableSemaphore remaining_capacity_;
500 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
501};
502
503// Single consumer ConsumerSide implementation
504template <typename T, typename QueuePolicy>
505class GenericQueue<T, QueuePolicy>::SingleConsumerSide final {
506 public:
507 explicit SingleConsumerSide(GenericQueue& queue)
508 : queue_(queue), element_count_(0) {}
509
510 // Blocks only if queue is empty
511 template <typename Token>
512 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
513 bool no_more_producers = false;
514 const bool success = nonempty_event_.WaitUntil(deadline, [&] {
515 if (DoPop(token, value)) {
516 return true;
517 }
518 if (queue_.NoMoreProducers()) {
519 // Producer might have pushed something in queue between .pop()
520 // and !producer_is_created_and_dead_ check. Check twice to avoid
521 // TOCTOU.
522 if (!DoPop(token, value)) {
523 no_more_producers = true;
524 }
525 return true;
526 }
527 return false;
528 });
529 return success && !no_more_producers;
530 }
531
532 template <typename Token>
533 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
534 return DoPop(token, value);
535 }
536
537 void OnElementPushed() {
538 ++element_count_;
539 nonempty_event_.Send();
540 }
541
542 void StopBlockingOnPop() { nonempty_event_.Send(); }
543
544 void ResumeBlockingOnPop() {}
545
546 std::size_t GetElementCount() const { return element_count_; }
547
548 private:
549 template <typename Token>
550 [[nodiscard]] bool DoPop(Token& token, T& value) {
551 if (queue_.DoPop(token, value)) {
552 --element_count_;
553 nonempty_event_.Reset();
554 return true;
555 }
556 return false;
557 }
558
559 GenericQueue& queue_;
560 engine::SingleConsumerEvent nonempty_event_;
561 std::atomic<std::size_t> element_count_;
562};
563
564// Multi consumer ConsumerSide implementation
565template <typename T, typename QueuePolicy>
566class GenericQueue<T, QueuePolicy>::MultiConsumerSide final {
567 public:
568 explicit MultiConsumerSide(GenericQueue& queue)
569 : queue_(queue),
570 element_count_(kUnbounded),
571 element_count_control_(element_count_) {
572 const bool success = element_count_.try_lock_shared_count(kUnbounded);
573 UASSERT(success);
574 }
575
576 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
577
578 // Blocks only if queue is empty
579 template <typename Token>
580 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
581 return element_count_.try_lock_shared_until(deadline) &&
582 DoPop(token, value);
583 }
584
585 template <typename Token>
586 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
587 return element_count_.try_lock_shared() && DoPop(token, value);
588 }
589
590 void OnElementPushed() { element_count_.unlock_shared(); }
591
592 void StopBlockingOnPop() {
593 element_count_control_.SetCapacityOverride(kUnbounded +
594 kSemaphoreUnlockValue);
595 }
596
597 void ResumeBlockingOnPop() {
598 element_count_control_.RemoveCapacityOverride();
599 }
600
601 std::size_t GetElementCount() const {
602 const std::size_t cur_element_count = element_count_.RemainingApprox();
603 if (cur_element_count < kUnbounded) {
604 return cur_element_count;
605 } else if (cur_element_count <= kSemaphoreUnlockValue) {
606 return 0;
607 }
608 return cur_element_count - kSemaphoreUnlockValue;
609 }
610
611 private:
612 template <typename Token>
613 [[nodiscard]] bool DoPop(Token& token, T& value) {
614 while (true) {
615 if (queue_.DoPop(token, value)) {
616 return true;
617 }
618 if (queue_.NoMoreProducers()) {
619 element_count_.unlock_shared();
620 return false;
621 }
622 // We can get here if another consumer steals our element, leaving another
623 // element in a Moodycamel sub-queue that we have already passed.
624 }
625 }
626
627 GenericQueue& queue_;
628 engine::CancellableSemaphore element_count_;
629 concurrent::impl::SemaphoreCapacityControl element_count_control_;
630};
631
632/// @ingroup userver_concurrency
633///
634/// @brief Non FIFO multiple producers multiple consumers queue.
635///
636/// Items from the same producer are always delivered in the production order.
637/// Items from different producers (or when using a `MultiProducer` token) are
638/// delivered in an unspecified order. In other words, FIFO order is maintained
639/// only within producers, but not between them. This may lead to increased peak
640/// latency of item processing.
641///
642/// In exchange for this, the queue has lower contention and increased
643/// throughput compared to a conventional lock-free queue.
644///
/// Uses `impl::SimpleQueuePolicy`, so every element counts as 1 towards the
/// size limit.
///
645/// @see @ref scripts/docs/en/userver/synchronization.md
646template <typename T>
647using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>;
648
649/// @ingroup userver_concurrency
650///
651/// @brief Non FIFO multiple producers single consumer queue.
652///
/// Uses `impl::SimpleQueuePolicy`, so every element counts as 1 towards the
/// size limit.
///
653/// @see concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
654/// @see @ref scripts/docs/en/userver/synchronization.md
655template <typename T>
656using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>;
657
658/// @ingroup userver_concurrency
659///
660/// @brief Single producer multiple consumers queue.
661///
/// Uses `impl::SimpleQueuePolicy`, so every element counts as 1 towards the
/// size limit.
///
662/// @see @ref scripts/docs/en/userver/synchronization.md
663template <typename T>
664using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>;
665
666/// @ingroup userver_concurrency
667///
668/// @brief Single producer single consumer queue.
669///
/// Uses `impl::SimpleQueuePolicy`, so every element counts as 1 towards the
/// size limit.
///
670/// @see @ref scripts/docs/en/userver/synchronization.md
671template <typename T>
672using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>;
673
674/// @ingroup userver_concurrency
675///
676/// @brief Single producer single consumer queue of std::string whose size
677/// limit is counted in bytes of string data rather than in elements.
678///
/// Uses `impl::ContainerQueuePolicy`, so each pushed string counts as
/// `std::size(string)` towards the size limit.
///
679/// @see @ref scripts/docs/en/userver/synchronization.md
680using StringStreamQueue =
681 GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>;
682
683} // namespace concurrent
684
685USERVER_NAMESPACE_END