userver: userver/rcu/rcu.hpp Source File
#pragma once

/// @file userver/rcu/rcu.hpp
/// @brief Implementation of an RCU variable based on hazard pointers

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <utility>

#include <userver/compiler/thread_local.hpp>
#include <userver/engine/async.hpp>
#include <userver/engine/mutex.hpp>
#include <userver/logging/log.hpp>
#include <userver/rcu/fwd.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/impl/wait_token_storage.hpp>

USERVER_NAMESPACE_BEGIN

/// @brief Read-Copy-Update
///
/// @see Based on ideas from
/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
/// with modified API
namespace rcu {

namespace impl {

// Hazard pointer implementation. The records form a linked list: \p ptr points
// to the data a record currently 'holds', \p next points to the next record in
// the list. kUsed is a filler value that marks a hazard pointer as not free
// (see the comment next to it). Note that the records do not form an
// application-wide list, i.e. there is no
// 'static std::atomic<HazardPointerRecord*> global_head'; every rcu::Variable
// has its own list of hazard pointers. This is why move-assignment of hazard
// pointers between Variables is difficult to implement.
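//
// A rough sketch of a record's lifecycle, as implemented below:
//   1. the record is allocated by Variable::MakeHazardPointerSlow() with
//      ptr == kUsed and is pushed onto the owning Variable's list;
//   2. a reader (ReadablePtr) stores the T* it is about to use into ptr;
//   3. when the reader is done, it calls Release(), setting ptr to nullptr;
//   4. a free record (ptr == nullptr) may later be re-acquired by another
//      reader via a nullptr -> kUsed compare-exchange
//      (Variable::MakeHazardPointerFast() / MakeHazardPointerCached()).
// Records are only deleted in ~Variable().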
template <typename T, typename RcuTraits>
struct HazardPointerRecord final {
  // Records are created 'filled', that is, for the purposes of the hazard
  // pointer list they already contain a value. This eliminates some race
  // conditions, because the algorithms below check for ptr != nullptr (and
  // kUsed is not nullptr). Obviously, this value must never be dereferenced:
  // it does not point to a valid T, and dereferencing it would crash the
  // process.
  static inline T* const kUsed = reinterpret_cast<T*>(1);

  explicit HazardPointerRecord(const Variable<T, RcuTraits>& owner)
      : owner(owner) {}

  std::atomic<T*> ptr = kUsed;
  const Variable<T, RcuTraits>& owner;
  std::atomic<HazardPointerRecord*> next{nullptr};

  // Simple operation that marks this hazard pointer as no longer used.
  void Release() { ptr = nullptr; }
};

template <typename T, typename RcuTraits>
struct CachedData {
  impl::HazardPointerRecord<T, RcuTraits>* hp{nullptr};
  const Variable<T, RcuTraits>* variable{nullptr};
  // ensures that `variable` points to the instance that filled the cache
  uint64_t variable_epoch{0};
};

template <typename T, typename RcuTraits>
inline compiler::ThreadLocal local_cached_data =
    [] { return CachedData<T, RcuTraits>{}; };

uint64_t GetNextEpoch() noexcept;

}  // namespace impl

/// Default Rcu traits.
/// - `MutexType` is the mutex type that writers use to protect the structure
/// on update
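///
/// A minimal sketch of providing custom traits (the `MyRcuTraits` name and the
/// choice of std::mutex are purely illustrative; any mutex type usable with
/// std::unique_lock will do):
/// @code
/// struct MyRcuTraits {
///   using MutexType = std::mutex;
/// };
///
/// rcu::Variable<int, MyRcuTraits> var{42};
/// @endcode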
template <typename T>
struct DefaultRcuTraits {
  using MutexType = engine::Mutex;
};

/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
/// operator->() to do something with the stored value. Once created,
/// ReadablePtr references the same immutable value: if the Variable's value is
/// changed during the ReadablePtr's lifetime, it will not affect the value
/// referenced by this ReadablePtr.
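///
/// A minimal usage sketch (the int value is purely illustrative):
/// @code
/// rcu::Variable<int> var{1};
///
/// auto reader = var.Read();
/// var.Assign(2);  // does not affect the already existing reader
/// // *reader is still 1; a new var.Read() would see 2
/// @endcode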
template <typename T, typename RcuTraits>
class [[nodiscard]] ReadablePtr final {
 public:
  explicit ReadablePtr(const Variable<T, RcuTraits>& ptr)
      : hp_record_(&ptr.MakeHazardPointer()) {
    // This loop guarantees that at the end of it both t_ptr_ and
    // hp_record_->ptr are set to
    // 1. something meaningful
    // 2. and that this meaningful value was not removed between assigning to
    //    t_ptr_ and storing it in the hazard pointer
    do {
      t_ptr_ = ptr.GetCurrent();

      hp_record_->ptr.store(t_ptr_);
    } while (t_ptr_ != ptr.GetCurrent());
  }

  ReadablePtr(ReadablePtr<T, RcuTraits>&& other) noexcept
      : t_ptr_(other.t_ptr_), hp_record_(other.hp_record_) {
    other.t_ptr_ = nullptr;
  }

  ReadablePtr& operator=(ReadablePtr<T, RcuTraits>&& other) noexcept {
    // What do we have here?
    // 1. 'other' may point to the same variable - or to a different one.
    // 2. therefore, its hazard pointer may belong to the same list,
    //    or to a different one.

    // We can't move hazard pointers between different lists - there is simply
    // no way to do this.
    // We also can't simply swap the two pointers inside the hazard pointers
    // and leave it at that, because if 'other' is from a different Variable,
    // then our ScanRetiredList won't find any remaining pointer to the current
    // T* and will destroy the object while 'other' is still holding a hazard
    // pointer to it.
    // The thing is, the whole ReadablePtr is just a glorified holder for a
    // hazard pointer, plus t_ptr_ so that we don't have to load() the atomic
    // every time.
    // So let's just take the pointer to the hazard pointer inside 'other'.
    UASSERT_MSG(this != &other, "Self assignment to RCU variable");
    if (this == &other) {
      // it is an error to assign to self. Do nothing in this case
      return *this;
    }

    // Get rid of our current hp_record_
    if (t_ptr_) {
      hp_record_->Release();
    }
    // After this moment the content of our hp_record_ can't be used -
    // no more hp_record_->xyz calls, because it is probably already reused in
    // some other ReadablePtr. Also, don't dereference t_ptr_, it is probably
    // already freed. Just take the values from 'other'.
    hp_record_ = other.hp_record_;
    t_ptr_ = other.t_ptr_;

    // Now, it won't do us any good to have two glorified holders pointing to
    // the same hp_record_. Clear the other one.
    other.t_ptr_ = nullptr;
    // We don't need to clean other.hp_record_, because other.t_ptr_ acts
    // like a guard to it. As long as other.t_ptr_ is nullptr, nobody will
    // use other.hp_record_

    return *this;
  }

  ReadablePtr(const ReadablePtr<T, RcuTraits>& other)
      : ReadablePtr(other.hp_record_->owner) {}

  ReadablePtr& operator=(const ReadablePtr<T, RcuTraits>& other) {
    if (this != &other) *this = ReadablePtr<T, RcuTraits>{other};
    return *this;
  }

  ~ReadablePtr() {
    if (!t_ptr_) return;
    UASSERT(hp_record_ != nullptr);
    hp_record_->Release();
  }

  const T* Get() const& {
    UASSERT(t_ptr_);
    return t_ptr_;
  }

  const T* Get() && { return GetOnRvalue(); }

  const T* operator->() const& { return Get(); }
  const T* operator->() && { return GetOnRvalue(); }

  const T& operator*() const& { return *Get(); }
  const T& operator*() && { return *GetOnRvalue(); }

 private:
  const T* GetOnRvalue() {
    static_assert(!sizeof(T),
                  "Don't use temporary ReadablePtr, store it to a variable");
    std::abort();
  }

  // This is a pointer to the actual data. If it is null, then we treat it as
  // an indicator that this ReadablePtr is cleared and won't call
  // any logic associated with hp_record_
  T* t_ptr_;
  // Our hazard pointer. It can be nullptr in some circumstances.
  // The invariant is: if t_ptr_ is not nullptr, then hp_record_ is also
  // not nullptr and points to the hazard pointer containing the same T*.
  // Thus, if t_ptr_ is nullptr, then hp_record_ is undefined.
  impl::HazardPointerRecord<T, RcuTraits>* hp_record_;
};

/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
/// reference to a to-be-changed value and allows one to mutate the value (e.g.
/// add items to std::unordered_map). Changed value is not visible to readers
/// until explicit store by Commit. Only a single writer may own a WritablePtr
/// associated with the same Variable, so WritablePtr creates a critical
/// section. This critical section doesn't affect readers, so a slow writer
/// doesn't block readers.
/// @note you may not pass WritablePtr between coroutines as it owns
/// engine::Mutex, which must be unlocked in the same coroutine that was used to
/// lock the mutex.
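///
/// A minimal usage sketch (the map contents are purely illustrative):
/// @code
/// rcu::Variable<std::unordered_map<std::string, int>> var;
///
/// {
///   auto writer = var.StartWrite();  // copies the current value
///   (*writer)["key"] = 42;           // mutate the copy
///   writer.Commit();                 // make it visible to new readers
/// }
/// @endcode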
template <typename T, typename RcuTraits>
class [[nodiscard]] WritablePtr final {
 public:
  /// For internal use only. Use `var.StartWrite()` instead
  explicit WritablePtr(Variable<T, RcuTraits>& var)
      : var_(var),
        lock_(var.mutex_),
        ptr_(std::make_unique<T>(*var_.GetCurrent())) {
    LOG_TRACE() << "Start writing ptr=" << ptr_.get();
  }

  /// For internal use only. Use `var.Emplace(args...)` instead
  template <typename... Args>
  WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
              Args&&... initial_value_args)
      : var_(var),
        lock_(var.mutex_),
        ptr_(std::make_unique<T>(std::forward<Args>(initial_value_args)...)) {
    LOG_TRACE() << "Start writing ptr=" << ptr_.get()
                << " with custom initial value";
  }

  WritablePtr(WritablePtr<T, RcuTraits>&& other) noexcept
      : var_(other.var_),
        lock_(std::move(other.lock_)),
        ptr_(std::move(other.ptr_)) {
    LOG_TRACE() << "Continue writing ptr=" << ptr_.get();
  }

  ~WritablePtr() {
    if (ptr_) {
      LOG_TRACE() << "Stop writing ptr=" << ptr_.get();
    }
  }

  /// Store the changed value in the Variable. After Commit() the value becomes
  /// visible to new readers (in other words, Variable::Read() returns a
  /// ReadablePtr referencing the stored value, not the old value).
  void Commit() {
    UASSERT(ptr_ != nullptr);
    LOG_TRACE() << "Committing ptr=" << ptr_.get();

    std::unique_ptr<T> old_ptr(var_.current_.exchange(ptr_.release()));
    var_.Retire(std::move(old_ptr), lock_);
    lock_.unlock();
  }

  T* Get() & {
    UASSERT(ptr_);
    return ptr_.get();
  }

  T* Get() && { return GetOnRvalue(); }

  T* operator->() & { return Get(); }
  T* operator->() && { return GetOnRvalue(); }

  T& operator*() & { return *Get(); }
  T& operator*() && { return *GetOnRvalue(); }

 private:
  T* GetOnRvalue() {
    static_assert(!sizeof(T),
                  "Don't use temporary WritablePtr, store it to a variable");
    std::abort();
  }

  Variable<T, RcuTraits>& var_;
  std::unique_lock<typename RcuTraits::MutexType> lock_;
  std::unique_ptr<T> ptr_;
};

/// @brief Can be passed to `rcu::Variable` as the first argument to customize
/// whether old values should be destroyed asynchronously.
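///
/// For example (the value type is purely illustrative):
/// @code
/// // Destroy retired values synchronously, in the coroutine that commits:
/// rcu::Variable<std::string> var{rcu::DestructionType::kSync, "initial"};
/// @endcode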
enum class DestructionType { kSync, kAsync };

/// @ingroup userver_concurrency userver_containers
///
/// @brief Read-Copy-Update variable
///
/// @see Based on ideas from
/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
/// with modified API.
///
/// A variable with an MT-access pattern of "very often reads, seldom writes".
/// It is specially optimized for reads. On read, one obtains a ReadablePtr<T>
/// from it and uses the obtained value for as long as needed. On write, one
/// obtains a WritablePtr<T> with a copy of the last version of the value,
/// makes some changes to it, and commits the result to update the current
/// variable value (does Read-Copy-Update). The old version of the value is not
/// freed on update; it is eventually freed when a subsequent writer identifies
/// that nobody is working with this version.
///
/// @note There is no way to create a "null" `Variable`.
///
/// ## Example usage:
///
/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
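///
/// A short sketch in addition to the snippet above (values are illustrative):
/// @code
/// rcu::Variable<std::string> name{"initial"};
///
/// auto ptr = name.Read();               // ReadablePtr with a stable snapshot
/// std::string copy = name.ReadCopy();   // independent copy of the value
/// name.Assign("updated");               // replace the value for new readers
/// @endcode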
///
/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T, typename RcuTraits>
class Variable final {
 public:
  using MutexType = typename RcuTraits::MutexType;

  /// Create a new `Variable` with an in-place constructed initial value.
  /// Asynchronous destruction is enabled by default.
  /// @param initial_value_args arguments passed to the constructor of the
  /// initial value
  template <typename... Args>
  Variable(Args&&... initial_value_args)
      : destruction_type_(std::is_trivially_destructible_v<T> ||
                                  std::is_same_v<T, std::string> ||
                                  !std::is_same_v<MutexType, engine::Mutex>
                              ? DestructionType::kSync
                              : DestructionType::kAsync),
        epoch_(impl::GetNextEpoch()),
        current_(new T(std::forward<Args>(initial_value_args)...)) {}

  /// Create a new `Variable` with an in-place constructed initial value.
  /// @param destruction_type controls whether destruction of old values should
  /// be performed asynchronously
  /// @param initial_value_args arguments passed to the constructor of the
  /// initial value
  template <typename... Args>
  Variable(DestructionType destruction_type, Args&&... initial_value_args)
      : destruction_type_(destruction_type),
        epoch_(impl::GetNextEpoch()),
        current_(new T(std::forward<Args>(initial_value_args)...)) {}

  Variable(const Variable&) = delete;
  Variable(Variable&&) = delete;
  Variable& operator=(const Variable&) = delete;
  Variable& operator=(Variable&&) = delete;

  ~Variable() {
    delete current_.load();

    auto* hp = hp_record_head_.load();
    while (hp) {
      auto* next = hp->next.load();
      UASSERT_MSG(hp->ptr == nullptr,
                  "RCU variable is destroyed while being used");
      delete hp;
      hp = next;
    }

    // Make sure all data is deleted after return from the destructor
    if (destruction_type_ == DestructionType::kAsync) {
      wait_token_storage_.WaitForAllTokens();
    }
  }

  /// Obtain a smart pointer which can be used to read the current value.
  ReadablePtr<T, RcuTraits> Read() const {
    return ReadablePtr<T, RcuTraits>(*this);
  }

  /// Obtain a copy of the contained value.
  T ReadCopy() const {
    auto ptr = Read();
    return *ptr;
  }

  /// Obtain a smart pointer that will *copy* the current value. The pointer
  /// can be used to make changes to the value and to set the `Variable` to the
  /// changed value.
  WritablePtr<T, RcuTraits> StartWrite() {
    return WritablePtr<T, RcuTraits>(*this);
  }

  /// Obtain a smart pointer to a newly in-place constructed value, but does
  /// not replace the current one yet (in contrast with regular `Emplace`).
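  /// For example (a sketch; the value type is illustrative):
  /// @code
  /// rcu::Variable<std::vector<int>> var;
  /// auto writer = var.StartWriteEmplace(3, 42);  // {42, 42, 42}, not stored yet
  /// writer.Commit();                             // now visible to new readers
  /// @endcode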
  template <typename... Args>
  WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
    return WritablePtr<T, RcuTraits>(*this, std::in_place,
                                     std::forward<Args>(args)...);
  }

  /// Replaces the `Variable`'s value with the provided one.
  void Assign(T new_value) {
    WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value))
        .Commit();
  }

  /// Replaces the `Variable`'s value with an in-place constructed one.
  template <typename... Args>
  void Emplace(Args&&... args) {
    WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...)
        .Commit();
  }

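  /// Eagerly destroys retired values that are no longer referenced by any
  /// reader. Does nothing if someone is currently updating the variable.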
  void Cleanup() {
    std::unique_lock lock(mutex_, std::try_to_lock);
    if (!lock.owns_lock()) {
      LOG_TRACE() << "Not cleaning up, someone else is holding the mutex lock";
      // Someone is already changing the RCU
      return;
    }

    ScanRetiredList(CollectHazardPtrs(lock));
  }

 private:
  T* GetCurrent() const { return current_.load(); }

  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerCached(
      impl::CachedData<T, RcuTraits>& cache) const {
    auto* hp = cache.hp;
    T* ptr = nullptr;
    if (hp && cache.variable == this && cache.variable_epoch == epoch_) {
      if (hp->ptr.load() == nullptr &&
          hp->ptr.compare_exchange_strong(
              ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
        return hp;
      }
    }

    return nullptr;
  }

  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerFast() const {
    // Look for any hazard pointer with nullptr data ptr.
    // Mark it with kUsed (to reserve it for ourselves) and return it.
    auto* hp = hp_record_head_.load();
    while (hp) {
      T* t_ptr = nullptr;
      if (hp->ptr.load() == nullptr &&
          hp->ptr.compare_exchange_strong(
              t_ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
        return hp;
      }

      hp = hp->next;
    }
    return nullptr;
  }

  impl::HazardPointerRecord<T, RcuTraits>& MakeHazardPointer() const {
    auto cache = impl::local_cached_data<T, RcuTraits>.Use();
    auto* hp = MakeHazardPointerCached(*cache);
    if (!hp) {
      hp = MakeHazardPointerFast();
      // all existing records are in use, create a new one
      if (!hp) hp = MakeHazardPointerSlow();

      cache->hp = hp;
      cache->variable = this;
      cache->variable_epoch = epoch_;
    }
    UASSERT(&hp->owner == this);
    return *hp;
  }

  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerSlow() const {
    // allocate a new record and add it to the list (atomically)
    auto hp = new impl::HazardPointerRecord<T, RcuTraits>(*this);
    impl::HazardPointerRecord<T, RcuTraits>* old_hp = nullptr;
    do {
      old_hp = hp_record_head_.load();
      hp->next = old_hp;
    } while (!hp_record_head_.compare_exchange_strong(old_hp, hp));
    return hp;
  }

  void Retire(std::unique_ptr<T> old_ptr, std::unique_lock<MutexType>& lock) {
    LOG_TRACE() << "Retiring ptr=" << old_ptr.get();
    auto hazard_ptrs = CollectHazardPtrs(lock);

    if (hazard_ptrs.count(old_ptr.get()) > 0) {
      // old_ptr is being used right now, we may not delete it, delay deletion
      LOG_TRACE() << "Not retire, still used ptr=" << old_ptr.get();
      retire_list_head_.push_back(std::move(old_ptr));
    } else {
      LOG_TRACE() << "Retire, not used ptr=" << old_ptr.get();
      DeleteAsync(std::move(old_ptr));
    }

    ScanRetiredList(hazard_ptrs);
  }

  // Scan the retired list and for every object that has no hazard pointer
  // pointing at it, destroy it (possibly asynchronously)
  void ScanRetiredList(const std::unordered_set<T*>& hazard_ptrs) {
    for (auto rit = retire_list_head_.begin();
         rit != retire_list_head_.end();) {
      auto current = rit++;
      if (hazard_ptrs.count(current->get()) == 0) {
        // *current is not used by anyone, we may delete it
        DeleteAsync(std::move(*current));
        retire_list_head_.erase(current);
      }
    }
  }

  // Returns all T* that have a hazard pointer pointing at them. Occasionally
  // nullptr might be in the result as well.
  std::unordered_set<T*> CollectHazardPtrs(std::unique_lock<MutexType>&) {
    std::unordered_set<T*> hazard_ptrs;

    // Learn all currently used hazard pointers
    for (auto* hp = hp_record_head_.load(); hp; hp = hp->next) {
      hazard_ptrs.insert(hp->ptr.load());
    }
    return hazard_ptrs;
  }

  void DeleteAsync(std::unique_ptr<T> ptr) {
    switch (destruction_type_) {
      case DestructionType::kSync:
        ptr.reset();
        break;
      case DestructionType::kAsync:
        engine::CriticalAsyncNoSpan([ptr = std::move(ptr),
                                     token = wait_token_storage_
                                                 .GetToken()]() mutable {
          // Make sure *ptr is deleted before the token is destroyed
          ptr.reset();
        }).Detach();
        break;
    }
  }

  const DestructionType destruction_type_;
  const uint64_t epoch_;

  mutable std::atomic<impl::HazardPointerRecord<T, RcuTraits>*> hp_record_head_{
      {nullptr}};

  MutexType mutex_;  // protects current_ changes and retire_list_head_ access
  // current_ may be read without mutex_ locked, but must only be changed with
  // mutex_ held
  std::atomic<T*> current_;
  std::list<std::unique_ptr<T>> retire_list_head_;
  utils::impl::WaitTokenStorage wait_token_storage_;

  friend class ReadablePtr<T, RcuTraits>;
  friend class WritablePtr<T, RcuTraits>;
};

}  // namespace rcu

USERVER_NAMESPACE_END