userver: userver/concurrent/queue_helpers.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
queue_helpers.hpp
1#pragma once
2
#include <memory>
#include <type_traits>
#include <utility>

#include <userver/engine/deadline.hpp>
#include <userver/utils/assert.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace concurrent {
10
11namespace impl {
12
/// Empty token type. It can be explicitly constructed from a reference to any
/// queue type; the reference is ignored and nothing is stored.
struct NoToken final {
  template <class Queue>
  explicit NoToken(Queue&) {}
};
17
/// Empty token type. It can be explicitly constructed from a reference to any
/// queue type; the reference is ignored and nothing is stored.
struct MultiToken final {
  template <class Queue>
  explicit MultiToken(Queue&) {}
};
22
23} // namespace impl
24
25/// @warning A single Producer must not be used from multiple threads
26/// concurrently
27template <typename QueueType, typename ProducerToken,
28 typename EmplaceEnablerType>
29class Producer final {
30 static_assert(
31 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
32 "Do not instantiate Producer on your own. Use Producer type alias "
33 "from queue");
34
35 using ValueType = typename QueueType::ValueType;
36
37 public:
38 Producer(const Producer&) = delete;
39 Producer(Producer&&) noexcept = default;
40 Producer& operator=(const Producer&) = delete;
41 Producer& operator=(Producer&& other) noexcept {
42 queue_.swap(other.queue_);
43 std::swap(token_, other.token_);
44 return *this;
45 }
46
47 ~Producer() {
48 if (queue_) queue_->MarkProducerIsDead();
49 }
50
51 /// Push element into queue. May wait asynchronously if the queue is full.
52 /// Leaves the `value` unmodified if the operation does not succeed.
53 /// @returns whether push succeeded before the deadline and before the task
54 /// was canceled.
55 [[nodiscard]] bool Push(ValueType&& value,
56 engine::Deadline deadline = {}) const {
57 UASSERT(queue_);
58 return queue_->Push(token_, std::move(value), deadline);
59 }
60
61 /// Try to push element into queue without blocking. May be used in
62 /// non-coroutine environment. Leaves the `value` unmodified if the operation
63 /// does not succeed.
64 /// @returns whether push succeeded.
65 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
66 UASSERT(queue_);
67 return queue_->PushNoblock(token_, std::move(value));
68 }
69
70 void Reset() && {
71 if (queue_) queue_->MarkProducerIsDead();
72 queue_.reset();
73 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
74 }
75
76 /// Const access to source queue.
77 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
78 return {queue_};
79 }
80
81 /// @cond
82 // For internal use only
83 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
84 : queue_(std::move(queue)), token_(queue_->queue_) {}
85 /// @endcond
86
87 private:
88 std::shared_ptr<QueueType> queue_;
89 mutable ProducerToken token_;
90};
91
92/// @warning A single Consumer must not be used from multiple threads
93/// concurrently
94template <typename QueueType, typename ConsumerToken,
95 typename EmplaceEnablerType>
96class Consumer final {
97 static_assert(
98 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
99 "Do not instantiate Consumer on your own. Use Consumer type alias "
100 "from queue");
101
102 using ValueType = typename QueueType::ValueType;
103
104 public:
105 Consumer(const Consumer&) = delete;
106 Consumer(Consumer&&) noexcept = default;
107 Consumer& operator=(const Consumer&) = delete;
108 Consumer& operator=(Consumer&& other) noexcept {
109 queue_.swap(other.queue_);
110 std::swap(token_, other.token_);
111 return *this;
112 }
113
114 ~Consumer() {
115 if (queue_) queue_->MarkConsumerIsDead();
116 }
117
118 /// Pop element from queue. May wait asynchronously if the queue is empty,
119 /// but the producer is alive.
120 /// @returns whether something was popped before the deadline.
121 /// @note `false` can be returned before the deadline
122 /// when the producer is no longer alive.
123 [[nodiscard]] bool Pop(ValueType& value,
124 engine::Deadline deadline = {}) const {
125 return queue_->Pop(token_, value, deadline);
126 }
127
128 /// Try to pop element from queue without blocking. May be used in
129 /// non-coroutine environment
130 /// @return whether something was popped.
131 [[nodiscard]] bool PopNoblock(ValueType& value) const {
132 return queue_->PopNoblock(token_, value);
133 }
134
135 void Reset() && {
136 if (queue_) queue_->MarkConsumerIsDead();
137 queue_.reset();
138 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
139 }
140
141 /// Const access to source queue.
142 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
143 return {queue_};
144 }
145
146 /// @cond
147 // For internal use only
148 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
149 : queue_(std::move(queue)), token_(queue_->queue_) {}
150 /// @endcond
151
152 private:
153 std::shared_ptr<QueueType> queue_{};
154 mutable ConsumerToken token_;
155};
156
157} // namespace concurrent
158
159USERVER_NAMESPACE_END