userver: userver/concurrent/queue_helpers.hpp Source File
Loading...
Searching...
No Matches
queue_helpers.hpp
1#pragma once
2
3#include <memory>
4
5#include <userver/engine/deadline.hpp>
6#include <userver/engine/task/current_task.hpp>
7#include <userver/utils/assert.hpp>
8
9USERVER_NAMESPACE_BEGIN
10
11namespace concurrent {
12
13namespace impl {
14
15struct NoToken final {
16 template <typename LockFreeQueue>
17 explicit NoToken(LockFreeQueue& /*unused*/) {}
18};
19
20struct MultiToken final {
21 template <typename LockFreeQueue>
22 explicit MultiToken(LockFreeQueue& /*unused*/) {}
23};
24
25} // namespace impl
26
27/// @warning A single Producer must not be used from multiple threads
28/// concurrently
29template <typename QueueType, typename ProducerToken, typename EmplaceEnablerType>
30class Producer final {
31 static_assert(
32 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
33 "Do not instantiate Producer on your own. Use Producer type alias "
34 "from queue"
35 );
36
37 using ValueType = typename QueueType::ValueType;
38
39public:
40 Producer(const Producer&) = delete;
41 Producer(Producer&&) noexcept = default;
42 Producer& operator=(const Producer&) = delete;
43 Producer& operator=(Producer&& other) noexcept {
44 queue_.swap(other.queue_);
45 std::swap(token_, other.token_);
46 return *this;
47 }
48
49 ~Producer() {
50 if (queue_) queue_->MarkProducerIsDead();
51 }
52
53 /// Push element into queue. May wait asynchronously if the queue is full.
54 /// Leaves the `value` unmodified if the operation does not succeed.
55 /// @returns whether push succeeded before the deadline and before the task
56 /// was canceled.
57 [[nodiscard]] bool Push(ValueType&& value, engine::Deadline deadline = {}) const {
58 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
59 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PushNoblock for non-coroutine producers");
60 return queue_->Push(token_, std::move(value), deadline);
61 }
62
63 /// Try to push element into queue without blocking. May be used in
64 /// non-coroutine environment. Leaves the `value` unmodified if the operation
65 /// does not succeed.
66 /// @returns whether push succeeded.
67 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
68 UASSERT_MSG(queue_, "Trying to use a moved-from queue Producer");
69 return queue_->PushNoblock(token_, std::move(value));
70 }
71
72 void Reset() && noexcept {
73 if (queue_) queue_->MarkProducerIsDead();
74 queue_.reset();
75 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
76 }
77
78 /// Const access to source queue.
79 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
80
81 /// @cond
82 // For internal use only
83 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
84 : queue_(std::move(queue)), token_(queue_->queue_) {}
85 /// @endcond
86
87private:
88 std::shared_ptr<QueueType> queue_;
89 mutable ProducerToken token_;
90};
91
92/// @warning A single Consumer must not be used from multiple threads
93/// concurrently
94template <typename QueueType, typename ConsumerToken, typename EmplaceEnablerType>
95class Consumer final {
96 static_assert(
97 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
98 "Do not instantiate Consumer on your own. Use Consumer type alias "
99 "from queue"
100 );
101
102 using ValueType = typename QueueType::ValueType;
103
104public:
105 Consumer(const Consumer&) = delete;
106 Consumer(Consumer&&) noexcept = default;
107 Consumer& operator=(const Consumer&) = delete;
108 Consumer& operator=(Consumer&& other) noexcept {
109 queue_.swap(other.queue_);
110 std::swap(token_, other.token_);
111 return *this;
112 }
113
114 ~Consumer() {
115 if (queue_) queue_->MarkConsumerIsDead();
116 }
117
118 /// Pop element from queue. May wait asynchronously if the queue is empty,
119 /// but the producer is alive.
120 /// @returns whether something was popped before the deadline.
121 /// @note `false` can be returned before the deadline when the producer is no
122 /// longer alive.
123 /// @warning Be careful when using a method in a loop. The
124 /// `engine::Deadline` is a wrapper over `std::chrono::time_point`, not
125 /// `duration`! If you need a timeout, you must reconstruct the deadline in
126 /// the loop.
127 [[nodiscard]] bool Pop(ValueType& value, engine::Deadline deadline = {}) const {
128 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
129 UASSERT_MSG(engine::current_task::IsTaskProcessorThread(), "Use PopNoblock for non-coroutine consumers");
130 return queue_->Pop(token_, value, deadline);
131 }
132
133 /// Try to pop element from queue without blocking. May be used in
134 /// non-coroutine environment
135 /// @return whether something was popped.
136 [[nodiscard]] bool PopNoblock(ValueType& value) const {
137 UASSERT_MSG(queue_, "Trying to use a moved-from queue Consumer");
138 return queue_->PopNoblock(token_, value);
139 }
140
141 void Reset() && {
142 if (queue_) queue_->MarkConsumerIsDead();
143 queue_.reset();
144 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
145 }
146
147 /// Const access to source queue.
148 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const { return {queue_}; }
149
150 /// @cond
151 // For internal use only
152 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
153 : queue_(std::move(queue)), token_(queue_->queue_) {}
154 /// @endcond
155
156private:
157 std::shared_ptr<QueueType> queue_{};
158 mutable ConsumerToken token_;
159};
160
161} // namespace concurrent
162
163USERVER_NAMESPACE_END