userver: userver/concurrent/queue_helpers.hpp Source File
Loading...
Searching...
No Matches
queue_helpers.hpp
1#pragma once
2
3#include <memory>
4
5#include <userver/engine/deadline.hpp>
6#include <userver/engine/task/current_task.hpp>
7#include <userver/utils/assert.hpp>
8
9USERVER_NAMESPACE_BEGIN
10
11namespace concurrent {
12
13namespace impl {
14
15struct NoToken final {
16 template <typename LockFreeQueue>
17 explicit NoToken(LockFreeQueue& /*unused*/) {}
18};
19
20struct MultiToken final {
21 template <typename LockFreeQueue>
22 explicit MultiToken(LockFreeQueue& /*unused*/) {}
23};
24
25} // namespace impl
26
27/// @brief Producer side of concurrent queues
28///
29/// @warning A single Producer must not be used from multiple threads concurrently
30///
31/// @see @ref concurrent_queues
32template <typename QueueType, typename ProducerToken, typename EmplaceEnablerType>
33class Producer final {
34 static_assert(
35 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
36 "Do not instantiate Producer on your own. Use Producer type alias "
37 "from queue"
38 );
39
40 using ValueType = typename QueueType::ValueType;
41
42public:
43 Producer(const Producer&) = delete;
44 Producer(Producer&&) noexcept = default;
45 Producer& operator=(const Producer&) = delete;
46 Producer& operator=(Producer&& other) noexcept {
47 queue_.swap(other.queue_);
48 std::swap(token_, other.token_);
49 return *this;
50 }
51
52 ~Producer() {
53 if (queue_) {
54 queue_->MarkProducerIsDead();
55 }
56 }
57
58 /// Push an element into queue. May wait asynchronously if the queue is full.
59 /// Leaves the `value` unmodified if the operation does not succeed.
60 /// @returns whether push succeeded before the deadline and before the task
61 /// was canceled.
62 [[nodiscard]] bool Push(ValueType&& value, engine::Deadline deadline = {}) const {
63 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
64 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PushNoblock for non-coroutine producers");
65 return queue_->Push(token_, std::move(value), deadline);
66 }
67
68 /// Try to push an element into queue without blocking. May be used in
69 /// non-coroutine environment. Leaves the `value` unmodified if the operation
70 /// does not succeed.
71 /// @returns whether push succeeded.
72 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
73 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
74 return queue_->PushNoblock(token_, std::move(value));
75 }
76
77 void Reset() && noexcept {
78 if (queue_) {
79 queue_->MarkProducerIsDead();
80 }
81 queue_.reset();
82 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
83 }
84
85 /// Const access to source queue.
86 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
87
88 /// @cond
89 // For internal use only
90 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
91 : queue_(std::move(queue)),
92 token_(queue_->queue_)
93 {}
94 /// @endcond
95
96private:
97 std::shared_ptr<QueueType> queue_;
98 mutable ProducerToken token_;
99};
100
101/// @brief Consumer side of concurrent queues
102///
103/// @warning A single Consumer must not be used from multiple threads concurrently
104///
105/// @see @ref concurrent_queues
106template <typename QueueType, typename ConsumerToken, typename EmplaceEnablerType>
107class Consumer final {
108 static_assert(
109 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
110 "Do not instantiate Consumer on your own. Use Consumer type alias "
111 "from queue"
112 );
113
114 using ValueType = typename QueueType::ValueType;
115
116public:
117 Consumer(const Consumer&) = delete;
118 Consumer(Consumer&&) noexcept = default;
119 Consumer& operator=(const Consumer&) = delete;
120 Consumer& operator=(Consumer&& other) noexcept {
121 queue_.swap(other.queue_);
122 std::swap(token_, other.token_);
123 return *this;
124 }
125
126 ~Consumer() {
127 if (queue_) {
128 queue_->MarkConsumerIsDead();
129 }
130 }
131
132 /// @brief Pop an element from queue.
133 /// May wait asynchronously if the queue is empty, but the producer is alive.
134 ///
135 /// @returns whether something was popped before the deadline or before a task cancellation.
136 /// `false` can be returned before the deadline when the producer is no longer alive.
137 /// @warning Be careful when using a method in a loop. The
138 /// `engine::Deadline` is a wrapper over `std::chrono::time_point`, not
139 /// `duration`! If you need a timeout, you must reconstruct the deadline in the loop.
140 [[nodiscard]] bool Pop(ValueType& value, engine::Deadline deadline = {}) const {
141 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
142 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PopNoblock for non-coroutine consumers");
143 return queue_->Pop(token_, value, deadline);
144 }
145
146 /// @brief Try to pop an element from queue without blocking.
147 /// May be used in non-coroutine environment.
148 ///
149 /// @returns whether something was popped.
150 [[nodiscard]] bool PopNoblock(ValueType& value) const {
151 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
152 return queue_->PopNoblock(token_, value);
153 }
154
155 void Reset() && {
156 if (queue_) {
157 queue_->MarkConsumerIsDead();
158 }
159 queue_.reset();
160 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
161 }
162
163 /// Const access to source queue.
164 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
165
166 /// @cond
167 // For internal use only
168 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
169 : queue_(std::move(queue)),
170 token_(queue_->queue_)
171 {}
172 /// @endcond
173
174private:
175 std::shared_ptr<QueueType> queue_{};
176 mutable ConsumerToken token_;
177};
178
179} // namespace concurrent
180
181USERVER_NAMESPACE_END