userver: userver/concurrent/mutex_set.hpp Source File
Loading...
Searching...
No Matches
mutex_set.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/mutex_set.hpp
4/// @brief @copybrief concurrent::MutexSet
5
6#include <chrono>
7#include <functional>
8#include <string>
9#include <unordered_set>
10#include <utility>
11
12#include <userver/engine/condition_variable.hpp>
13#include <userver/engine/mutex.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/cached_hash.hpp>
16#include <userver/utils/fixed_array.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace concurrent {
21
22namespace impl {
23
24template <typename T, typename Equal>
25struct MutexDatum final {
26 explicit MutexDatum(size_t way_size, const Equal& equal = Equal{})
27 : set(way_size, {}, equal) {}
28
29 ~MutexDatum() {
30 UASSERT_MSG(set.empty(),
31 "MutexDatum is destroyed while someone is holding the lock");
32 }
33
34 engine::Mutex mutex;
35 engine::ConditionVariable cv;
36 std::unordered_set<T, std::hash<T>, Equal> set;
37};
38
39} // namespace impl
40
41/// Mutex-like object associated with the key of a MutexSet. It provides the
42/// same interface as engine::Mutex, you may use it with std::unique_lock,
43/// std::lock_guard, etc.
44/// @note different ItemMutex'es of the same MutexSet obtained with the same
45/// argument to GetMutexForKey() share the same critical section. IOW, if
46/// the first mutex is locked, the second one will block until the first
47/// mutex is unlocked.
48/// @note can be used only from coroutines.
49template <typename Key, typename Equal>
50class ItemMutex final {
51 public:
52 using HashAndKey = utils::CachedHash<Key>;
53
54 using MutexDatum =
55 impl::MutexDatum<HashAndKey, utils::CachedHashKeyEqual<Equal>>;
56
57 ItemMutex(MutexDatum& md, HashAndKey&& key);
58
59 void lock();
60
61 void unlock();
62
63 bool try_lock();
64
65 template <typename Rep, typename Period>
66 bool try_lock_for(std::chrono::duration<Rep, Period>);
67
68 template <typename Clock, typename Duration>
69 bool try_lock_until(std::chrono::time_point<Clock, Duration>);
70
71 private:
72 bool TryFinishLocking();
73
74 MutexDatum& md_;
75 const HashAndKey key_;
76};
77
78/// @ingroup userver_concurrency userver_containers
79///
80/// @brief A dynamic set of mutexes
81///
82/// It can be used for separate critical sections for
83/// multiple keys when the key set is not known at compile time and may change
84/// in runtime.
85///
86/// Example:
87/// @snippet src/concurrent/mutex_set_test.cpp Sample mutex set usage
88template <typename Key = std::string, typename Hash = std::hash<Key>,
89 typename Equal = std::equal_to<Key>>
90class MutexSet final : Hash {
91 public:
92 explicit MutexSet(size_t ways = 1, size_t way_size = 1,
93 const Hash& hash = Hash{}, const Equal& equal = Equal{});
94
95 /// Get the mutex-like object for a key. Coroutine-safe.
96 /// @note the returned object holds a reference to MutexSet, so make sure
97 /// that MutexSet is alive while you're working with ItemMutex.
98 ItemMutex<Key, Equal> GetMutexForKey(Key key);
99
100 private:
101 using MutexDatum = typename ItemMutex<Key, Equal>::MutexDatum;
102 utils::FixedArray<MutexDatum> mutex_data_;
103};
104
105template <typename Key, typename Hash, typename Equal>
106MutexSet<Key, Hash, Equal>::MutexSet(size_t ways, size_t way_size,
107 const Hash& hash, const Equal& equal)
108 : Hash(hash),
109 mutex_data_(ways, way_size, utils::CachedHashKeyEqual<Equal>{equal}) {}
110
111template <typename Key, typename Hash, typename Equal>
112ItemMutex<Key, Equal> MutexSet<Key, Hash, Equal>::GetMutexForKey(Key key) {
113 const auto hash_value = Hash::operator()(key);
114 const auto size = mutex_data_.size();
115
116 // Compilers optimize a % b and a / b to a single div operation
117 const auto way = hash_value % size;
118 const auto new_hash = hash_value / size;
119
120 return ItemMutex<Key, Equal>(mutex_data_[way], {new_hash, std::move(key)});
121}
122
123template <typename Key, typename Equal>
124ItemMutex<Key, Equal>::ItemMutex(MutexDatum& md, HashAndKey&& key)
125 : md_(md), key_(std::move(key)) {}
126
127template <typename Key, typename Equal>
128void ItemMutex<Key, Equal>::lock() {
129 engine::TaskCancellationBlocker blocker;
130 std::unique_lock<engine::Mutex> lock(md_.mutex);
131
132 [[maybe_unused]] auto is_locked =
133 md_.cv.Wait(lock, [this] { return TryFinishLocking(); });
134 UASSERT(is_locked);
135}
136
137template <typename Key, typename Equal>
138void ItemMutex<Key, Equal>::unlock() {
139 std::unique_lock lock(md_.mutex);
140
141 // Forcing the destructor of the node to run outside of the critical section
142 [[maybe_unused]] auto node = md_.set.extract(key_);
143
144 // We must wakeup all the waiters, because otherwise we can wake up the wrong
145 // one and loose the wakeup:
146 //
147 // Task #1 waits for mutex A, task #2 waits for mutex B
148 // Task #3
149 // * unlocks mutex A
150 // * does NotifyOne
151 // * wakeups thread #2
152 // * mutex B is still locked, task #2 goes to sleep
153 // * we lost the wakeup :(
154 md_.cv.NotifyAll();
155
156 lock.unlock();
157
158 // Destructor of node is run here
159}
160
161template <typename Key, typename Equal>
162bool ItemMutex<Key, Equal>::try_lock() {
163 std::unique_lock<engine::Mutex> lock(md_.mutex);
164 return TryFinishLocking();
165}
166
167template <typename Key, typename Equal>
168template <typename Rep, typename Period>
169bool ItemMutex<Key, Equal>::try_lock_for(
170 std::chrono::duration<Rep, Period> duration) {
171 std::unique_lock<engine::Mutex> lock(md_.mutex);
172 return md_.cv.WaitFor(lock, duration, [this] { return TryFinishLocking(); });
173}
174
175template <typename Key, typename Equal>
176template <typename Clock, typename Duration>
177bool ItemMutex<Key, Equal>::try_lock_until(
178 std::chrono::time_point<Clock, Duration> time_point) {
179 std::unique_lock<engine::Mutex> lock(md_.mutex);
180 return md_.cv.WaitUntil(lock, time_point,
181 [this] { return TryFinishLocking(); });
182}
183
184template <typename Key, typename Equal>
185bool ItemMutex<Key, Equal>::TryFinishLocking() {
186 return md_.set.insert(key_).second;
187}
188
189} // namespace concurrent
190
191USERVER_NAMESPACE_END