userver: userver/concurrent/queue_helpers.hpp Source File
Loading...
Searching...
No Matches
queue_helpers.hpp
1#pragma once
2
3#include <memory>
4
5#include <userver/engine/deadline.hpp>
6#include <userver/engine/task/current_task.hpp>
7#include <userver/utils/assert.hpp>
8
9USERVER_NAMESPACE_BEGIN
10
11namespace concurrent {
12
13namespace impl {
14
15struct NoToken final {
16 template <typename LockFreeQueue>
17 explicit NoToken(LockFreeQueue& /*unused*/) {}
18};
19
20struct MultiToken final {
21 template <typename LockFreeQueue>
22 explicit MultiToken(LockFreeQueue& /*unused*/) {}
23};
24
25} // namespace impl
26
27/// @brief Producer side of concurrent queues
28///
29/// @warning A single Producer must not be used from multiple threads concurrently
30///
31/// @see @ref concurrent_queues
32template <typename QueueType, typename ProducerToken, typename EmplaceEnablerType>
33class Producer final {
34 static_assert(
35 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
36 "Do not instantiate Producer on your own. Use Producer type alias "
37 "from queue"
38 );
39
40 using ValueType = typename QueueType::ValueType;
41
42public:
43 Producer(const Producer&) = delete;
44 Producer(Producer&&) noexcept = default;
45 Producer& operator=(const Producer&) = delete;
46 Producer& operator=(Producer&& other) noexcept {
47 queue_.swap(other.queue_);
48 std::swap(token_, other.token_);
49 return *this;
50 }
51
52 ~Producer() {
53 if (queue_) queue_->MarkProducerIsDead();
54 }
55
56 /// Push an element into queue. May wait asynchronously if the queue is full.
57 /// Leaves the `value` unmodified if the operation does not succeed.
58 /// @returns whether push succeeded before the deadline and before the task
59 /// was canceled.
60 [[nodiscard]] bool Push(ValueType&& value, engine::Deadline deadline = {}) const {
61 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
62 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PushNoblock for non-coroutine producers");
63 return queue_->Push(token_, std::move(value), deadline);
64 }
65
66 /// Try to push an element into queue without blocking. May be used in
67 /// non-coroutine environment. Leaves the `value` unmodified if the operation
68 /// does not succeed.
69 /// @returns whether push succeeded.
70 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
71 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
72 return queue_->PushNoblock(token_, std::move(value));
73 }
74
75 void Reset() && noexcept {
76 if (queue_) queue_->MarkProducerIsDead();
77 queue_.reset();
78 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
79 }
80
81 /// Const access to source queue.
82 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
83
84 /// @cond
85 // For internal use only
86 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
87 : queue_(std::move(queue)), token_(queue_->queue_) {}
88 /// @endcond
89
90private:
91 std::shared_ptr<QueueType> queue_;
92 mutable ProducerToken token_;
93};
94
95/// @brief Consumer side of concurrent queues
96///
97/// @warning A single Consumer must not be used from multiple threads concurrently
98///
99/// @see @ref concurrent_queues
100template <typename QueueType, typename ConsumerToken, typename EmplaceEnablerType>
101class Consumer final {
102 static_assert(
103 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
104 "Do not instantiate Consumer on your own. Use Consumer type alias "
105 "from queue"
106 );
107
108 using ValueType = typename QueueType::ValueType;
109
110public:
111 Consumer(const Consumer&) = delete;
112 Consumer(Consumer&&) noexcept = default;
113 Consumer& operator=(const Consumer&) = delete;
114 Consumer& operator=(Consumer&& other) noexcept {
115 queue_.swap(other.queue_);
116 std::swap(token_, other.token_);
117 return *this;
118 }
119
120 ~Consumer() {
121 if (queue_) queue_->MarkConsumerIsDead();
122 }
123
124 /// @brief Pop an element from queue.
125 /// May wait asynchronously if the queue is empty, but the producer is alive.
126 ///
127 /// @returns whether something was popped before the deadline or before a task cancellation.
128 /// `false` can be returned before the deadline when the producer is no longer alive.
129 /// @warning Be careful when using a method in a loop. The
130 /// `engine::Deadline` is a wrapper over `std::chrono::time_point`, not
131 /// `duration`! If you need a timeout, you must reconstruct the deadline in the loop.
132 [[nodiscard]] bool Pop(ValueType& value, engine::Deadline deadline = {}) const {
133 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
134 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PopNoblock for non-coroutine consumers");
135 return queue_->Pop(token_, value, deadline);
136 }
137
138 /// @brief Try to pop an element from queue without blocking.
139 /// May be used in non-coroutine environment.
140 ///
141 /// @returns whether something was popped.
142 [[nodiscard]] bool PopNoblock(ValueType& value) const {
143 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
144 return queue_->PopNoblock(token_, value);
145 }
146
147 void Reset() && {
148 if (queue_) queue_->MarkConsumerIsDead();
149 queue_.reset();
150 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
151 }
152
153 /// Const access to source queue.
154 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
155
156 /// @cond
157 // For internal use only
158 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
159 : queue_(std::move(queue)), token_(queue_->queue_) {}
160 /// @endcond
161
162private:
163 std::shared_ptr<QueueType> queue_{};
164 mutable ConsumerToken token_;
165};
166
167} // namespace concurrent
168
169USERVER_NAMESPACE_END