userver: userver/rcu/rcu.hpp Source File
Loading...
Searching...
No Matches
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <mutex>
9#include <optional>
10#include <utility>
11
12#include <userver/concurrent/impl/asymmetric_fence.hpp>
13#include <userver/concurrent/impl/intrusive_hooks.hpp>
14#include <userver/concurrent/impl/intrusive_stack.hpp>
15#include <userver/concurrent/impl/striped_read_indicator.hpp>
16#include <userver/engine/async.hpp>
17#include <userver/engine/mutex.hpp>
18#include <userver/rcu/fwd.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/impl/wait_token_storage.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24/// @brief Read-Copy-Update
25///
26/// @see Based on ideas from
27/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
28/// with modified API
29namespace rcu {
30
31namespace impl {
32
33template <typename T>
34struct SnapshotRecord final {
35 std::optional<T> data;
36 concurrent::impl::StripedReadIndicator indicator;
37 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
38 SnapshotRecord* next_retired{nullptr};
39};
40
41// Used instead of concurrent::impl::MemberHook to avoid instantiating
42// SnapshotRecord<T> ahead of time.
43template <typename T>
44struct FreeListHookGetter {
45 static auto& GetHook(SnapshotRecord<T>& node) noexcept { return node.free_list_hook; }
46};
47
48template <typename T>
49struct SnapshotRecordFreeList {
50 SnapshotRecordFreeList() = default;
51
52 ~SnapshotRecordFreeList() {
53 list.DisposeUnsafe([](SnapshotRecord<T>& record) { delete &record; });
54 }
55
56 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>> list;
57};
58
59template <typename T>
60class SnapshotRecordRetiredList final {
61public:
62 SnapshotRecordRetiredList() = default;
63
64 bool IsEmpty() const noexcept { return head_ == nullptr; }
65
66 void Push(SnapshotRecord<T>& record) noexcept {
67 record.next_retired = head_;
68 head_ = &record;
69 }
70
71 template <typename Predicate, typename Disposer>
72 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
73 SnapshotRecord<T>** ptr_to_current = &head_;
74
75 while (*ptr_to_current != nullptr) {
76 SnapshotRecord<T>* const current = *ptr_to_current;
77
78 if (predicate(*current)) {
79 *ptr_to_current = std::exchange(current->next_retired, nullptr);
80 disposer(*current);
81 } else {
82 ptr_to_current = &current->next_retired;
83 }
84 }
85 }
86
87private:
88 SnapshotRecord<T>* head_{nullptr};
89};
90
91class ExclusiveMutex final {
92public:
93 void lock() {
94 const bool was_locked = is_locked_.exchange(true);
96 !was_locked,
97 "Detected a race condition when multiple writers Assign to an rcu::Variable concurrently. The value that "
98 "will remain in rcu::Variable when the dust settles is unspecified."
99 );
100 }
101
102 void unlock() noexcept { is_locked_.store(false); }
103
104private:
105 std::atomic<bool> is_locked_{false};
106};
107
108} // namespace impl
109
110/// @brief A handle to the retired object version, which an RCU deleter should
111/// clean up.
112/// @see rcu::DefaultRcuTraits
113template <typename T>
114class SnapshotHandle final {
115public:
116 SnapshotHandle(SnapshotHandle&& other) noexcept
117 : record_(std::exchange(other.record_, nullptr)), free_list_(std::exchange(other.free_list_, nullptr)) {}
118
119 ~SnapshotHandle() {
120 if (record_ != nullptr) {
121 UASSERT(free_list_ != nullptr);
122 record_->data.reset();
123 free_list_->list.Push(*record_);
124 }
125 }
126
127private:
128 template <typename /*T*/, typename Traits>
129 friend class Variable;
130
131 template <typename /*T*/, typename Traits>
132 friend class WritablePtr;
133
134 explicit SnapshotHandle(impl::SnapshotRecord<T>& record, impl::SnapshotRecordFreeList<T>& free_list) noexcept
135 : record_(&record), free_list_(&free_list) {}
136
137 impl::SnapshotRecord<T>* record_;
138 impl::SnapshotRecordFreeList<T>* free_list_;
139};
140
141/// @brief Destroys retired objects synchronously.
142/// @see rcu::DefaultRcuTraits
143struct SyncDeleter final {
144 template <typename T>
145 void Delete(SnapshotHandle<T>&& handle) noexcept {
146 [[maybe_unused]] const auto for_deletion = std::move(handle);
147 }
148};
149
150/// @brief Destroys retired objects asynchronously in the same `TaskProcessor`.
151/// @see rcu::DefaultRcuTraits
152class AsyncDeleter final {
153public:
154 ~AsyncDeleter() { wait_token_storage_.WaitForAllTokens(); }
155
156 template <typename T>
157 void Delete(SnapshotHandle<T>&& handle) noexcept {
158 if constexpr (std::is_trivially_destructible_v<T> || std::is_same_v<T, std::string>) {
159 SyncDeleter{}.Delete(std::move(handle));
160 } else {
161 try {
162 engine::DetachUnscopedUnsafe(engine::CriticalAsyncNoSpan(
163 // The order of captures is important, 'handle' must be destroyed before 'token'.
164 [token = wait_token_storage_.GetToken(), handle = std::move(handle)]() mutable {}
165 ));
166 // NOLINTNEXTLINE(bugprone-empty-catch)
167 } catch (...) {
168 // Task creation somehow failed.
169 // `handle` will be destroyed synchronously, because it is already moved
170 // into the task's lambda.
171 }
172 }
173 }
174
175private:
176 utils::impl::WaitTokenStorage wait_token_storage_;
177};
178
179/// @brief Default RCU traits. Deletes garbage asynchronously.
180/// Designed for storing data of multi-megabyte or multi-gigabyte caches.
181/// @note Allows reads from any kind of thread.
182/// Only allows writes from coroutine threads.
183/// @see rcu::Variable
184/// @see rcu::SyncRcuTraits
185/// @see rcu::BlockingRcuTraits
187 /// `MutexType` is a writer's mutex type that has to be used to protect
188 /// structure on update.
189 using MutexType = engine::Mutex;
190
191 /// `DeleterType` is used to delete retired objects. It should:
192 /// 1. should contain `void Delete(SnapshotHandle<T>) noexcept`;
193 /// 2. force synchronous cleanup of remaining handles on destruction.
194 using DeleterType = AsyncDeleter;
195};
196
197/// @brief Deletes garbage synchronously.
198/// Designed for storing small amounts of data with relatively fast destructors.
199/// @note Allows reads from any kind of thread.
200/// Only allows writes from coroutine threads.
201/// @see rcu::DefaultRcuTraits
203 using DeleterType = SyncDeleter;
204};
205
206/// @brief Rcu traits for using outside of coroutines.
207/// @note Allows reads from any kind of thread.
208/// Only allows writes from NON-coroutine threads.
209/// @warning Blocks writing threads which are coroutines, which can cause
210/// deadlocks and hangups.
211/// @see rcu::DefaultRcuTraits
213 using MutexType = std::mutex;
214 using DeleterType = SyncDeleter;
215};
216
217/// @brief Rcu traits that only allow a single writer.
218/// Detects race conditions when multiple writers call `Assign` concurrently.
219/// @note Allows reads and writes from any kind of thread.
220/// @see rcu::DefaultRcuTraits
222 using MutexType = impl::ExclusiveMutex;
223 using DeleterType = SyncDeleter;
224};
225
226/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
227/// operator->() to do something with the stored value. Once created,
228/// ReadablePtr references the same immutable value: if Variable's value is
229/// changed during ReadablePtr lifetime, it will not affect value referenced by
230/// ReadablePtr.
231template <typename T, typename RcuTraits>
232class [[nodiscard]] ReadablePtr final {
233public:
234 explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
235 auto* record = ptr.current_.load();
236
237 while (true) {
238 // Lock 'record', which may or may not be 'current_' by the time we got
239 // there.
240 lock_ = record->indicator.GetLock();
241
242 // seq_cst is required for indicator.Lock in the following case.
243 //
244 // Reader thread point-of-view:
245 // 1. [reader] load current_
246 // 2. [reader] indicator.Lock
247 // 3. [reader] load current_
248 // 4. [writer] store current_
249 // 5. [writer] indicator.IsFree
250 //
251 // Given seq_cst only on (3), (4), and (5), the writer can see
252 // (2) after (5). In this case the reader will think that it has
253 // successfully taken the lock, which is false.
254 //
255 // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
256 // somewhat expensive, but this is a well-known cost of hazard pointers.
257 //
258 // The seq_cst cost can be mitigated by utilizing asymmetric fences.
259 // This asymmetric fence effectively grants std::memory_order_seq_cst
260 // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
261 // in (5). The technique is taken from Folly HazPtr.
262 concurrent::impl::AsymmetricThreadFenceLight();
263
264 // Is the record we locked 'current_'? If so, congratulations, we are
265 // holding a lock to 'current_'.
266 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
267 if (new_current == record) {
268 break;
269 }
270
271 // 'current_' changed, try again
272 record = new_current;
273 }
274
275 ptr_ = &*record->data;
276 }
277
278 ReadablePtr(ReadablePtr&& other) noexcept = default;
279 ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
280 ReadablePtr(const ReadablePtr& other) = default;
281 ReadablePtr& operator=(const ReadablePtr& other) = default;
282 ~ReadablePtr() = default;
283
284 const T* Get() const& {
285 UASSERT(ptr_);
286 return ptr_;
287 }
288
289 const T* Get() && { return GetOnRvalue(); }
290
291 const T* operator->() const& { return Get(); }
292 const T* operator->() && { return GetOnRvalue(); }
293
294 const T& operator*() const& { return *Get(); }
295 const T& operator*() && { return *GetOnRvalue(); }
296
297private:
298 const T* GetOnRvalue() {
299 static_assert(!sizeof(T), "Don't use temporary ReadablePtr, store it to a variable");
300 std::abort();
301 }
302
303 const T* ptr_;
304 concurrent::impl::StripedReadIndicatorLock lock_;
305};
306
307/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
308/// reference to a to-be-changed value and allows one to mutate the value (e.g.
309/// add items to std::unordered_map). Changed value is not visible to readers
310/// until explicit store by Commit. Only a single writer may own a WritablePtr
311/// associated with the same Variable, so WritablePtr creates a critical
312/// section. This critical section doesn't affect readers, so a slow writer
313/// doesn't block readers.
314/// @note you may not pass WritablePtr between coroutines as it owns
315/// engine::Mutex, which must be unlocked in the same coroutine that was used to
316/// lock the mutex.
317template <typename T, typename RcuTraits>
318class [[nodiscard]] WritablePtr final {
319public:
320 /// @cond
321 // For internal use only. Use `var.StartWrite()` instead
322 explicit WritablePtr(Variable<T, RcuTraits>& var)
323 : var_(var),
324 lock_(var.mutex_),
325 record_(&var.EmplaceSnapshot(*var.current_.load()->data))
326 {}
327
328 // For internal use only. Use `var.Emplace(args...)` instead
329 template <typename... Args>
330 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t, Args&&... initial_value_args)
331 : var_(var),
332 lock_(var.mutex_),
333 record_(&var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...))
334 {}
335 /// @endcond
336
337 WritablePtr(WritablePtr&& other) noexcept
338 : var_(other.var_), lock_(std::move(other.lock_)), record_(std::exchange(other.record_, nullptr)) {}
339
340 ~WritablePtr() {
341 if (record_) {
342 var_.DeleteSnapshot(*record_);
343 }
344 }
345
346 /// Store the changed value in Variable. After Commit() the value becomes
347 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
348 /// referencing the stored value, not an old value).
349 void Commit() {
350 UASSERT(record_ != nullptr);
351 var_.DoAssign(*std::exchange(record_, nullptr), lock_);
352 lock_.unlock();
353 }
354
355 T* Get() & {
356 UASSERT(record_ != nullptr);
357 return &*record_->data;
358 }
359
360 T* Get() && { return GetOnRvalue(); }
361
362 T* operator->() & { return Get(); }
363 T* operator->() && { return GetOnRvalue(); }
364
365 T& operator*() & { return *Get(); }
366 T& operator*() && { return *GetOnRvalue(); }
367
368private:
369 [[noreturn]] static T* GetOnRvalue() {
370 static_assert(!sizeof(T), "Don't use temporary WritablePtr, store it to a variable");
371 std::abort();
372 }
373
374 Variable<T, RcuTraits>& var_;
375 std::unique_lock<typename RcuTraits::MutexType> lock_;
376 impl::SnapshotRecord<T>* record_;
377};
378
379/// @ingroup userver_concurrency userver_containers
380///
381/// @brief Read-Copy-Update variable
382///
383/// @see Based on ideas from
384/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
385/// with modified API.
386///
387/// A variable with MT-access pattern "very often reads, seldom writes". It is
388/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
389/// and uses the obtained value as long as it wants to. On write, one obtains a
390/// WritablePtr<T> with a copy of the last version of the value, makes some
391/// changes to it, and commits the result to update current variable value (does
392/// Read-Copy-Update). Old version of the value is not freed on update, it will
393/// be eventually freed when a subsequent writer identifies that nobody works
394/// with this version.
395///
396/// @note There is no way to create a "null" `Variable`.
397///
398/// ## Example usage:
399///
400/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
401///
402/// @see @ref scripts/docs/en/userver/synchronization.md
403///
404/// @tparam T the stored value
405/// @tparam RcuTraits traits, should inherit from rcu::DefaultRcuTraits
406template <typename T, typename RcuTraits>
407class Variable final {
408 static_assert(
409 std::is_base_of_v<DefaultRcuTraits, RcuTraits>,
410 "RcuTraits should publicly inherit from rcu::DefaultRcuTraits"
411 );
412
413public:
414 using MutexType = typename RcuTraits::MutexType;
415 using DeleterType = typename RcuTraits::DeleterType;
416
417 /// @brief Create a new `Variable` with an in-place constructed initial value.
418 /// @param initial_value_args arguments passed to the constructor of the
419 /// initial value
420 template <typename... Args>
421 // TODO make explicit
422 Variable(Args&&... initial_value_args)
423 : current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...))
424 {}
425
426 Variable(const Variable&) = delete;
427 Variable(Variable&&) = delete;
428 Variable& operator=(const Variable&) = delete;
429 Variable& operator=(Variable&&) = delete;
430
431 ~Variable() {
432 {
433 auto* record = current_.load();
434 UASSERT_MSG(record->indicator.IsFree(), "RCU variable is destroyed while being used");
435 delete record;
436 }
437
438 retired_list_.RemoveAndDisposeIf(
439 [](impl::SnapshotRecord<T>&) { return true; },
440 [](impl::SnapshotRecord<T>& record) {
441 UASSERT_MSG(record.indicator.IsFree(), "RCU variable is destroyed while being used");
442 delete &record;
443 }
444 );
445 }
446
447 /// Obtain a smart pointer which can be used to read the current value.
448 ReadablePtr<T, RcuTraits> Read() const { return ReadablePtr<T, RcuTraits>(*this); }
449
450 /// Obtain a copy of contained value.
451 T ReadCopy() const {
452 auto ptr = Read();
453 return *ptr;
454 }
455
456 /// Obtain a smart pointer that will *copy* the current value. The pointer can
457 /// be used to make changes to the value and to set the `Variable` to the
458 /// changed value.
459 WritablePtr<T, RcuTraits> StartWrite() { return WritablePtr<T, RcuTraits>(*this); }
460
461 /// Obtain a smart pointer to a newly in-place constructed value, but does
462 /// not replace the current one yet (in contrast with regular `Emplace`).
463 template <typename... Args>
464 WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
465 return WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...);
466 }
467
468 /// Replaces the `Variable`'s value with the provided one.
469 void Assign(T new_value) { WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value)).Commit(); }
470
471 /// Replaces the `Variable`'s value with an in-place constructed one.
472 template <typename... Args>
473 void Emplace(Args&&... args) {
474 WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...).Commit();
475 }
476
477 void Cleanup() {
478 std::unique_lock lock(mutex_, std::try_to_lock);
479 if (!lock.owns_lock()) {
480 // Someone is already assigning to the RCU. They will call ScanRetireList
481 // in the process.
482 return;
483 }
484 ScanRetiredList(lock);
485 }
486
487private:
488 friend class ReadablePtr<T, RcuTraits>;
489 friend class WritablePtr<T, RcuTraits>;
490
491 void DoAssign(impl::SnapshotRecord<T>& new_snapshot, std::unique_lock<MutexType>& lock) {
492 UASSERT(lock.owns_lock());
493
494 // Note: exchange RMW operation would not give any benefits here.
495 auto* const old_snapshot = current_.load();
496 current_.store(&new_snapshot, std::memory_order_seq_cst);
497
498 UASSERT(old_snapshot);
499 retired_list_.Push(*old_snapshot);
500 ScanRetiredList(lock);
501 }
502
503 template <typename... Args>
504 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
505 auto* const free_list_record = free_list_.list.TryPop();
506 auto& record = free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
507 UASSERT(!record.data);
508
509 try {
510 record.data.emplace(std::forward<Args>(args)...);
511 } catch (...) {
512 free_list_.list.Push(record);
513 throw;
514 }
515
516 return record;
517 }
518
519 void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
520 UASSERT(lock.owns_lock());
521 if (retired_list_.IsEmpty()) {
522 return;
523 }
524
525 concurrent::impl::AsymmetricThreadFenceHeavy();
526
527 retired_list_.RemoveAndDisposeIf(
528 [](impl::SnapshotRecord<T>& record) { return record.indicator.IsFree(); },
529 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); }
530 );
531 }
532
533 void DeleteSnapshot(impl::SnapshotRecord<T>& record) noexcept {
534 static_assert(
535 noexcept(deleter_.Delete(SnapshotHandle<T>{record, free_list_})),
536 "DeleterType::Delete must be noexcept"
537 );
538 deleter_.Delete(SnapshotHandle<T>{record, free_list_});
539 }
540
541 // Covers current_ writes, free_list_.Pop, retired_list_
542 MutexType mutex_{};
543 impl::SnapshotRecordFreeList<T> free_list_;
544 impl::SnapshotRecordRetiredList<T> retired_list_;
545 // Must be placed after 'free_list_' to force sync cleanup before
546 // the destruction of free_list_.
547 DeleterType deleter_{};
548 // Must be placed after 'free_list_' and 'deleter_' so that if
549 // the initialization of current_ throws, it can be disposed properly.
550 std::atomic<impl::SnapshotRecord<T>*> current_;
551};
552
553} // namespace rcu
554
555USERVER_NAMESPACE_END