userver: userver/concurrent/queue_helpers.hpp Source File
Loading...
Searching...
No Matches
queue_helpers.hpp
1#pragma once
2
3#include <memory>
4
5#include <userver/engine/deadline.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace concurrent {
10
11namespace impl {
12
13struct NoToken final {
14 template <typename LockFreeQueue>
15 explicit NoToken(LockFreeQueue& /*unused*/) {}
16};
17
18struct MultiToken final {
19 template <typename LockFreeQueue>
20 explicit MultiToken(LockFreeQueue& /*unused*/) {}
21};
22
23} // namespace impl
24
25/// @warning A single Producer must not be used from multiple threads
26/// concurrently
27template <typename QueueType, typename ProducerToken,
28 typename EmplaceEnablerType>
29class Producer final {
30 static_assert(
31 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
32 "Do not instantiate Producer on your own. Use Producer type alias "
33 "from queue");
34
35 using ValueType = typename QueueType::ValueType;
36
37 public:
38 Producer(const Producer&) = delete;
39 Producer(Producer&&) noexcept = default;
40 Producer& operator=(const Producer&) = delete;
41 Producer& operator=(Producer&&) noexcept = default;
42
43 ~Producer() {
44 if (queue_) queue_->MarkProducerIsDead();
45 }
46
47 /// Push element into queue. May wait asynchronously if the queue is full.
48 /// Leaves the `value` unmodified if the operation does not succeed.
49 /// @returns whether push succeeded before the deadline and before the task
50 /// was canceled.
51 [[nodiscard]] bool Push(ValueType&& value,
52 engine::Deadline deadline = {}) const {
53 UASSERT(queue_);
54 return queue_->Push(token_, std::move(value), deadline);
55 }
56
57 /// Try to push element into queue without blocking. May be used in
58 /// non-coroutine environment. Leaves the `value` unmodified if the operation
59 /// does not succeed.
60 /// @returns whether push succeeded.
61 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
62 UASSERT(queue_);
63 return queue_->PushNoblock(token_, std::move(value));
64 }
65
66 void Reset() && {
67 if (queue_) queue_->MarkProducerIsDead();
68 queue_.reset();
69 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
70 }
71
72 /// Const access to source queue.
73 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
74 return {queue_};
75 }
76
77 /// @cond
78 // For internal use only
79 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
80 : queue_(std::move(queue)), token_(queue_->queue_) {}
81 /// @endcond
82
83 private:
84 std::shared_ptr<QueueType> queue_;
85 mutable ProducerToken token_;
86};
87
88/// @warning A single Consumer must not be used from multiple threads
89/// concurrently
90template <typename QueueType, typename ConsumerToken,
91 typename EmplaceEnablerType>
92class Consumer final {
93 static_assert(
94 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
95 "Do not instantiate Consumer on your own. Use Consumer type alias "
96 "from queue");
97
98 using ValueType = typename QueueType::ValueType;
99
100 public:
101 Consumer(const Consumer&) = delete;
102 Consumer(Consumer&&) noexcept = default;
103 Consumer& operator=(const Consumer&) = delete;
104 Consumer& operator=(Consumer&&) noexcept = default;
105
106 ~Consumer() {
107 if (queue_) queue_->MarkConsumerIsDead();
108 }
109
110 /// Pop element from queue. May wait asynchronously if the queue is empty,
111 /// but the producer is alive.
112 /// @returns whether something was popped before the deadline.
113 /// @note `false` can be returned before the deadline
114 /// when the producer is no longer alive.
115 [[nodiscard]] bool Pop(ValueType& value,
116 engine::Deadline deadline = {}) const {
117 return queue_->Pop(token_, value, deadline);
118 }
119
120 /// Try to pop element from queue without blocking. May be used in
121 /// non-coroutine environment
122 /// @return whether something was popped.
123 [[nodiscard]] bool PopNoblock(ValueType& value) const {
124 return queue_->PopNoblock(token_, value);
125 }
126
127 /// Const access to source queue.
128 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
129 return {queue_};
130 }
131
132 /// @cond
133 // For internal use only
134 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
135 : queue_(std::move(queue)), token_(queue_->queue_) {}
136 /// @endcond
137
138 private:
139 std::shared_ptr<QueueType> queue_{};
140 mutable ConsumerToken token_;
141};
142
143} // namespace concurrent
144
145USERVER_NAMESPACE_END