userver: userver/concurrent/queue_helpers.hpp Source File
Loading...
Searching...
No Matches
queue_helpers.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/queue_helpers.hpp
4/// @brief Producer and consumer helpers for concurrent queues
5
#include <memory>
#include <type_traits>
#include <utility>

#include <userver/engine/deadline.hpp>
#include <userver/engine/task/current_task.hpp>
#include <userver/utils/assert.hpp>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace concurrent {
15
16namespace impl {
17
/// Empty token type. It can only be created explicitly from an lvalue of some
/// queue-like object, and that object is neither stored nor inspected.
struct NoToken final {
    template <class Queue>
    explicit NoToken(Queue&) {}
};
22
/// Empty token type, distinct from `NoToken`. It can only be created explicitly
/// from an lvalue of some queue-like object, which is ignored.
struct MultiToken final {
    template <class Queue>
    explicit MultiToken(Queue&) {}
};
27
28} // namespace impl
29
30/// @brief Producer side of concurrent queues
31///
32/// @warning A single Producer must not be used from multiple threads concurrently
33///
34/// @see @ref concurrent_queues
35template <typename QueueType, typename ProducerToken, typename EmplaceEnablerType>
36class Producer final {
37 static_assert(
38 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
39 "Do not instantiate Producer on your own. Use Producer type alias "
40 "from queue"
41 );
42
43 using ValueType = typename QueueType::ValueType;
44
45public:
46 Producer(const Producer&) = delete;
47 Producer(Producer&&) noexcept = default;
48 Producer& operator=(const Producer&) = delete;
49 Producer& operator=(Producer&& other) noexcept {
50 queue_.swap(other.queue_);
51 std::swap(token_, other.token_);
52 return *this;
53 }
54
55 ~Producer() {
56 if (queue_) {
57 queue_->MarkProducerIsDead();
58 }
59 }
60
61 /// Push an element into queue. May wait asynchronously if the queue is full.
62 /// Leaves the `value` unmodified if the operation does not succeed.
63 /// @returns whether push succeeded before the deadline and before the task
64 /// was canceled.
65 [[nodiscard]] bool Push(ValueType&& value, engine::Deadline deadline = {}) const {
66 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
67 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PushNoblock for non-coroutine producers");
68 return queue_->Push(token_, std::move(value), deadline);
69 }
70
71 /// Try to push an element into queue without blocking. May be used in
72 /// non-coroutine environment. Leaves the `value` unmodified if the operation
73 /// does not succeed.
74 /// @returns whether push succeeded.
75 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
76 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
77 return queue_->PushNoblock(token_, std::move(value));
78 }
79
80 void Reset() && noexcept {
81 if (queue_) {
82 queue_->MarkProducerIsDead();
83 }
84 queue_.reset();
85 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
86 }
87
88 /// Const access to source queue.
89 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
90
91 /// @cond
92 // For internal use only
93 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
94 : queue_(std::move(queue)),
95 token_(queue_->queue_)
96 {}
97 /// @endcond
98
99private:
100 std::shared_ptr<QueueType> queue_;
101 mutable ProducerToken token_;
102};
103
104/// @brief Consumer side of concurrent queues
105///
106/// @warning A single Consumer must not be used from multiple threads concurrently
107///
108/// @see @ref concurrent_queues
109template <typename QueueType, typename ConsumerToken, typename EmplaceEnablerType>
110class Consumer final {
111 static_assert(
112 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
113 "Do not instantiate Consumer on your own. Use Consumer type alias "
114 "from queue"
115 );
116
117 using ValueType = typename QueueType::ValueType;
118
119public:
120 Consumer(const Consumer&) = delete;
121 Consumer(Consumer&&) noexcept = default;
122 Consumer& operator=(const Consumer&) = delete;
123 Consumer& operator=(Consumer&& other) noexcept {
124 queue_.swap(other.queue_);
125 std::swap(token_, other.token_);
126 return *this;
127 }
128
129 ~Consumer() {
130 if (queue_) {
131 queue_->MarkConsumerIsDead();
132 }
133 }
134
135 /// @brief Pop an element from queue.
136 /// May wait asynchronously if the queue is empty, but the producer is alive.
137 ///
138 /// @returns whether something was popped before the deadline or before a task cancellation.
139 /// `false` can be returned before the deadline when the producer is no longer alive.
140 /// @warning Be careful when using a method in a loop. The
141 /// `engine::Deadline` is a wrapper over `std::chrono::time_point`, not
142 /// `duration`! If you need a timeout, you must reconstruct the deadline in the loop.
143 [[nodiscard]] bool Pop(ValueType& value, engine::Deadline deadline = {}) const {
144 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
145 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PopNoblock for non-coroutine consumers");
146 return queue_->Pop(token_, value, deadline);
147 }
148
149 /// @brief Try to pop an element from queue without blocking.
150 /// May be used in non-coroutine environment.
151 ///
152 /// @returns whether something was popped.
153 [[nodiscard]] bool PopNoblock(ValueType& value) const {
154 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
155 return queue_->PopNoblock(token_, value);
156 }
157
158 void Reset() && {
159 if (queue_) {
160 queue_->MarkConsumerIsDead();
161 }
162 queue_.reset();
163 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
164 }
165
166 /// Const access to source queue.
167 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
168
169 /// @cond
170 // For internal use only
171 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
172 : queue_(std::move(queue)),
173 token_(queue_->queue_)
174 {}
175 /// @endcond
176
177private:
178 std::shared_ptr<QueueType> queue_{};
179 mutable ConsumerToken token_;
180};
181
182} // namespace concurrent
183
184USERVER_NAMESPACE_END