userver: userver/rcu/rcu.hpp Source File
Loading...
Searching...
No Matches
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <mutex>
9#include <optional>
10#include <utility>
11
12#include <userver/concurrent/impl/asymmetric_fence.hpp>
13#include <userver/concurrent/impl/intrusive_hooks.hpp>
14#include <userver/concurrent/impl/intrusive_stack.hpp>
15#include <userver/concurrent/impl/striped_read_indicator.hpp>
16#include <userver/engine/async.hpp>
17#include <userver/engine/mutex.hpp>
18#include <userver/rcu/fwd.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/impl/wait_token_storage.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24/// @brief Read-Copy-Update
25///
26/// @see Based on ideas from
27/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
28/// with modified API
29namespace rcu {
30
31namespace impl {
32
33template <typename T>
34struct SnapshotRecord final {
35 std::optional<T> data;
36 concurrent::impl::StripedReadIndicator indicator;
37 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
38 SnapshotRecord* next_retired{nullptr};
39};
40
41// Used instead of concurrent::impl::MemberHook to avoid instantiating
42// SnapshotRecord<T> ahead of time.
43template <typename T>
44struct FreeListHookGetter {
45 auto& operator()(SnapshotRecord<T>& node) const noexcept { return node.free_list_hook; }
46};
47
48template <typename T>
49struct SnapshotRecordFreeList {
50 SnapshotRecordFreeList() = default;
51
52 ~SnapshotRecordFreeList() {
53 list.DisposeUnsafe([](SnapshotRecord<T>& record) { delete &record; });
54 }
55
56 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>> list;
57};
58
59template <typename T>
60class SnapshotRecordRetiredList final {
61public:
62 SnapshotRecordRetiredList() = default;
63
64 bool IsEmpty() const noexcept { return head_ == nullptr; }
65
66 void Push(SnapshotRecord<T>& record) noexcept {
67 record.next_retired = head_;
68 head_ = &record;
69 }
70
71 template <typename Predicate, typename Disposer>
72 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
73 SnapshotRecord<T>** ptr_to_current = &head_;
74
75 while (*ptr_to_current != nullptr) {
76 SnapshotRecord<T>* const current = *ptr_to_current;
77
78 if (predicate(*current)) {
79 *ptr_to_current = std::exchange(current->next_retired, nullptr);
80 disposer(*current);
81 } else {
82 ptr_to_current = &current->next_retired;
83 }
84 }
85 }
86
87private:
88 SnapshotRecord<T>* head_{nullptr};
89};
90
91} // namespace impl
92
93/// @brief A handle to the retired object version, which an RCU deleter should
94/// clean up.
95/// @see rcu::DefaultRcuTraits
96template <typename T>
97class SnapshotHandle final {
98public:
99 SnapshotHandle(SnapshotHandle&& other) noexcept
100 : record_(std::exchange(other.record_, nullptr)), free_list_(std::exchange(other.free_list_, nullptr)) {}
101
102 ~SnapshotHandle() {
103 if (record_ != nullptr) {
104 UASSERT(free_list_ != nullptr);
105 record_->data.reset();
106 free_list_->list.Push(*record_);
107 }
108 }
109
110private:
111 template <typename /*T*/, typename Traits>
112 friend class Variable;
113
114 template <typename /*T*/, typename Traits>
115 friend class WritablePtr;
116
117 explicit SnapshotHandle(impl::SnapshotRecord<T>& record, impl::SnapshotRecordFreeList<T>& free_list) noexcept
118 : record_(&record), free_list_(&free_list) {}
119
120 impl::SnapshotRecord<T>* record_;
121 impl::SnapshotRecordFreeList<T>* free_list_;
122};
123
124/// @brief Destroys retired objects synchronously.
125/// @see rcu::DefaultRcuTraits
126struct SyncDeleter final {
127 template <typename T>
128 void Delete(SnapshotHandle<T>&& handle) noexcept {
129 [[maybe_unused]] const auto for_deletion = std::move(handle);
130 }
131};
132
133/// @brief Destroys retired objects asynchronously in the same `TaskProcessor`.
134/// @see rcu::DefaultRcuTraits
135class AsyncDeleter final {
136public:
137 ~AsyncDeleter() { wait_token_storage_.WaitForAllTokens(); }
138
139 template <typename T>
140 void Delete(SnapshotHandle<T>&& handle) noexcept {
141 if constexpr (std::is_trivially_destructible_v<T> || std::is_same_v<T, std::string>) {
142 SyncDeleter{}.Delete(std::move(handle));
143 } else {
144 try {
145 engine::CriticalAsyncNoSpan(
146 // The order of captures is important, 'handle' must be destroyed before 'token'.
147 [token = wait_token_storage_.GetToken(), handle = std::move(handle)]() mutable {}
148 ).Detach();
149 // NOLINTNEXTLINE(bugprone-empty-catch)
150 } catch (...) {
151 // Task creation somehow failed.
152 // `handle` will be destroyed synchronously, because it is already moved
153 // into the task's lambda.
154 }
155 }
156 }
157
158private:
159 utils::impl::WaitTokenStorage wait_token_storage_;
160};
161
162/// @brief Default RCU traits. Deletes garbage asynchronously.
163/// Designed for storing data of multi-megabyte or multi-gigabyte caches.
164/// @note Allows reads from any kind of thread.
165/// Only allows writes from coroutine threads.
166/// @see rcu::Variable
167/// @see rcu::SyncRcuTraits
168/// @see rcu::BlockingRcuTraits
170 /// `MutexType` is a writer's mutex type that has to be used to protect
171 /// structure on update.
172 using MutexType = engine::Mutex;
173
174 /// `DeleterType` is used to delete retired objects. It should:
175 /// 1. should contain `void Delete(SnapshotHandle<T>) noexcept`;
176 /// 2. force synchronous cleanup of remaining handles on destruction.
177 using DeleterType = AsyncDeleter;
178};
179
180/// @brief Deletes garbage synchronously.
181/// Designed for storing small amounts of data with relatively fast destructors.
182/// @note Allows reads from any kind of thread.
183/// Only allows writes from coroutine threads.
184/// @see rcu::DefaultRcuTraits
186 using DeleterType = SyncDeleter;
187};
188
189/// @brief Rcu traits for using outside of coroutines.
190/// @note Allows reads from any kind of thread.
191/// Only allows writes from NON-coroutine threads.
192/// @warning Blocks writing threads which are coroutines, which can cause
193/// deadlocks and hangups.
194/// @see rcu::DefaultRcuTraits
196 using MutexType = std::mutex;
197 using DeleterType = SyncDeleter;
198};
199
200/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
201/// operator->() to do something with the stored value. Once created,
202/// ReadablePtr references the same immutable value: if Variable's value is
203/// changed during ReadablePtr lifetime, it will not affect value referenced by
204/// ReadablePtr.
205template <typename T, typename RcuTraits>
206class [[nodiscard]] ReadablePtr final {
207public:
208 explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
209 auto* record = ptr.current_.load();
210
211 while (true) {
212 // Lock 'record', which may or may not be 'current_' by the time we got
213 // there.
214 lock_ = record->indicator.Lock();
215
216 // seq_cst is required for indicator.Lock in the following case.
217 //
218 // Reader thread point-of-view:
219 // 1. [reader] load current_
220 // 2. [reader] indicator.Lock
221 // 3. [reader] load current_
222 // 4. [writer] store current_
223 // 5. [writer] indicator.IsFree
224 //
225 // Given seq_cst only on (3), (4), and (5), the writer can see
226 // (2) after (5). In this case the reader will think that it has
227 // successfully taken the lock, which is false.
228 //
229 // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
230 // somewhat expensive, but this is a well-known cost of hazard pointers.
231 //
232 // The seq_cst cost can be mitigated by utilizing asymmetric fences.
233 // This asymmetric fence effectively grants std::memory_order_seq_cst
234 // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
235 // in (5). The technique is taken from Folly HazPtr.
236 concurrent::impl::AsymmetricThreadFenceLight();
237
238 // Is the record we locked 'current_'? If so, congratulations, we are
239 // holding a lock to 'current_'.
240 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
241 if (new_current == record) break;
242
243 // 'current_' changed, try again
244 record = new_current;
245 }
246
247 ptr_ = &*record->data;
248 }
249
250 ReadablePtr(ReadablePtr&& other) noexcept = default;
251 ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
252 ReadablePtr(const ReadablePtr& other) = default;
253 ReadablePtr& operator=(const ReadablePtr& other) = default;
254 ~ReadablePtr() = default;
255
256 const T* Get() const& {
257 UASSERT(ptr_);
258 return ptr_;
259 }
260
261 const T* Get() && { return GetOnRvalue(); }
262
263 const T* operator->() const& { return Get(); }
264 const T* operator->() && { return GetOnRvalue(); }
265
266 const T& operator*() const& { return *Get(); }
267 const T& operator*() && { return *GetOnRvalue(); }
268
269private:
270 const T* GetOnRvalue() {
271 static_assert(!sizeof(T), "Don't use temporary ReadablePtr, store it to a variable");
272 std::abort();
273 }
274
275 const T* ptr_;
276 concurrent::impl::StripedReadIndicatorLock lock_;
277};
278
279/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
280/// reference to a to-be-changed value and allows one to mutate the value (e.g.
281/// add items to std::unordered_map). Changed value is not visible to readers
282/// until explicit store by Commit. Only a single writer may own a WritablePtr
283/// associated with the same Variable, so WritablePtr creates a critical
284/// section. This critical section doesn't affect readers, so a slow writer
285/// doesn't block readers.
286/// @note you may not pass WritablePtr between coroutines as it owns
287/// engine::Mutex, which must be unlocked in the same coroutine that was used to
288/// lock the mutex.
289template <typename T, typename RcuTraits>
290class [[nodiscard]] WritablePtr final {
291public:
292 /// @cond
293 // For internal use only. Use `var.StartWrite()` instead
294 explicit WritablePtr(Variable<T, RcuTraits>& var)
295 : var_(var), lock_(var.mutex_), record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
296
297 // For internal use only. Use `var.Emplace(args...)` instead
298 template <typename... Args>
299 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t, Args&&... initial_value_args)
300 : var_(var), lock_(var.mutex_), record_(&var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
301 /// @endcond
302
303 WritablePtr(WritablePtr&& other) noexcept
304 : var_(other.var_), lock_(std::move(other.lock_)), record_(std::exchange(other.record_, nullptr)) {}
305
306 ~WritablePtr() {
307 if (record_) {
308 var_.DeleteSnapshot(*record_);
309 }
310 }
311
312 /// Store the changed value in Variable. After Commit() the value becomes
313 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
314 /// referencing the stored value, not an old value).
315 void Commit() {
316 UASSERT(record_ != nullptr);
317 var_.DoAssign(*std::exchange(record_, nullptr), lock_);
318 lock_.unlock();
319 }
320
321 T* Get() & {
322 UASSERT(record_ != nullptr);
323 return &*record_->data;
324 }
325
326 T* Get() && { return GetOnRvalue(); }
327
328 T* operator->() & { return Get(); }
329 T* operator->() && { return GetOnRvalue(); }
330
331 T& operator*() & { return *Get(); }
332 T& operator*() && { return *GetOnRvalue(); }
333
334private:
335 [[noreturn]] static T* GetOnRvalue() {
336 static_assert(!sizeof(T), "Don't use temporary WritablePtr, store it to a variable");
337 std::abort();
338 }
339
340 Variable<T, RcuTraits>& var_;
341 std::unique_lock<typename RcuTraits::MutexType> lock_;
342 impl::SnapshotRecord<T>* record_;
343};
344
345/// @ingroup userver_concurrency userver_containers
346///
347/// @brief Read-Copy-Update variable
348///
349/// @see Based on ideas from
350/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
351/// with modified API.
352///
353/// A variable with MT-access pattern "very often reads, seldom writes". It is
354/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
355/// and uses the obtained value as long as it wants to. On write, one obtains a
356/// WritablePtr<T> with a copy of the last version of the value, makes some
357/// changes to it, and commits the result to update current variable value (does
358/// Read-Copy-Update). Old version of the value is not freed on update, it will
359/// be eventually freed when a subsequent writer identifies that nobody works
360/// with this version.
361///
362/// @note There is no way to create a "null" `Variable`.
363///
364/// ## Example usage:
365///
366/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
367///
368/// @see @ref scripts/docs/en/userver/synchronization.md
369///
370/// @tparam T the stored value
371/// @tparam RcuTraits traits, should inherit from rcu::DefaultRcuTraits
372template <typename T, typename RcuTraits>
373class Variable final {
374 static_assert(
375 std::is_base_of_v<DefaultRcuTraits, RcuTraits>,
376 "RcuTraits should publicly inherit from rcu::DefaultRcuTraits"
377 );
378
379public:
380 using MutexType = typename RcuTraits::MutexType;
381 using DeleterType = typename RcuTraits::DeleterType;
382
383 /// @brief Create a new `Variable` with an in-place constructed initial value.
384 /// @param initial_value_args arguments passed to the constructor of the
385 /// initial value
386 template <typename... Args>
387 // TODO make explicit
388 Variable(Args&&... initial_value_args) : current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
389
390 Variable(const Variable&) = delete;
391 Variable(Variable&&) = delete;
392 Variable& operator=(const Variable&) = delete;
393 Variable& operator=(Variable&&) = delete;
394
395 ~Variable() {
396 {
397 auto* record = current_.load();
398 UASSERT_MSG(record->indicator.IsFree(), "RCU variable is destroyed while being used");
399 delete record;
400 }
401
402 retired_list_.RemoveAndDisposeIf(
403 [](impl::SnapshotRecord<T>&) { return true; },
404 [](impl::SnapshotRecord<T>& record) {
405 UASSERT_MSG(record.indicator.IsFree(), "RCU variable is destroyed while being used");
406 delete &record;
407 }
408 );
409 }
410
411 /// Obtain a smart pointer which can be used to read the current value.
412 ReadablePtr<T, RcuTraits> Read() const { return ReadablePtr<T, RcuTraits>(*this); }
413
414 /// Obtain a copy of contained value.
415 T ReadCopy() const {
416 auto ptr = Read();
417 return *ptr;
418 }
419
420 /// Obtain a smart pointer that will *copy* the current value. The pointer can
421 /// be used to make changes to the value and to set the `Variable` to the
422 /// changed value.
423 WritablePtr<T, RcuTraits> StartWrite() { return WritablePtr<T, RcuTraits>(*this); }
424
425 /// Obtain a smart pointer to a newly in-place constructed value, but does
426 /// not replace the current one yet (in contrast with regular `Emplace`).
427 template <typename... Args>
428 WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
429 return WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...);
430 }
431
432 /// Replaces the `Variable`'s value with the provided one.
433 void Assign(T new_value) { WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value)).Commit(); }
434
435 /// Replaces the `Variable`'s value with an in-place constructed one.
436 template <typename... Args>
437 void Emplace(Args&&... args) {
438 WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...).Commit();
439 }
440
441 void Cleanup() {
442 std::unique_lock lock(mutex_, std::try_to_lock);
443 if (!lock.owns_lock()) {
444 // Someone is already assigning to the RCU. They will call ScanRetireList
445 // in the process.
446 return;
447 }
448 ScanRetiredList(lock);
449 }
450
451private:
452 friend class ReadablePtr<T, RcuTraits>;
453 friend class WritablePtr<T, RcuTraits>;
454
455 void DoAssign(impl::SnapshotRecord<T>& new_snapshot, std::unique_lock<MutexType>& lock) {
456 UASSERT(lock.owns_lock());
457
458 // Note: exchange RMW operation would not give any benefits here.
459 auto* const old_snapshot = current_.load();
460 current_.store(&new_snapshot, std::memory_order_seq_cst);
461
462 UASSERT(old_snapshot);
463 retired_list_.Push(*old_snapshot);
464 ScanRetiredList(lock);
465 }
466
467 template <typename... Args>
468 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
469 auto* const free_list_record = free_list_.list.TryPop();
470 auto& record = free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
471 UASSERT(!record.data);
472
473 try {
474 record.data.emplace(std::forward<Args>(args)...);
475 } catch (...) {
476 free_list_.list.Push(record);
477 throw;
478 }
479
480 return record;
481 }
482
483 void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
484 UASSERT(lock.owns_lock());
485 if (retired_list_.IsEmpty()) return;
486
487 concurrent::impl::AsymmetricThreadFenceHeavy();
488
489 retired_list_.RemoveAndDisposeIf(
490 [](impl::SnapshotRecord<T>& record) { return record.indicator.IsFree(); },
491 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); }
492 );
493 }
494
495 void DeleteSnapshot(impl::SnapshotRecord<T>& record) noexcept {
496 static_assert(
497 noexcept(deleter_.Delete(SnapshotHandle<T>{record, free_list_})), "DeleterType::Delete must be noexcept"
498 );
499 deleter_.Delete(SnapshotHandle<T>{record, free_list_});
500 }
501
502 // Covers current_ writes, free_list_.Pop, retired_list_
503 MutexType mutex_{};
504 impl::SnapshotRecordFreeList<T> free_list_;
505 impl::SnapshotRecordRetiredList<T> retired_list_;
506 // Must be placed after 'free_list_' to force sync cleanup before
507 // the destruction of free_list_.
508 DeleterType deleter_{};
509 // Must be placed after 'free_list_' and 'deleter_' so that if
510 // the initialization of current_ throws, it can be disposed properly.
511 std::atomic<impl::SnapshotRecord<T>*> current_;
512};
513
514} // namespace rcu
515
516USERVER_NAMESPACE_END