userver: userver/concurrent/impl/semaphore_capacity_control.hpp Source File
Loading...
Searching...
No Matches
semaphore_capacity_control.hpp
1#pragma once
2
3#include <atomic>
4
5#include <userver/engine/semaphore.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace concurrent::impl {
10
11// Manages the capacity of a queue's semaphore. Used by concurrent queues. Context:
12// - engine::Semaphore controls queue size, e.g. a writer
13// takes 1 from the semaphore before pushing an element
14// - queue size can be changed at any time (e.g. by a user dynamic_config hook)
15// - the queue may need to "unblock" itself by overriding the capacity to +inf
16// - while the capacity is temporarily overridden, a "normal" capacity
17// update may arrive
18// - such an update must not be missed and must take effect after the queue
19// is "unblocked"
20class SemaphoreCapacityControl final {
21 public:
22 using Counter = engine::Semaphore::Counter;  // same counter type as engine::Semaphore
23 static constexpr Counter kOverrideDisabled = -1;  // initial value of capacity_override_, i.e. "no override"
24
25 explicit SemaphoreCapacityControl(engine::CancellableSemaphore& semaphore);  // NOTE(review): presumably bound to semaphore_, so it must outlive this object
26
27 // Thread-safety contract: these methods may be called concurrently from N threads.
28 void SetCapacity(Counter capacity);  // the "normal" capacity update; presumably recorded in capacity_requested_
29 Counter GetCapacity() const noexcept;  // NOTE(review): presumably the requested capacity, not the override - confirm in the .cpp
30
31 // These methods may be called from 1 thread at a time, potentially
32 // concurrently with *Capacity (i.e. SetCapacity / GetCapacity).
33 void SetCapacityOverride(Counter capacity);  // temporary override, e.g. +inf to "unblock" the queue
34 void RemoveCapacityOverride();  // ends the override; presumably restores kOverrideDisabled - confirm in the .cpp
35
36 private:
37 void UpdateSemaphoreCapacity() const;  // defined outside this header; presumably applies the effective capacity to semaphore_
38
39 engine::CancellableSemaphore& semaphore_;  // reference (non-owning) to the controlled semaphore
40 std::atomic<Counter> capacity_requested_;  // no in-class initializer; presumably set by the constructor
41 std::atomic<Counter> capacity_override_{kOverrideDisabled};  // starts with no override in effect
42};
43
44} // namespace concurrent::impl
45
46USERVER_NAMESPACE_END