userver: userver/rcu/rcu.hpp Source File
Loading...
Searching...
No Matches
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief Implementation of hazard pointer
5
6#include <atomic>
7#include <cstdlib>
8#include <list>
9#include <unordered_set>
10
11#include <userver/engine/async.hpp>
12#include <userver/engine/mutex.hpp>
13#include <userver/logging/log.hpp>
14#include <userver/rcu/fwd.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/impl/wait_token_storage.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20/// @brief Read-Copy-Update
21///
22/// @see Based on ideas from
23/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
24/// with modified API
25namespace rcu {
26
27namespace impl {
28
29// Hazard pointer implementation. Pointers form a linked list. \p ptr points
30// to the data they 'hold', next - to the next element in a list.
31// kUsed is a filler value to show that hazard pointer is not free. Please see
32// comment to it. Please note, pointers do not form a linked list with
33// application-wide list, that is there is no 'static
34// std::atomic<HazardPointerRecord*> global_head' Every rcu::Variable has its
35// own list of hazard pointers. Thus, move-assignment on hazard pointers is
36// difficult to implement.
template <typename T, typename RcuTraits>
struct HazardPointerRecord final {
  // You see, objects are created 'filled', that is for the purposes of hazard
  // pointer list, they contain value. This eliminates some race conditions,
  // because these algorithms checks for ptr != nullptr (And kUsed is not
  // nullptr). Obviously, this value can't be used, when dereferenced it points
  // somewhere into kernel space and will cause SEGFAULT
  static inline T* const kUsed = reinterpret_cast<T*>(1);

  explicit HazardPointerRecord(const Variable<T, RcuTraits>& owner)
      : owner(owner) {}

  // The T instance this record currently protects from deletion.
  // kUsed = reserved but not yet pointing at data; nullptr = record is free.
  std::atomic<T*> ptr = kUsed;
  // The rcu::Variable whose per-variable hazard pointer list we belong to.
  const Variable<T, RcuTraits>& owner;
  // Next record in owner's intrusive singly-linked list (nullptr = tail).
  std::atomic<HazardPointerRecord*> next{nullptr};

  // Simple operation that marks this hazard pointer as no longer used.
  void Release() { ptr = nullptr; }
};
56
// Per-thread cache of the hazard pointer most recently used for a given
// Variable<T, RcuTraits>, so repeated reads skip the list scan.
template <typename T, typename RcuTraits>
struct CachedData {
  // Last hazard pointer record acquired by this thread; may be reused on the
  // next read if it is still free.
  impl::HazardPointerRecord<T, RcuTraits>* hp{nullptr};
  // The Variable the cached record belongs to.
  const Variable<T, RcuTraits>* variable{nullptr};
  // ensures that `variable` points to the instance that filled the cache
  // (guards against a destroyed Variable being reallocated at the same
  // address)
  uint64_t variable_epoch{0};
};

// One cache instance per thread per (T, RcuTraits) instantiation.
template <typename T, typename RcuTraits>
thread_local CachedData<T, RcuTraits> cache;

// Returns a fresh epoch value; every Variable instance stores its own epoch
// so the thread-local cache above can detect address reuse. See
// CachedData::variable_epoch.
uint64_t GetNextEpoch() noexcept;
70} // namespace impl
71
72/// Default Rcu traits.
73/// - `MutexType` is a writer's mutex type that has to be used to protect
74/// structure on update
template <typename T>
struct DefaultRcuTraits {
  // Writers (WritablePtr, Assign, Emplace, Cleanup) serialize on this mutex;
  // readers never touch it.
  using MutexType = engine::Mutex;
};
79
80/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
81/// operator->() to do something with the stored value. Once created,
82/// ReadablePtr references the same immutable value: if Variable's value is
83/// changed during ReadablePtr lifetime, it will not affect value referenced by
84/// ReadablePtr.
template <typename T, typename RcuTraits>
class [[nodiscard]] ReadablePtr final {
 public:
  explicit ReadablePtr(const Variable<T, RcuTraits>& ptr)
      : hp_record_(&ptr.MakeHazardPointer()) {
    // This cycle guarantees that at the end of it both t_ptr_ and
    // hp_record_->ptr will both be set to
    // 1. something meaningful
    // 2. and that this meaningful value was not removed between assigning to
    //    t_ptr_ and storing it in a hazard pointer
    do {
      t_ptr_ = ptr.GetCurrent();

      hp_record_->ptr.store(t_ptr_);
    } while (t_ptr_ != ptr.GetCurrent());
  }

  ReadablePtr(ReadablePtr<T, RcuTraits>&& other) noexcept
      : t_ptr_(other.t_ptr_), hp_record_(other.hp_record_) {
    // Nulling other.t_ptr_ disarms its destructor; the hazard pointer is now
    // owned by *this.
    other.t_ptr_ = nullptr;
  }

  ReadablePtr& operator=(ReadablePtr<T, RcuTraits>&& other) noexcept {
    // What do we have here?
    // 1. 'other' may point to the same variable - or to a different one.
    // 2. therefore, its hazard pointer may belong to the same list,
    //    or to a different one.

    // We can't move hazard pointers between different lists - there is simply
    // no way to do this.
    // We also can't simply swap two pointers inside hazard pointers and leave
    // it be, because if 'other' is from different Variable, then our
    // ScanRetiredList won't find any more pointer to current T* and will
    // destroy object, while 'other' is still holding a hazard pointer to it.
    // The thing is, whole ReadablePtr is just a glorified holder for
    // hazard pointer + t_ptr_ just so that we don't have to load() atomics
    // every time.
    // so, lets just take pointer to hazard pointer inside other
    UASSERT_MSG(this != &other, "Self assignment to RCU variable");
    if (this == &other) {
      // it is an error to assign to self. Do nothing in this case
      return *this;
    }

    // Get rid of our current hp_record_
    if (t_ptr_) {
      hp_record_->Release();
    }
    // After that moment, the content of our hp_record_ can't be used -
    // no more hp_record_->xyz calls, because it is probably already reused in
    // some other ReadablePtr. Also, don't call t_ptr_, it is probably already
    // freed. Just take values from 'other'.
    hp_record_ = other.hp_record_;
    t_ptr_ = other.t_ptr_;

    // Now, it won't do us any good if there were two glorified things having
    // pointer to same hp_record_. Kill the other one.
    other.t_ptr_ = nullptr;
    // We don't need to clean other.hp_record_, because other.t_ptr_ acts
    // like a guard to it. As long as other.t_ptr_ is nullptr, nobody will
    // use other.hp_record_

    return *this;
  }

  // Copy acquires a fresh hazard pointer from the same owning Variable.
  // NOTE(review): dereferences other.hp_record_, so copying a moved-from
  // ReadablePtr (t_ptr_ == nullptr, hp_record_ undefined) is UB — callers
  // must not do that.
  ReadablePtr(const ReadablePtr<T, RcuTraits>& other)
      : ReadablePtr(other.hp_record_->owner) {}

  ReadablePtr& operator=(const ReadablePtr<T, RcuTraits>& other) {
    if (this != &other) *this = ReadablePtr<T, RcuTraits>{other};
    return *this;
  }

  ~ReadablePtr() {
    if (!t_ptr_) return;
    UASSERT(hp_record_ != nullptr);
    hp_record_->Release();
  }

  /// Access the stored value. The value remains valid while *this is alive.
  const T* Get() const& {
    UASSERT(t_ptr_);
    return t_ptr_;
  }

  const T* Get() && { return GetOnRvalue(); }

  const T* operator->() const& { return Get(); }
  const T* operator->() && { return GetOnRvalue(); }

  const T& operator*() const& { return *Get(); }
  const T& operator*() && { return *GetOnRvalue(); }

 private:
  // Calling any accessor on an rvalue ReadablePtr is a compile-time error:
  // the pointee would dangle as soon as the temporary dies.
  const T* GetOnRvalue() {
    static_assert(!sizeof(T),
                  "Don't use temporary ReadablePtr, store it to a variable");
    std::abort();
  }

  // This is a pointer to actual data. If it is null, then we treat it as
  // an indicator that this ReadablePtr is cleared and won't call
  // any logic associated with hp_record_
  T* t_ptr_;
  // Our hazard pointer. It can be nullptr in some circumstances.
  // Invariant is this: if t_ptr_ is not nullptr, then hp_record_ is also
  // not nullptr and points to hazard pointer containing same T*.
  // Thus, if t_ptr_ is nullptr, then hp_record_ is undefined.
  impl::HazardPointerRecord<T, RcuTraits>* hp_record_;
};
194
195/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
196/// reference to a to-be-changed value and allows one to mutate the value (e.g.
197/// add items to std::unordered_map). Changed value is not visible to readers
198/// until explicit store by Commit. Only a single writer may own a WritablePtr
199/// associated with the same Variable, so WritablePtr creates a critical
200/// section. This critical section doesn't affect readers, so a slow writer
201/// doesn't block readers.
202/// @note you may not pass WritablePtr between coroutines as it owns
203/// engine::Mutex, which must be unlocked in the same coroutine that was used to
204/// lock the mutex.
205template <typename T, typename RcuTraits>
206class [[nodiscard]] WritablePtr final {
207 public:
208 /// For internal use only. Use `var.StartWrite()` instead
209 explicit WritablePtr(Variable<T, RcuTraits>& var)
210 : var_(var),
211 lock_(var.mutex_),
213 LOG_TRACE() << "Start writing ptr=" << ptr_.get();
214 }
215
216 /// For internal use only. Use `var.Emplace(args...)` instead
217 template <typename... Args>
218 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
219 Args&&... initial_value_args)
220 : var_(var),
221 lock_(var.mutex_),
223 LOG_TRACE() << "Start writing ptr=" << ptr_.get()
224 << " with custom initial value";
225 }
226
227 WritablePtr(WritablePtr<T, RcuTraits>&& other) noexcept
228 : var_(other.var_),
229 lock_(std::move(other.lock_)),
230 ptr_(std::move(other.ptr_)) {
231 LOG_TRACE() << "Continue writing ptr=" << ptr_.get();
232 }
233
234 ~WritablePtr() {
235 if (ptr_) {
236 LOG_TRACE() << "Stop writing ptr=" << ptr_.get();
237 }
238 }
239
240 /// Store the changed value in Variable. After Commit() the value becomes
241 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
242 /// referencing the stored value, not an old value).
243 void Commit() {
244 UASSERT(ptr_ != nullptr);
245 LOG_TRACE() << "Committing ptr=" << ptr_.get();
246
247 std::unique_ptr<T> old_ptr(var_.current_.exchange(ptr_.release()));
248 var_.Retire(std::move(old_ptr), lock_);
249 lock_.unlock();
250 }
251
252 T* Get() & {
253 UASSERT(ptr_);
254 return ptr_.get();
255 }
256
257 T* Get() && { return GetOnRvalue(); }
258
259 T* operator->() & { return Get(); }
260 T* operator->() && { return GetOnRvalue(); }
261
262 T& operator*() & { return *Get(); }
263 T& operator*() && { return *GetOnRvalue(); }
264
265 private:
266 T* GetOnRvalue() {
267 static_assert(!sizeof(T),
268 "Don't use temporary WritablePtr, store it to a variable");
269 std::abort();
270 }
271
272 Variable<T, RcuTraits>& var_;
273 std::unique_lock<typename RcuTraits::MutexType> lock_;
274 std::unique_ptr<T> ptr_;
275};
276
/// @brief Can be passed to `rcu::Variable` as the first argument to customize
/// whether old values should be destroyed asynchronously.
enum class DestructionType {
  kSync,  ///< old value is destroyed inline, in the committing writer
  kAsync  ///< old value is destroyed in a detached background task
};
280
281/// @ingroup userver_concurrency userver_containers
282///
283/// @brief Read-Copy-Update variable
284///
285/// @see Based on ideas from
286/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
287/// with modified API.
288///
289/// A variable with MT-access pattern "very often reads, seldom writes". It is
290/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
291/// and uses the obtained value as long as it wants to. On write, one obtains a
292/// WritablePtr<T> with a copy of the last version of the value, makes some
293/// changes to it, and commits the result to update current variable value (does
294/// Read-Copy-Update). Old version of the value is not freed on update, it will
295/// be eventually freed when a subsequent writer identifies that nobody works
296/// with this version.
297///
298/// @note There is no way to create a "null" `Variable`.
299///
300/// ## Example usage:
301///
302/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
303///
304/// @see @ref scripts/docs/en/userver/synchronization.md
template <typename T, typename RcuTraits>
class Variable final {
 public:
  using MutexType = typename RcuTraits::MutexType;

  /// Create a new `Variable` with an in-place constructed initial value.
  /// Asynchronous destruction is enabled by default.
  /// @param initial_value_args arguments passed to the constructor of the
  /// initial value
  template <typename... Args>
  Variable(Args&&... initial_value_args)
      // Cheap-to-destroy types (and non-coroutine mutexes) use synchronous
      // destruction; everything else is destroyed in a background task.
      : destruction_type_(std::is_trivially_destructible_v<T> ||
                                  std::is_same_v<T, std::string> ||
                                  !std::is_same_v<MutexType, engine::Mutex>
                              ? DestructionType::kSync
                              : DestructionType::kAsync),
        epoch_(impl::GetNextEpoch()),
        current_(new T(std::forward<Args>(initial_value_args)...)) {}

  /// Create a new `Variable` with an in-place constructed initial value.
  /// @param destruction_type controls whether destruction of old values should
  /// be performed asynchronously
  /// @param initial_value_args arguments passed to the constructor of the
  /// initial value
  template <typename... Args>
  Variable(DestructionType destruction_type, Args&&... initial_value_args)
      : destruction_type_(destruction_type),
        epoch_(impl::GetNextEpoch()),
        current_(new T(std::forward<Args>(initial_value_args)...)) {}

  Variable(const Variable&) = delete;
  Variable(Variable&&) = delete;
  Variable& operator=(const Variable&) = delete;
  Variable& operator=(Variable&&) = delete;

  ~Variable() {
    delete current_.load();

    // Free the hazard pointer list; any record still holding a non-null ptr
    // means a ReadablePtr outlives its Variable - a caller bug.
    auto* hp = hp_record_head_.load();
    while (hp) {
      auto* next = hp->next.load();
      UASSERT_MSG(hp->ptr == nullptr,
                  "RCU variable is destroyed while being used");
      delete hp;
      hp = next;
    }

    // Make sure all data is deleted after return from dtr
    if (destruction_type_ == DestructionType::kAsync) {
      wait_token_storage_.WaitForAllTokens();
    }
  }

  /// Obtain a smart pointer which can be used to read the current value.
  ReadablePtr<T, RcuTraits> Read() const {
    return ReadablePtr<T, RcuTraits>(*this);
  }

  /// Obtain a copy of contained value.
  T ReadCopy() const {
    auto ptr = Read();
    return *ptr;
  }

  /// Obtain a smart pointer that will *copy* the current value. The pointer can
  /// be used to make changes to the value and to set the `Variable` to the
  /// changed value.
  WritablePtr<T, RcuTraits> StartWrite() {
    return WritablePtr<T, RcuTraits>(*this);
  }

  /// Obtain a smart pointer to a newly in-place constructed value, but does
  /// not replace the current one yet (in contrast with regular `Emplace`).
  template <typename... Args>
  WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
    return WritablePtr<T, RcuTraits>(*this, std::in_place,
                                     std::forward<Args>(args)...);
  }

  /// Replaces the `Variable`'s value with the provided one.
  void Assign(T new_value) {
    WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value))
        .Commit();
  }

  /// Replaces the `Variable`'s value with an in-place constructed one.
  template <typename... Args>
  void Emplace(Args&&... args) {
    WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...)
        .Commit();
  }

  /// Opportunistically destroy retired values that no reader uses anymore.
  /// Does nothing if a writer currently holds the mutex.
  void Cleanup() {
    std::unique_lock lock(mutex_, std::try_to_lock);
    if (!lock.owns_lock()) {
      LOG_TRACE() << "Not cleaning up, someone else is holding the mutex lock";
      // Someone is already changing the RCU
      return;
    }

    ScanRetiredList(CollectHazardPtrs(lock));
  }

 private:
  // Lock-free load of the latest committed value.
  T* GetCurrent() const { return current_.load(); }

  // Fast path: reuse the hazard pointer cached in this thread's
  // impl::cache, provided the cache still refers to *this* Variable
  // (same address AND same epoch) and the record is free. The CAS from
  // nullptr to kUsed reserves the record atomically.
  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerCached() const {
    auto& cache = impl::cache<T, RcuTraits>;
    auto* hp = cache.hp;
    T* ptr = nullptr;
    if (hp && cache.variable == this && cache.variable_epoch == epoch_) {
      if (hp->ptr.load() == nullptr &&
          hp->ptr.compare_exchange_strong(
              ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
        return hp;
      }
    }

    return nullptr;
  }

  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerFast() const {
    // Look for any hazard pointer with nullptr data ptr.
    // Mark it with kUsed (to reserve it for ourselves) and return it.
    auto* hp = hp_record_head_.load();
    while (hp) {
      T* t_ptr = nullptr;
      if (hp->ptr.load() == nullptr &&
          hp->ptr.compare_exchange_strong(
              t_ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
        return hp;
      }

      hp = hp->next;
    }
    return nullptr;
  }

  // Acquire a hazard pointer record for the calling thread, trying in order:
  // thread-local cache, scan of the existing list, allocation of a new record.
  // The winner is remembered in the thread-local cache for next time.
  impl::HazardPointerRecord<T, RcuTraits>& MakeHazardPointer() const {
    auto* hp = MakeHazardPointerCached();
    if (!hp) {
      hp = MakeHazardPointerFast();
      // all buckets are full, create a new one
      if (!hp) hp = MakeHazardPointerSlow();

      auto& cache = impl::cache<T, RcuTraits>;
      cache.hp = hp;
      cache.variable = this;
      cache.variable_epoch = epoch_;
    }
    UASSERT(&hp->owner == this);
    return *hp;
  }

  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerSlow() const {
    // allocate new pointer, and add it to the list (atomically)
    auto hp = new impl::HazardPointerRecord<T, RcuTraits>(*this);
    impl::HazardPointerRecord<T, RcuTraits>* old_hp = nullptr;
    do {
      old_hp = hp_record_head_.load();
      hp->next = old_hp;
    } while (!hp_record_head_.compare_exchange_strong(old_hp, hp));
    return hp;
  }

  // Called by a committing writer (mutex held, witnessed by `lock`): either
  // destroy the replaced value right away, or park it on the retire list if
  // some reader still holds a hazard pointer to it.
  void Retire(std::unique_ptr<T> old_ptr, std::unique_lock<MutexType>& lock) {
    LOG_TRACE() << "Retiring ptr=" << old_ptr.get();
    auto hazard_ptrs = CollectHazardPtrs(lock);

    if (hazard_ptrs.count(old_ptr.get()) > 0) {
      // old_ptr is being used now, we may not delete it, delay deletion
      LOG_TRACE() << "Not retire, still used ptr=" << old_ptr.get();
      retire_list_head_.push_back(std::move(old_ptr));
    } else {
      LOG_TRACE() << "Retire, not used ptr=" << old_ptr.get();
      DeleteAsync(std::move(old_ptr));
    }

    ScanRetiredList(hazard_ptrs);
  }

  // Scan retired list and for every object that has no more hazard_ptrs
  // pointing at it, destroy it (asynchronously)
  void ScanRetiredList(const std::unordered_set<T*>& hazard_ptrs) {
    for (auto rit = retire_list_head_.begin();
         rit != retire_list_head_.end();) {
      // Advance before a possible erase so the loop iterator stays valid.
      auto current = rit++;
      if (hazard_ptrs.count(current->get()) == 0) {
        // *current is not used by anyone, may delete it
        DeleteAsync(std::move(*current));
        retire_list_head_.erase(current);
      }
    }
  }

  // Returns all T*, that have hazard ptr pointing at them. Occasionally nullptr
  // might be in result as well. The unused lock parameter documents that the
  // writer mutex must be held.
  std::unordered_set<T*> CollectHazardPtrs(std::unique_lock<MutexType>&) {
    std::unordered_set<T*> hazard_ptrs;

    // Learn all currently used hazard pointers
    for (auto* hp = hp_record_head_.load(); hp; hp = hp->next) {
      hazard_ptrs.insert(hp->ptr.load());
    }
    return hazard_ptrs;
  }

  // Destroy a retired value, either inline (kSync) or in a detached task
  // (kAsync). The wait token keeps the Variable dtor from returning before
  // all async deletions finish.
  void DeleteAsync(std::unique_ptr<T> ptr) {
    switch (destruction_type_) {
      case DestructionType::kSync:
        ptr.reset();
        break;
      case DestructionType::kAsync:
        engine::CriticalAsyncNoSpan([ptr = std::move(ptr),
                                     token = wait_token_storage_
                                                 .GetToken()]() mutable {
          // Make sure *ptr is deleted before token is destroyed
          ptr.reset();
        }).Detach();
        break;
    }
  }

  const DestructionType destruction_type_;
  // Identifies this instance for the thread-local hazard pointer cache.
  const uint64_t epoch_;

  // Head of this Variable's lock-free hazard pointer list; records are only
  // appended, never removed until the dtor.
  mutable std::atomic<impl::HazardPointerRecord<T, RcuTraits>*> hp_record_head_{
      {nullptr}};

  MutexType mutex_;  // for current_ changes and retire_list_head_ access
  // may be read without mutex_ locked, but must be changed with held mutex_
  std::atomic<T*> current_;
  // Replaced values that were still referenced by readers at commit time.
  std::list<std::unique_ptr<T>> retire_list_head_;
  utils::impl::WaitTokenStorage wait_token_storage_;

  friend class ReadablePtr<T, RcuTraits>;
  friend class WritablePtr<T, RcuTraits>;
};
543
544} // namespace rcu
545
546USERVER_NAMESPACE_END