userver: userver/concurrent/mutex_set.hpp Source File
Loading...
Searching...
No Matches
mutex_set.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/mutex_set.hpp
4/// @brief @copybrief concurrent::MutexSet
5
6#include <chrono>
7#include <functional>
8#include <string>
9#include <unordered_set>
10#include <utility>
11
12#include <userver/engine/condition_variable.hpp>
13#include <userver/engine/mutex.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/cached_hash.hpp>
16#include <userver/utils/fixed_array.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace concurrent {
21
22namespace impl {
23
24template <typename T, typename Equal>
25struct MutexDatum final {
26 explicit MutexDatum(size_t way_size, const Equal& equal = Equal{})
27 : set(way_size, {}, equal)
28 {}
29
30 ~MutexDatum() { UASSERT_MSG(set.empty(), "MutexDatum is destroyed while someone is holding the lock"); }
31
32 engine::Mutex mutex;
33 engine::ConditionVariable cv;
34 std::unordered_set<T, std::hash<T>, Equal> set;
35};
36
37} // namespace impl
38
39/// Mutex-like object associated with the key of a MutexSet. It provides the
40/// same interface as engine::Mutex, you may use it with std::unique_lock,
41/// std::lock_guard, etc.
42/// @note different ItemMutex'es of the same MutexSet obtained with the same
43/// argument to GetMutexForKey() share the same critical section. IOW, if
44/// the first mutex is locked, the second one will block until the first
45/// mutex is unlocked.
46/// @note can be used only from coroutines.
47template <typename Key, typename Equal>
48class ItemMutex final {
49public:
50 using HashAndKey = utils::CachedHash<Key>;
51
52 using MutexDatum = impl::MutexDatum<HashAndKey, utils::CachedHashKeyEqual<Equal>>;
53
54 ItemMutex(MutexDatum& md, HashAndKey&& key);
55
56 void lock();
57
58 void unlock();
59
60 bool try_lock();
61
62 template <typename Rep, typename Period>
63 bool try_lock_for(std::chrono::duration<Rep, Period>);
64
65 template <typename Clock, typename Duration>
66 bool try_lock_until(std::chrono::time_point<Clock, Duration>);
67
68private:
69 bool TryFinishLocking();
70
71 MutexDatum& md_;
72 const HashAndKey key_;
73};
74
75/// @ingroup userver_concurrency userver_containers
76///
77/// @brief A dynamic set of mutexes
78///
79/// It can be used for separate critical sections for
80/// multiple keys when the key set is not known at compile time and may change
81/// in runtime.
82///
83/// Example:
84/// @snippet src/concurrent/mutex_set_test.cpp Sample mutex set usage
85template <typename Key = std::string, typename Hash = std::hash<Key>, typename Equal = std::equal_to<Key>>
86class MutexSet final : Hash {
87public:
88 explicit MutexSet(size_t ways = 1, size_t way_size = 1, const Hash& hash = Hash{}, const Equal& equal = Equal{});
89
90 /// Get the mutex-like object for a key. Coroutine-safe.
91 /// @note the returned object holds a reference to MutexSet, so make sure
92 /// that MutexSet is alive while you're working with ItemMutex.
93 ItemMutex<Key, Equal> GetMutexForKey(Key key);
94
95private:
96 using MutexDatum = typename ItemMutex<Key, Equal>::MutexDatum;
97 utils::FixedArray<MutexDatum> mutex_data_;
98};
99
100template <typename Key, typename Hash, typename Equal>
101MutexSet<Key, Hash, Equal>::MutexSet(size_t ways, size_t way_size, const Hash& hash, const Equal& equal)
102 : Hash(hash),
103 mutex_data_(ways, way_size, utils::CachedHashKeyEqual<Equal>{equal})
104{}
105
106template <typename Key, typename Hash, typename Equal>
107ItemMutex<Key, Equal> MutexSet<Key, Hash, Equal>::GetMutexForKey(Key key) {
108 const auto hash_value = Hash::operator()(key);
109 const auto size = mutex_data_.size();
110
111 // Compilers optimize a % b and a / b to a single div operation
112 const auto way = hash_value % size;
113 const auto new_hash = hash_value / size;
114
115 return ItemMutex<Key, Equal>(mutex_data_[way], {new_hash, std::move(key)});
116}
117
118template <typename Key, typename Equal>
119ItemMutex<Key, Equal>::ItemMutex(MutexDatum& md, HashAndKey&& key)
120 : md_(md),
121 key_(std::move(key))
122{}
123
124template <typename Key, typename Equal>
125void ItemMutex<Key, Equal>::lock() {
126 const engine::TaskCancellationBlocker blocker;
127 std::unique_lock<engine::Mutex> lock(md_.mutex);
128
129 [[maybe_unused]] auto is_locked = md_.cv.Wait(lock, [this] { return TryFinishLocking(); });
130 UASSERT(is_locked);
131}
132
133template <typename Key, typename Equal>
134void ItemMutex<Key, Equal>::unlock() {
135 std::unique_lock lock(md_.mutex);
136
137 // Forcing the destructor of the node to run outside of the critical section
138 [[maybe_unused]] auto node = md_.set.extract(key_);
139
140 // We must wakeup all the waiters, because otherwise we can wake up the wrong
141 // one and loose the wakeup:
142 //
143 // Task #1 waits for mutex A, task #2 waits for mutex B
144 // Task #3
145 // * unlocks mutex A
146 // * does NotifyOne
147 // * wakeups thread #2
148 // * mutex B is still locked, task #2 goes to sleep
149 // * we lost the wakeup :(
150 md_.cv.NotifyAll();
151
152 lock.unlock();
153
154 // Destructor of node is run here
155}
156
157template <typename Key, typename Equal>
158bool ItemMutex<Key, Equal>::try_lock() {
159 const std::unique_lock<engine::Mutex> lock(md_.mutex);
160 return TryFinishLocking();
161}
162
163template <typename Key, typename Equal>
164template <typename Rep, typename Period>
165bool ItemMutex<Key, Equal>::try_lock_for(std::chrono::duration<Rep, Period> duration) {
166 std::unique_lock<engine::Mutex> lock(md_.mutex);
167 return md_.cv.WaitFor(lock, duration, [this] { return TryFinishLocking(); });
168}
169
170template <typename Key, typename Equal>
171template <typename Clock, typename Duration>
172bool ItemMutex<Key, Equal>::try_lock_until(std::chrono::time_point<Clock, Duration> time_point) {
173 std::unique_lock<engine::Mutex> lock(md_.mutex);
174 return md_.cv.WaitUntil(lock, time_point, [this] { return TryFinishLocking(); });
175}
176
177template <typename Key, typename Equal>
178bool ItemMutex<Key, Equal>::TryFinishLocking() {
179 return md_.set.insert(key_).second;
180}
181
182} // namespace concurrent
183
184USERVER_NAMESPACE_END