Github   Telegram
Loading...
Searching...
No Matches
queue_helpers.hpp
1#pragma once
2
#include <memory>
#include <type_traits>
#include <utility>

#include <userver/engine/deadline.hpp>
#include <userver/utils/assert.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace concurrent {
10
11namespace impl {
12
/// Stateless stand-in for a queue token.
///
/// It can be constructed from an lvalue of any type; the argument is accepted
/// only to match the construction signature and is never read or stored.
/// NOTE(review): presumably used as the token type for queues that do not
/// need per-producer/per-consumer tokens -- confirm against the queue
/// implementation.
struct NoToken final {
  template <typename Queue>
  explicit NoToken(Queue&) {}
};
17
18} // namespace impl
19
20/// @warning A single Producer must not be used from multiple threads
21/// concurrently
22template <typename QueueType, typename ProducerToken,
23 typename EmplaceEnablerType>
24class Producer final {
25 static_assert(
26 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
27 "Do not instantiate Producer on your own. Use Producer type alias "
28 "from queue");
29
30 using ValueType = typename QueueType::ValueType;
31
32 public:
33 Producer(const Producer&) = delete;
34 Producer(Producer&&) noexcept = default;
35 Producer& operator=(const Producer&) = delete;
36 Producer& operator=(Producer&&) noexcept = default;
37
38 ~Producer() {
39 if (queue_) queue_->MarkProducerIsDead();
40 }
41
42 /// Push element into queue. May wait asynchronously if the queue is full.
43 /// Leaves the `value` unmodified if the operation does not succeed.
44 /// @returns whether push succeeded before the deadline and before the task
45 /// was canceled.
46 [[nodiscard]] bool Push(ValueType&& value,
47 engine::Deadline deadline = {}) const {
48 UASSERT(queue_);
49 return queue_->Push(token_, std::move(value), deadline);
50 }
51
52 /// Try to push element into queue without blocking. May be used in
53 /// non-coroutine environment. Leaves the `value` unmodified if the operation
54 /// does not succeed.
55 /// @returns whether push succeeded.
56 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
57 UASSERT(queue_);
58 return queue_->PushNoblock(token_, std::move(value));
59 }
60
61 void Reset() && {
62 if (queue_) queue_->MarkProducerIsDead();
63 queue_.reset();
64 }
65
66 /// Const access to source queue.
67 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
68 return {queue_};
69 }
70
71 /// @cond
72 // For internal use only
73 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
74 : queue_(std::move(queue)), token_(queue_->queue_) {}
75 /// @endcond
76
77 private:
78 std::shared_ptr<QueueType> queue_;
79 mutable ProducerToken token_;
80};
81
82/// @warning A single Consumer must not be used from multiple threads
83/// concurrently
84template <typename QueueType, typename EmplaceEnablerType>
85class Consumer final {
86 static_assert(
87 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
88 "Do not instantiate Consumer on your own. Use Consumer type alias "
89 "from queue");
90
91 using ValueType = typename QueueType::ValueType;
92 using ConsumerToken = typename QueueType::ConsumerToken;
93
94 public:
95 Consumer(const Consumer&) = delete;
96 Consumer(Consumer&&) noexcept = default;
97 Consumer& operator=(const Consumer&) = delete;
98 Consumer& operator=(Consumer&&) noexcept = default;
99
100 ~Consumer() {
101 if (queue_) queue_->MarkConsumerIsDead();
102 }
103
104 /// Pop element from queue. May wait asynchronously if the queue is empty,
105 /// but the producer is alive.
106 /// @returns whether something was popped before the deadline.
107 /// @note `false` can be returned before the deadline
108 /// when the producer is no longer alive.
109 [[nodiscard]] bool Pop(ValueType& value,
110 engine::Deadline deadline = {}) const {
111 return queue_->Pop(token_, value, deadline);
112 }
113
114 /// Try to pop element from queue without blocking. May be used in
115 /// non-coroutine environment
116 /// @return whether something was popped.
117 [[nodiscard]] bool PopNoblock(ValueType& value) const {
118 return queue_->PopNoblock(token_, value);
119 }
120
121 /// Const access to source queue.
122 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
123 return {queue_};
124 }
125
126 /// @cond
127 // For internal use only
128 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
129 : queue_(std::move(queue)), token_(queue_->queue_) {}
130 /// @endcond
131
132 private:
133 std::shared_ptr<QueueType> queue_{};
134 mutable ConsumerToken token_;
135};
136
137} // namespace concurrent
138
139USERVER_NAMESPACE_END