userver: userver/rcu/rcu.hpp Source File
rcu.hpp
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <mutex>
9#include <optional>
10#include <utility>
11
12#include <userver/concurrent/impl/asymmetric_fence.hpp>
13#include <userver/concurrent/impl/intrusive_hooks.hpp>
14#include <userver/concurrent/impl/intrusive_stack.hpp>
15#include <userver/concurrent/impl/striped_read_indicator.hpp>
16#include <userver/engine/async.hpp>
17#include <userver/engine/mutex.hpp>
18#include <userver/rcu/fwd.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/impl/wait_token_storage.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24/// @brief Read-Copy-Update
25///
26/// @see Based on ideas from
27/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
28/// with modified API
29namespace rcu {
30
31namespace impl {
32
33template <typename T>
34struct SnapshotRecord final {
35 std::optional<T> data;
36 concurrent::impl::StripedReadIndicator indicator;
37 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
38 SnapshotRecord* next_retired{nullptr};
39};
40
41// Used instead of concurrent::impl::MemberHook to avoid instantiating
42// SnapshotRecord<T> ahead of time.
43template <typename T>
44struct FreeListHookGetter {
45 auto& operator()(SnapshotRecord<T>& node) const noexcept { return node.free_list_hook; }
46};
47
48template <typename T>
49struct SnapshotRecordFreeList {
50 SnapshotRecordFreeList() = default;
51
52 ~SnapshotRecordFreeList() {
53 list.DisposeUnsafe([](SnapshotRecord<T>& record) { delete &record; });
54 }
55
56 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>> list;
57};
58
59template <typename T>
60class SnapshotRecordRetiredList final {
61public:
62 SnapshotRecordRetiredList() = default;
63
64 bool IsEmpty() const noexcept { return head_ == nullptr; }
65
66 void Push(SnapshotRecord<T>& record) noexcept {
67 record.next_retired = head_;
68 head_ = &record;
69 }
70
71 template <typename Predicate, typename Disposer>
72 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
73 SnapshotRecord<T>** ptr_to_current = &head_;
74
75 while (*ptr_to_current != nullptr) {
76 SnapshotRecord<T>* const current = *ptr_to_current;
77
78 if (predicate(*current)) {
79 *ptr_to_current = std::exchange(current->next_retired, nullptr);
80 disposer(*current);
81 } else {
82 ptr_to_current = &current->next_retired;
83 }
84 }
85 }
86
87private:
88 SnapshotRecord<T>* head_{nullptr};
89};
90
91class ExclusiveMutex final {
92public:
93 void lock() {
94 const bool was_locked = is_locked_.exchange(true);
95 UINVARIANT(
96 !was_locked,
97 "Detected a race condition when multiple writers Assign to an rcu::Variable concurrently. The value that "
98 "will remain in rcu::Variable when the dust settles is unspecified."
99 );
100 }
101
102 void unlock() noexcept { is_locked_.store(false); }
103
104private:
105 std::atomic<bool> is_locked_{false};
106};
107
108} // namespace impl
109
110/// @brief A handle to the retired object version, which an RCU deleter should
111/// clean up.
112/// @see rcu::DefaultRcuTraits
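///
/// Destroying the handle destroys the retired value and recycles its storage.
/// A minimal custom deleter sketch (`ImmediateDeleter` is an illustrative name,
/// equivalent in behavior to rcu::SyncDeleter):
/// @code
/// struct ImmediateDeleter {
///   template <typename T>
///   void Delete(rcu::SnapshotHandle<T>&& handle) noexcept {
///     // Dropping the moved-in handle destroys the retired value right here.
///     [[maybe_unused]] const auto for_deletion = std::move(handle);
///   }
/// };
/// @endcode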
113template <typename T>
114class SnapshotHandle final {
115public:
116 SnapshotHandle(SnapshotHandle&& other) noexcept
117 : record_(std::exchange(other.record_, nullptr)), free_list_(std::exchange(other.free_list_, nullptr)) {}
118
119 ~SnapshotHandle() {
120 if (record_ != nullptr) {
121 UASSERT(free_list_ != nullptr);
122 record_->data.reset();
123 free_list_->list.Push(*record_);
124 }
125 }
126
127private:
128 template <typename /*T*/, typename Traits>
129 friend class Variable;
130
131 template <typename /*T*/, typename Traits>
132 friend class WritablePtr;
133
134 explicit SnapshotHandle(impl::SnapshotRecord<T>& record, impl::SnapshotRecordFreeList<T>& free_list) noexcept
135 : record_(&record), free_list_(&free_list) {}
136
137 impl::SnapshotRecord<T>* record_;
138 impl::SnapshotRecordFreeList<T>* free_list_;
139};
140
141/// @brief Destroys retired objects synchronously.
142/// @see rcu::DefaultRcuTraits
143struct SyncDeleter final {
144 template <typename T>
145 void Delete(SnapshotHandle<T>&& handle) noexcept {
146 [[maybe_unused]] const auto for_deletion = std::move(handle);
147 }
148};
149
150/// @brief Destroys retired objects asynchronously in the same `TaskProcessor`.
151/// @see rcu::DefaultRcuTraits
152class AsyncDeleter final {
153public:
154 ~AsyncDeleter() { wait_token_storage_.WaitForAllTokens(); }
155
156 template <typename T>
157 void Delete(SnapshotHandle<T>&& handle) noexcept {
158 if constexpr (std::is_trivially_destructible_v<T> || std::is_same_v<T, std::string>) {
159 SyncDeleter{}.Delete(std::move(handle));
160 } else {
161 try {
162 engine::CriticalAsyncNoSpan(
163 // The order of captures is important: 'handle' must be destroyed before 'token'.
164 [token = wait_token_storage_.GetToken(), handle = std::move(handle)]() mutable {}
165 ).Detach();
166 // NOLINTNEXTLINE(bugprone-empty-catch)
167 } catch (...) {
168 // Task creation somehow failed.
169 // `handle` will be destroyed synchronously, because it is already moved
170 // into the task's lambda.
171 }
172 }
173 }
174
175private:
176 utils::impl::WaitTokenStorage wait_token_storage_;
177};
178
179/// @brief Default RCU traits. Deletes garbage asynchronously.
180/// Designed for storing data of multi-megabyte or multi-gigabyte caches.
181/// @note Allows reads from any kind of thread.
182/// Only allows writes from coroutine threads.
183/// @see rcu::Variable
184/// @see rcu::SyncRcuTraits
185/// @see rcu::BlockingRcuTraits
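///
/// Custom traits must publicly inherit from rcu::DefaultRcuTraits and may
/// override `MutexType` and/or `DeleterType`. A minimal sketch (`MyRcuTraits`
/// is an illustrative name):
/// @code
/// struct MyRcuTraits : public rcu::DefaultRcuTraits {
///   // Keep the default engine::Mutex, but destroy retired values synchronously.
///   using DeleterType = rcu::SyncDeleter;
/// };
///
/// rcu::Variable<std::string, MyRcuTraits> var{"initial value"};
/// @endcode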
186struct DefaultRcuTraits {
187 /// `MutexType` is a writer's mutex type that has to be used to protect
188 /// the structure on update.
189 using MutexType = engine::Mutex;
190
191 /// `DeleterType` is used to delete retired objects. It should:
192 /// 1. contain `void Delete(SnapshotHandle<T>) noexcept`;
193 /// 2. force synchronous cleanup of remaining handles on destruction.
194 using DeleterType = AsyncDeleter;
195};
196
197/// @brief Deletes garbage synchronously.
198/// Designed for storing small amounts of data with relatively fast destructors.
199/// @note Allows reads from any kind of thread.
200/// Only allows writes from coroutine threads.
201/// @see rcu::DefaultRcuTraits
202struct SyncRcuTraits : public DefaultRcuTraits {
203 using DeleterType = SyncDeleter;
204};
205
206/// @brief Rcu traits for using outside of coroutines.
207/// @note Allows reads from any kind of thread.
208/// Only allows writes from NON-coroutine threads.
209/// @warning Blocks writing threads which are coroutines, which can cause
210/// deadlocks and hangups.
211/// @see rcu::DefaultRcuTraits
212struct BlockingRcuTraits : public DefaultRcuTraits {
213 using MutexType = std::mutex;
214 using DeleterType = SyncDeleter;
215};
216
217/// @brief Rcu traits that only allow a single writer.
218/// Detects race conditions when multiple writers call `Assign` concurrently.
219/// @note Allows reads and writes from any kind of thread.
220/// @see rcu::DefaultRcuTraits
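///
/// A usage sketch (names are illustrative); intended for the case where at most
/// one writer calls Assign at a time:
/// @code
/// rcu::Variable<int, rcu::ExclusiveRcuTraits> counter{0};
/// counter.Assign(42);                       // writers must not run concurrently
/// const int snapshot = counter.ReadCopy();  // reads are fine from any thread
/// @endcode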
221struct ExclusiveRcuTraits : public DefaultRcuTraits {
222 using MutexType = impl::ExclusiveMutex;
223 using DeleterType = SyncDeleter;
224};
225
226/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
227/// operator->() to do something with the stored value. Once created,
228/// ReadablePtr references the same immutable value: if Variable's value is
229/// changed during the ReadablePtr's lifetime, it will not affect the value
230/// referenced by ReadablePtr.
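///
/// A minimal usage sketch (`var` is an illustrative variable name):
/// @code
/// rcu::Variable<std::string> var{"old"};
/// auto ptr = var.Read();
/// var.Assign("new");  // does not affect the snapshot referenced by 'ptr'
/// // *ptr is still "old": the snapshot stays immutable for its whole lifetime
/// @endcode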
231template <typename T, typename RcuTraits>
232class [[nodiscard]] ReadablePtr final {
233public:
234 explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
235 auto* record = ptr.current_.load();
236
237 while (true) {
238 // Lock 'record', which may or may not be 'current_' by the time we got
239 // there.
240 lock_ = record->indicator.Lock();
241
242 // seq_cst is required for indicator.Lock in the following case.
243 //
244 // Reader thread point-of-view:
245 // 1. [reader] load current_
246 // 2. [reader] indicator.Lock
247 // 3. [reader] load current_
248 // 4. [writer] store current_
249 // 5. [writer] indicator.IsFree
250 //
251 // Given seq_cst only on (3), (4), and (5), the writer can see
252 // (2) after (5). In this case the reader will think that it has
253 // successfully taken the lock, which is false.
254 //
255 // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
256 // somewhat expensive, but this is a well-known cost of hazard pointers.
257 //
258 // The seq_cst cost can be mitigated by utilizing asymmetric fences.
259 // This asymmetric fence effectively grants std::memory_order_seq_cst
260 // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
261 // in (5). The technique is taken from Folly HazPtr.
262 concurrent::impl::AsymmetricThreadFenceLight();
263
264 // Is the record we locked 'current_'? If so, congratulations, we are
265 // holding a lock to 'current_'.
266 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
267 if (new_current == record) break;
268
269 // 'current_' changed, try again
270 record = new_current;
271 }
272
273 ptr_ = &*record->data;
274 }
275
276 ReadablePtr(ReadablePtr&& other) noexcept = default;
277 ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
278 ReadablePtr(const ReadablePtr& other) = default;
279 ReadablePtr& operator=(const ReadablePtr& other) = default;
280 ~ReadablePtr() = default;
281
282 const T* Get() const& {
283 UASSERT(ptr_);
284 return ptr_;
285 }
286
287 const T* Get() && { return GetOnRvalue(); }
288
289 const T* operator->() const& { return Get(); }
290 const T* operator->() && { return GetOnRvalue(); }
291
292 const T& operator*() const& { return *Get(); }
293 const T& operator*() && { return *GetOnRvalue(); }
294
295private:
296 const T* GetOnRvalue() {
297 static_assert(!sizeof(T), "Don't use temporary ReadablePtr, store it to a variable");
298 std::abort();
299 }
300
301 const T* ptr_;
302 concurrent::impl::StripedReadIndicatorLock lock_;
303};
304
305/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
306/// reference to a to-be-changed value and allows one to mutate the value (e.g.
307/// add items to std::unordered_map). Changed value is not visible to readers
308/// until explicit store by Commit. Only a single writer may own a WritablePtr
309/// associated with the same Variable, so WritablePtr creates a critical
310/// section. This critical section doesn't affect readers, so a slow writer
311/// doesn't block readers.
312/// @note You may not pass WritablePtr between coroutines, as it holds a lock
313/// on engine::Mutex, which must be unlocked in the same coroutine that locked
314/// the mutex.
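///
/// A typical copy-modify-commit sketch (`var` is an illustrative variable name):
/// @code
/// rcu::Variable<std::unordered_map<std::string, int>> var;
/// auto writer = var.StartWrite();  // copies the current value
/// (*writer)["new-key"] = 42;       // mutate the copy; readers are unaffected
/// writer.Commit();                 // publish the copy to new readers
/// @endcode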
315template <typename T, typename RcuTraits>
316class [[nodiscard]] WritablePtr final {
317public:
318 /// @cond
319 // For internal use only. Use `var.StartWrite()` instead
320 explicit WritablePtr(Variable<T, RcuTraits>& var)
321 : var_(var), lock_(var.mutex_), record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
322
323 // For internal use only. Use `var.Emplace(args...)` instead
324 template <typename... Args>
325 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t, Args&&... initial_value_args)
326 : var_(var), lock_(var.mutex_), record_(&var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
327 /// @endcond
328
329 WritablePtr(WritablePtr&& other) noexcept
330 : var_(other.var_), lock_(std::move(other.lock_)), record_(std::exchange(other.record_, nullptr)) {}
331
332 ~WritablePtr() {
333 if (record_) {
334 var_.DeleteSnapshot(*record_);
335 }
336 }
337
338 /// Store the changed value in Variable. After Commit() the value becomes
339 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
340 /// referencing the stored value, not an old value).
341 void Commit() {
342 UASSERT(record_ != nullptr);
343 var_.DoAssign(*std::exchange(record_, nullptr), lock_);
344 lock_.unlock();
345 }
346
347 T* Get() & {
348 UASSERT(record_ != nullptr);
349 return &*record_->data;
350 }
351
352 T* Get() && { return GetOnRvalue(); }
353
354 T* operator->() & { return Get(); }
355 T* operator->() && { return GetOnRvalue(); }
356
357 T& operator*() & { return *Get(); }
358 T& operator*() && { return *GetOnRvalue(); }
359
360private:
361 [[noreturn]] static T* GetOnRvalue() {
362 static_assert(!sizeof(T), "Don't use temporary WritablePtr, store it to a variable");
363 std::abort();
364 }
365
366 Variable<T, RcuTraits>& var_;
367 std::unique_lock<typename RcuTraits::MutexType> lock_;
368 impl::SnapshotRecord<T>* record_;
369};
370
371/// @ingroup userver_concurrency userver_containers
372///
373/// @brief Read-Copy-Update variable
374///
375/// @see Based on ideas from
376/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
377/// with modified API.
378///
379/// A variable with MT-access pattern "very often reads, seldom writes". It is
380/// specially optimized for reads. On read, one obtains a ReadablePtr<T> from it
381/// and uses the obtained value as long as it wants to. On write, one obtains a
382/// WritablePtr<T> with a copy of the last version of the value, makes some
383/// changes to it, and commits the result to update the current variable value
384/// (does Read-Copy-Update). The old version of the value is not freed on update;
385/// it is eventually freed when a subsequent writer identifies that nobody works
386/// with this version anymore.
387///
388/// @note There is no way to create a "null" `Variable`.
389///
390/// ## Example usage:
391///
392/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
393///
394/// @see @ref scripts/docs/en/userver/synchronization.md
395///
396/// @tparam T the stored value
397/// @tparam RcuTraits traits, should inherit from rcu::DefaultRcuTraits
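///
/// A condensed sketch of other operations (names are illustrative):
/// @code
/// rcu::Variable<std::vector<int>> var{std::vector<int>{1, 2, 3}};
///
/// var.Emplace(5, 0);                 // replace with an in-place constructed value
/// const auto copy = var.ReadCopy();  // obtain an independent copy of the value
/// var.Cleanup();                     // opportunistically free retired versions
/// @endcode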
398template <typename T, typename RcuTraits>
399class Variable final {
400 static_assert(
401 std::is_base_of_v<DefaultRcuTraits, RcuTraits>,
402 "RcuTraits should publicly inherit from rcu::DefaultRcuTraits"
403 );
404
405public:
406 using MutexType = typename RcuTraits::MutexType;
407 using DeleterType = typename RcuTraits::DeleterType;
408
409 /// @brief Create a new `Variable` with an in-place constructed initial value.
410 /// @param initial_value_args arguments passed to the constructor of the
411 /// initial value
412 template <typename... Args>
413 // TODO make explicit
414 Variable(Args&&... initial_value_args) : current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
415
416 Variable(const Variable&) = delete;
417 Variable(Variable&&) = delete;
418 Variable& operator=(const Variable&) = delete;
419 Variable& operator=(Variable&&) = delete;
420
421 ~Variable() {
422 {
423 auto* record = current_.load();
424 UASSERT_MSG(record->indicator.IsFree(), "RCU variable is destroyed while being used");
425 delete record;
426 }
427
428 retired_list_.RemoveAndDisposeIf(
429 [](impl::SnapshotRecord<T>&) { return true; },
430 [](impl::SnapshotRecord<T>& record) {
431 UASSERT_MSG(record.indicator.IsFree(), "RCU variable is destroyed while being used");
432 delete &record;
433 }
434 );
435 }
436
437 /// Obtain a smart pointer which can be used to read the current value.
438 ReadablePtr<T, RcuTraits> Read() const { return ReadablePtr<T, RcuTraits>(*this); }
439
440 /// Obtain a copy of contained value.
441 T ReadCopy() const {
442 auto ptr = Read();
443 return *ptr;
444 }
445
446 /// Obtain a smart pointer that will *copy* the current value. The pointer can
447 /// be used to make changes to the value and to set the `Variable` to the
448 /// changed value.
449 WritablePtr<T, RcuTraits> StartWrite() { return WritablePtr<T, RcuTraits>(*this); }
450
451 /// Obtain a smart pointer to a newly in-place constructed value. It does not
452 /// replace the current one until Commit() (in contrast with regular `Emplace`).
453 template <typename... Args>
454 WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
455 return WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...);
456 }
457
458 /// Replaces the `Variable`'s value with the provided one.
459 void Assign(T new_value) { WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value)).Commit(); }
460
461 /// Replaces the `Variable`'s value with an in-place constructed one.
462 template <typename... Args>
463 void Emplace(Args&&... args) {
464 WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...).Commit();
465 }
466
467 void Cleanup() {
468 std::unique_lock lock(mutex_, std::try_to_lock);
469 if (!lock.owns_lock()) {
470 // Someone is already assigning to the RCU. They will call ScanRetiredList
471 // in the process.
472 return;
473 }
474 ScanRetiredList(lock);
475 }
476
477private:
478 friend class ReadablePtr<T, RcuTraits>;
479 friend class WritablePtr<T, RcuTraits>;
480
481 void DoAssign(impl::SnapshotRecord<T>& new_snapshot, std::unique_lock<MutexType>& lock) {
482 UASSERT(lock.owns_lock());
483
484 // Note: exchange RMW operation would not give any benefits here.
485 auto* const old_snapshot = current_.load();
486 current_.store(&new_snapshot, std::memory_order_seq_cst);
487
488 UASSERT(old_snapshot);
489 retired_list_.Push(*old_snapshot);
490 ScanRetiredList(lock);
491 }
492
493 template <typename... Args>
494 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
495 auto* const free_list_record = free_list_.list.TryPop();
496 auto& record = free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
497 UASSERT(!record.data);
498
499 try {
500 record.data.emplace(std::forward<Args>(args)...);
501 } catch (...) {
502 free_list_.list.Push(record);
503 throw;
504 }
505
506 return record;
507 }
508
509 void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
510 UASSERT(lock.owns_lock());
511 if (retired_list_.IsEmpty()) return;
512
513 concurrent::impl::AsymmetricThreadFenceHeavy();
514
515 retired_list_.RemoveAndDisposeIf(
516 [](impl::SnapshotRecord<T>& record) { return record.indicator.IsFree(); },
517 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); }
518 );
519 }
520
521 void DeleteSnapshot(impl::SnapshotRecord<T>& record) noexcept {
522 static_assert(
523 noexcept(deleter_.Delete(SnapshotHandle<T>{record, free_list_})), "DeleterType::Delete must be noexcept"
524 );
525 deleter_.Delete(SnapshotHandle<T>{record, free_list_});
526 }
527
528 // Covers current_ writes, free_list_.Pop, retired_list_
529 MutexType mutex_{};
530 impl::SnapshotRecordFreeList<T> free_list_;
531 impl::SnapshotRecordRetiredList<T> retired_list_;
532 // Must be placed after 'free_list_' to force sync cleanup before
533 // the destruction of free_list_.
534 DeleterType deleter_{};
535 // Must be placed after 'free_list_' and 'deleter_' so that if
536 // the initialization of current_ throws, it can be disposed properly.
537 std::atomic<impl::SnapshotRecord<T>*> current_;
538};
539
540} // namespace rcu
541
542USERVER_NAMESPACE_END