userver: userver/concurrent/queue_helpers.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
queue_helpers.hpp
1#pragma once
2
3#include <memory>
4
5#include <userver/engine/deadline.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace concurrent {
10
11namespace impl {
12
13struct NoToken final {
14 template <typename LockFreeQueue>
15 explicit NoToken(LockFreeQueue& /*unused*/) {}
16};
17
18struct MultiToken final {
19 template <typename LockFreeQueue>
20 explicit MultiToken(LockFreeQueue& /*unused*/) {}
21};
22
23} // namespace impl
24
25/// @warning A single Producer must not be used from multiple threads
26/// concurrently
27template <typename QueueType, typename ProducerToken,
28 typename EmplaceEnablerType>
29class Producer final {
30 static_assert(
31 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
32 "Do not instantiate Producer on your own. Use Producer type alias "
33 "from queue");
34
35 using ValueType = typename QueueType::ValueType;
36
37 public:
38 Producer(const Producer&) = delete;
39 Producer(Producer&&) noexcept = default;
40 Producer& operator=(const Producer&) = delete;
41 Producer& operator=(Producer&&) noexcept = default;
42
43 ~Producer() {
44 if (queue_) queue_->MarkProducerIsDead();
45 }
46
47 /// Push element into queue. May wait asynchronously if the queue is full.
48 /// Leaves the `value` unmodified if the operation does not succeed.
49 /// @returns whether push succeeded before the deadline and before the task
50 /// was canceled.
51 [[nodiscard]] bool Push(ValueType&& value,
52 engine::Deadline deadline = {}) const {
53 UASSERT(queue_);
54 return queue_->Push(token_, std::move(value), deadline);
55 }
56
57 /// Try to push element into queue without blocking. May be used in
58 /// non-coroutine environment. Leaves the `value` unmodified if the operation
59 /// does not succeed.
60 /// @returns whether push succeeded.
61 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
62 UASSERT(queue_);
63 return queue_->PushNoblock(token_, std::move(value));
64 }
65
66 void Reset() && {
67 if (queue_) queue_->MarkProducerIsDead();
68 queue_.reset();
69 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
70 }
71
72 /// Const access to source queue.
73 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
74 return {queue_};
75 }
76
77 /// @cond
78 // For internal use only
79 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
80 : queue_(std::move(queue)), token_(queue_->queue_) {}
81 /// @endcond
82
83 private:
84 std::shared_ptr<QueueType> queue_;
85 mutable ProducerToken token_;
86};
87
88/// @warning A single Consumer must not be used from multiple threads
89/// concurrently
90template <typename QueueType, typename ConsumerToken,
91 typename EmplaceEnablerType>
92class Consumer final {
93 static_assert(
94 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
95 "Do not instantiate Consumer on your own. Use Consumer type alias "
96 "from queue");
97
98 using ValueType = typename QueueType::ValueType;
99
100 public:
101 Consumer(const Consumer&) = delete;
102 Consumer(Consumer&&) noexcept = default;
103 Consumer& operator=(const Consumer&) = delete;
104 Consumer& operator=(Consumer&&) noexcept = default;
105
106 ~Consumer() {
107 if (queue_) queue_->MarkConsumerIsDead();
108 }
109
110 /// Pop element from queue. May wait asynchronously if the queue is empty,
111 /// but the producer is alive.
112 /// @returns whether something was popped before the deadline.
113 /// @note `false` can be returned before the deadline
114 /// when the producer is no longer alive.
115 [[nodiscard]] bool Pop(ValueType& value,
116 engine::Deadline deadline = {}) const {
117 return queue_->Pop(token_, value, deadline);
118 }
119
120 /// Try to pop element from queue without blocking. May be used in
121 /// non-coroutine environment
122 /// @return whether something was popped.
123 [[nodiscard]] bool PopNoblock(ValueType& value) const {
124 return queue_->PopNoblock(token_, value);
125 }
126
127 /// Const access to source queue.
128 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
129 return {queue_};
130 }
131
132 /// @cond
133 // For internal use only
134 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
135 : queue_(std::move(queue)), token_(queue_->queue_) {}
136 /// @endcond
137
138 private:
139 std::shared_ptr<QueueType> queue_{};
140 mutable ConsumerToken token_;
141};
142
143} // namespace concurrent
144
145USERVER_NAMESPACE_END