userver: userver/rcu/rcu.hpp Source File
Loading...
Searching...
No Matches
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <mutex>
9#include <optional>
10#include <utility>
11
12#include <userver/compiler/impl/lifetime.hpp>
13#include <userver/concurrent/impl/asymmetric_fence.hpp>
14#include <userver/concurrent/impl/intrusive_hooks.hpp>
15#include <userver/concurrent/impl/intrusive_stack.hpp>
16#include <userver/concurrent/impl/striped_read_indicator.hpp>
17#include <userver/engine/async.hpp>
18#include <userver/engine/mutex.hpp>
19#include <userver/rcu/fwd.hpp>
20#include <userver/utils/assert.hpp>
21#include <userver/utils/impl/wait_token_storage.hpp>
22
23USERVER_NAMESPACE_BEGIN
24
25/// @brief Read-Copy-Update
26///
27/// @see Based on ideas from
28/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
29/// with modified API
30namespace rcu {
31
32namespace impl {
33
// A single version of the stored value, together with the bookkeeping needed
// to decide when it is safe to destroy or reuse it.
template <typename T>
struct SnapshotRecord final {
    // The value of this version. Disengaged while the record rests in the
    // free list (EmplaceSnapshot asserts !data before reuse).
    std::optional<T> data;
    // Readers mark this indicator while they reference 'data'; the writer
    // only reclaims the record once indicator.IsFree().
    concurrent::impl::StripedReadIndicator indicator;
    // Intrusive hook for SnapshotRecordFreeList.
    concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
    // Intrusive link for SnapshotRecordRetiredList.
    SnapshotRecord* next_retired{nullptr};
};
41
// Used instead of concurrent::impl::MemberHook to avoid instantiating
// SnapshotRecord<T> ahead of time.
template <typename T>
struct FreeListHookGetter {
    // Returns the intrusive free-list hook embedded in 'node'.
    static auto& GetHook(SnapshotRecord<T>& node) noexcept { return node.free_list_hook; }
};
48
// A pool of records whose 'data' has already been destroyed, kept around so
// that future writers can reuse the allocations.
template <typename T>
struct SnapshotRecordFreeList {
    SnapshotRecordFreeList() = default;

    ~SnapshotRecordFreeList() {
        // DisposeUnsafe is fine here: by the time the free list is destroyed
        // no one may be pushing into or popping from it.
        list.DisposeUnsafe([](SnapshotRecord<T>& record) { delete &record; });
    }

    concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>> list;
};
59
60template <typename T>
61class SnapshotRecordRetiredList final {
62public:
63 SnapshotRecordRetiredList() = default;
64
65 bool IsEmpty() const noexcept { return head_ == nullptr; }
66
67 void Push(SnapshotRecord<T>& record) noexcept {
68 record.next_retired = head_;
69 head_ = &record;
70 }
71
72 template <typename Predicate, typename Disposer>
73 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
74 SnapshotRecord<T>** ptr_to_current = &head_;
75
76 while (*ptr_to_current != nullptr) {
77 SnapshotRecord<T>* const current = *ptr_to_current;
78
79 if (predicate(*current)) {
80 *ptr_to_current = std::exchange(current->next_retired, nullptr);
81 disposer(*current);
82 } else {
83 ptr_to_current = &current->next_retired;
84 }
85 }
86 }
87
88private:
89 SnapshotRecord<T>* head_{nullptr};
90};
91
92class ExclusiveMutex final {
93public:
94 void lock() {
95 const bool was_locked = is_locked_.exchange(true);
97 !was_locked,
98 "Detected a race condition when multiple writers Assign to an rcu::Variable concurrently. The value that "
99 "will remain in rcu::Variable when the dust settles is unspecified."
100 );
101 }
102
103 void unlock() noexcept { is_locked_.store(false); }
104
105private:
106 std::atomic<bool> is_locked_{false};
107};
108
109} // namespace impl
110
/// @brief A handle to the retired object version, which an RCU deleter should
/// clean up.
/// @see rcu::DefaultRcuTraits
template <typename T>
class SnapshotHandle final {
public:
    // Move-only: the moved-from handle is emptied and its destructor is a no-op.
    SnapshotHandle(SnapshotHandle&& other) noexcept
        : record_(std::exchange(other.record_, nullptr)), free_list_(std::exchange(other.free_list_, nullptr)) {}

    // Destroys the retired value and returns the record to the free list for
    // reuse by future writers.
    ~SnapshotHandle() {
        if (record_ != nullptr) {
            UASSERT(free_list_ != nullptr);
            record_->data.reset();
            free_list_->list.Push(*record_);
        }
    }

private:
    // Only Variable and WritablePtr may create handles.
    template <typename /*T*/, typename Traits>
    friend class Variable;

    template <typename /*T*/, typename Traits>
    friend class WritablePtr;

    explicit SnapshotHandle(impl::SnapshotRecord<T>& record, impl::SnapshotRecordFreeList<T>& free_list) noexcept
        : record_(&record), free_list_(&free_list) {}

    impl::SnapshotRecord<T>* record_;
    impl::SnapshotRecordFreeList<T>* free_list_;
};
141
142/// @brief Destroys retired objects synchronously.
143/// @see rcu::DefaultRcuTraits
144struct SyncDeleter final {
145 template <typename T>
146 void Delete(SnapshotHandle<T>&& handle) noexcept {
147 [[maybe_unused]] const auto for_deletion = std::move(handle);
148 }
149};
150
/// @brief Destroys retired objects asynchronously in the same `TaskProcessor`.
/// @see rcu::DefaultRcuTraits
class AsyncDeleter final {
public:
    // Blocks until every detached deletion task has finished, so no task can
    // outlive the deleter (and the free list it pushes into).
    ~AsyncDeleter() { wait_token_storage_.WaitForAllTokens(); }

    template <typename T>
    void Delete(SnapshotHandle<T>&& handle) noexcept {
        if constexpr (std::is_trivially_destructible_v<T> || std::is_same_v<T, std::string>) {
            // Destruction is cheap for these types: spawning a task would cost
            // more than just destroying the value right here.
            SyncDeleter{}.Delete(std::move(handle));
        } else {
            try {
                // Empty task body: the work is the destruction of the captures.
                engine::DetachUnscopedUnsafe(engine::CriticalAsyncNoSpan(
                    // The order of captures is important, 'handle' must be destroyed before 'token'.
                    [token = wait_token_storage_.GetToken(), handle = std::move(handle)]() mutable {}
                ));
                // NOLINTNEXTLINE(bugprone-empty-catch)
            } catch (...) {
                // Task creation somehow failed.
                // `handle` will be destroyed synchronously, because it is already moved
                // into the task's lambda.
            }
        }
    }

private:
    utils::impl::WaitTokenStorage wait_token_storage_;
};
179
180/// @brief Default RCU traits. Deletes garbage asynchronously.
181/// Designed for storing data of multi-megabyte or multi-gigabyte caches.
182/// @note Allows reads from any kind of thread.
183/// Only allows writes from coroutine threads.
184/// @see rcu::Variable
185/// @see rcu::SyncRcuTraits
186/// @see rcu::BlockingRcuTraits
188 /// `MutexType` is a writer's mutex type that has to be used to protect
189 /// structure on update.
190 using MutexType = engine::Mutex;
191
192 /// `DeleterType` is used to delete retired objects. It should:
193 /// 1. should contain `void Delete(SnapshotHandle<T>) noexcept`;
194 /// 2. force synchronous cleanup of remaining handles on destruction.
195 using DeleterType = AsyncDeleter;
196};
197
198/// @brief Deletes garbage synchronously.
199/// Designed for storing small amounts of data with relatively fast destructors.
200/// @note Allows reads from any kind of thread.
201/// Only allows writes from coroutine threads.
202/// @see rcu::DefaultRcuTraits
204 using DeleterType = SyncDeleter;
205};
206
207/// @brief Rcu traits for using outside of coroutines.
208/// @note Allows reads from any kind of thread.
209/// Only allows writes from NON-coroutine threads.
210/// @warning Blocks writing threads which are coroutines, which can cause
211/// deadlocks and hangups.
212/// @see rcu::DefaultRcuTraits
214 using MutexType = std::mutex;
215 using DeleterType = SyncDeleter;
216};
217
218/// @brief Rcu traits that only allow a single writer.
219/// Detects race conditions when multiple writers call `Assign` concurrently.
220/// @note Allows reads and writes from any kind of thread.
221/// @see rcu::DefaultRcuTraits
223 using MutexType = impl::ExclusiveMutex;
224 using DeleterType = SyncDeleter;
225};
226
/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
/// operator->() to do something with the stored value. Once created,
/// ReadablePtr references the same immutable value: if Variable's value is
/// changed during ReadablePtr lifetime, it will not affect value referenced by
/// ReadablePtr.
template <typename T, typename RcuTraits>
class [[nodiscard]] ReadablePtr final {
public:
    // Hazard-pointer-style acquisition: lock a candidate record, then verify
    // it is still 'current_'; retry with the new current otherwise.
    explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
        auto* record = ptr.current_.load();

        while (true) {
            // Lock 'record', which may or may not be 'current_' by the time we got
            // there.
            lock_ = record->indicator.GetLock();

            // seq_cst is required for indicator.Lock in the following case.
            //
            // Reader thread point-of-view:
            // 1. [reader] load current_
            // 2. [reader] indicator.Lock
            // 3. [reader] load current_
            // 4. [writer] store current_
            // 5. [writer] indicator.IsFree
            //
            // Given seq_cst only on (3), (4), and (5), the writer can see
            // (2) after (5). In this case the reader will think that it has
            // successfully taken the lock, which is false.
            //
            // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
            // somewhat expensive, but this is a well-known cost of hazard pointers.
            //
            // The seq_cst cost can be mitigated by utilizing asymmetric fences.
            // This asymmetric fence effectively grants std::memory_order_seq_cst
            // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
            // in (5). The technique is taken from Folly HazPtr.
            concurrent::impl::AsymmetricThreadFenceLight();

            // Is the record we locked 'current_'? If so, congratulations, we are
            // holding a lock to 'current_'.
            auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
            if (new_current == record) {
                break;
            }

            // 'current_' changed, try again
            record = new_current;
        }

        // The indicator lock is held, so the writer cannot reclaim this
        // record's data while *this is alive.
        ptr_ = &*record->data;
    }

    ReadablePtr(ReadablePtr&& other) noexcept = default;
    ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
    ReadablePtr(const ReadablePtr& other) = default;
    ReadablePtr& operator=(const ReadablePtr& other) = default;
    ~ReadablePtr() = default;

    /// Access the pinned snapshot. Never returns nullptr.
    const T* Get() const& USERVER_IMPL_LIFETIME_BOUND {
        UASSERT(ptr_);
        return ptr_;
    }

    const T* Get() && { return GetOnRvalue(); }

    const T* operator->() const& USERVER_IMPL_LIFETIME_BOUND { return Get(); }
    const T* operator->() && { return GetOnRvalue(); }

    const T& operator*() const& USERVER_IMPL_LIFETIME_BOUND { return *Get(); }
    const T& operator*() && { return *GetOnRvalue(); }

private:
    // Compile-time trap: accessing a temporary ReadablePtr would yield a
    // pointer that dangles as soon as the temporary is destroyed.
    const T* GetOnRvalue() {
        static_assert(!sizeof(T), "Don't use temporary ReadablePtr, store it to a variable");
        std::abort();
    }

    // Points into the locked record's 'data'.
    const T* ptr_;
    // Keeps the record's read indicator marked, preventing its reclamation.
    concurrent::impl::StripedReadIndicatorLock lock_;
};
307
/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
/// reference to a to-be-changed value and allows one to mutate the value (e.g.
/// add items to std::unordered_map). Changed value is not visible to readers
/// until explicit store by Commit. Only a single writer may own a WritablePtr
/// associated with the same Variable, so WritablePtr creates a critical
/// section. This critical section doesn't affect readers, so a slow writer
/// doesn't block readers.
/// @note you may not pass WritablePtr between coroutines as it owns
/// engine::Mutex, which must be unlocked in the same coroutine that was used to
/// lock the mutex.
template <typename T, typename RcuTraits>
class [[nodiscard]] WritablePtr final {
public:
    /// @cond
    // For internal use only. Use `var.StartWrite()` instead
    // Copy-constructs the new version from the current one, under the writer
    // mutex (reading current_ is safe: only lock holders may replace it).
    explicit WritablePtr(Variable<T, RcuTraits>& var)
        : var_(var),
          lock_(var.mutex_),
          record_(&var.EmplaceSnapshot(*var.current_.load()->data))
    {}

    // For internal use only. Use `var.Emplace(args...)` instead
    template <typename... Args>
    WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t, Args&&... initial_value_args)
        : var_(var),
          lock_(var.mutex_),
          record_(&var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...))
    {}
    /// @endcond

    WritablePtr(WritablePtr&& other) noexcept
        : var_(other.var_), lock_(std::move(other.lock_)), record_(std::exchange(other.record_, nullptr)) {}

    // Discards the prepared version if Commit() was never called.
    ~WritablePtr() {
        if (record_) {
            var_.DeleteSnapshot(*record_);
        }
    }

    /// Store the changed value in Variable. After Commit() the value becomes
    /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
    /// referencing the stored value, not an old value).
    void Commit() {
        UASSERT(record_ != nullptr);
        // DoAssign must run while the writer mutex is held; unlock afterwards.
        var_.DoAssign(*std::exchange(record_, nullptr), lock_);
        lock_.unlock();
    }

    /// Mutable access to the not-yet-published value.
    T* Get() & USERVER_IMPL_LIFETIME_BOUND {
        UASSERT(record_ != nullptr);
        return &*record_->data;
    }

    T* Get() && { return GetOnRvalue(); }

    T* operator->() & USERVER_IMPL_LIFETIME_BOUND { return Get(); }
    T* operator->() && { return GetOnRvalue(); }

    T& operator*() & USERVER_IMPL_LIFETIME_BOUND { return *Get(); }
    T& operator*() && { return *GetOnRvalue(); }

private:
    // Compile-time trap against dereferencing a temporary WritablePtr.
    [[noreturn]] static T* GetOnRvalue() {
        static_assert(!sizeof(T), "Don't use temporary WritablePtr, store it to a variable");
        std::abort();
    }

    Variable<T, RcuTraits>& var_;
    // Held for the whole lifetime of the pointer: writers are serialized.
    std::unique_lock<typename RcuTraits::MutexType> lock_;
    // The version being prepared; nullptr after Commit() or move-out.
    impl::SnapshotRecord<T>* record_;
};
379
380/// @ingroup userver_concurrency userver_containers
381///
382/// @brief Read-Copy-Update variable
383///
384/// @see Based on ideas from
385/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
386/// with modified API.
387///
388/// A variable with MT-access pattern "very often reads, seldom writes". It is
389/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
390/// and uses the obtained value as long as it wants to. On write, one obtains a
391/// WritablePtr<T> with a copy of the last version of the value, makes some
392/// changes to it, and commits the result to update current variable value (does
393/// Read-Copy-Update). Old version of the value is not freed on update, it will
394/// be eventually freed when a subsequent writer identifies that nobody works
395/// with this version.
396///
397/// @note There is no way to create a "null" `Variable`.
398///
399/// ## Example usage:
400///
401/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
402///
403/// @see @ref scripts/docs/en/userver/synchronization.md
404///
405/// @tparam T the stored value
406/// @tparam RcuTraits traits, should inherit from rcu::DefaultRcuTraits
template <typename T, typename RcuTraits>
class Variable final {
    static_assert(
        std::is_base_of_v<DefaultRcuTraits, RcuTraits>,
        "RcuTraits should publicly inherit from rcu::DefaultRcuTraits"
    );

public:
    using MutexType = typename RcuTraits::MutexType;
    using DeleterType = typename RcuTraits::DeleterType;

    /// @brief Create a new `Variable` with an in-place constructed initial value.
    /// @param initial_value_args arguments passed to the constructor of the
    /// initial value
    template <typename... Args>
    // TODO make explicit
    Variable(Args&&... initial_value_args)
        : current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...))
    {}

    Variable(const Variable&) = delete;
    Variable(Variable&&) = delete;
    Variable& operator=(const Variable&) = delete;
    Variable& operator=(Variable&&) = delete;

    ~Variable() {
        {
            // The current version never sits in retired_list_, delete it directly.
            auto* record = current_.load();
            UASSERT_MSG(record->indicator.IsFree(), "RCU variable is destroyed while being used");
            delete record;
        }

        // Dispose of every still-retired version unconditionally; any live
        // reader at this point is a use-after-free bug in the caller.
        retired_list_.RemoveAndDisposeIf(
            [](impl::SnapshotRecord<T>&) { return true; },
            [](impl::SnapshotRecord<T>& record) {
                UASSERT_MSG(record.indicator.IsFree(), "RCU variable is destroyed while being used");
                delete &record;
            }
        );
    }

    /// Obtain a smart pointer which can be used to read the current value.
    ReadablePtr<T, RcuTraits> Read() const { return ReadablePtr<T, RcuTraits>(*this); }

    /// Obtain a copy of contained value.
    T ReadCopy() const {
        auto ptr = Read();
        return *ptr;
    }

    /// Obtain a smart pointer that will *copy* the current value. The pointer can
    /// be used to make changes to the value and to set the `Variable` to the
    /// changed value.
    WritablePtr<T, RcuTraits> StartWrite() { return WritablePtr<T, RcuTraits>(*this); }

    /// Obtain a smart pointer to a newly in-place constructed value, but does
    /// not replace the current one yet (in contrast with regular `Emplace`).
    template <typename... Args>
    WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
        return WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...);
    }

    /// Replaces the `Variable`'s value with the provided one.
    void Assign(T new_value) { WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value)).Commit(); }

    /// Replaces the `Variable`'s value with an in-place constructed one.
    template <typename... Args>
    void Emplace(Args&&... args) {
        WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...).Commit();
    }

    // Best-effort reclamation of retired versions without assigning.
    void Cleanup() {
        std::unique_lock lock(mutex_, std::try_to_lock);
        if (!lock.owns_lock()) {
            // Someone is already assigning to the RCU. They will call ScanRetireList
            // in the process.
            return;
        }
        ScanRetiredList(lock);
    }

private:
    friend class ReadablePtr<T, RcuTraits>;
    friend class WritablePtr<T, RcuTraits>;

    // Publishes 'new_snapshot', retires the old version, and attempts to
    // reclaim previously retired versions. Must hold the writer mutex.
    void DoAssign(impl::SnapshotRecord<T>& new_snapshot, std::unique_lock<MutexType>& lock) {
        UASSERT(lock.owns_lock());

        // Note: exchange RMW operation would not give any benefits here.
        auto* const old_snapshot = current_.load();
        // seq_cst store: pairs with the seq_cst reload in ReadablePtr's ctor.
        current_.store(&new_snapshot, std::memory_order_seq_cst);

        UASSERT(old_snapshot);
        retired_list_.Push(*old_snapshot);
        ScanRetiredList(lock);
    }

    // Builds a new record holding T(args...), reusing a free-list record when
    // possible. Must hold the writer mutex (it pops from free_list_).
    template <typename... Args>
    [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
        auto* const free_list_record = free_list_.list.TryPop();
        auto& record = free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
        UASSERT(!record.data);

        try {
            record.data.emplace(std::forward<Args>(args)...);
        } catch (...) {
            // Return the (still empty) record to the pool instead of leaking it.
            free_list_.list.Push(record);
            throw;
        }

        return record;
    }

    // Hands every retired record with no active readers to the deleter.
    // Must hold the writer mutex.
    void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
        UASSERT(lock.owns_lock());
        if (retired_list_.IsEmpty()) {
            return;
        }

        // Pairs with AsymmetricThreadFenceLight in ReadablePtr's constructor,
        // making the subsequent IsFree checks reliable.
        concurrent::impl::AsymmetricThreadFenceHeavy();

        retired_list_.RemoveAndDisposeIf(
            [](impl::SnapshotRecord<T>& record) { return record.indicator.IsFree(); },
            [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); }
        );
    }

    void DeleteSnapshot(impl::SnapshotRecord<T>& record) noexcept {
        static_assert(
            noexcept(deleter_.Delete(SnapshotHandle<T>{record, free_list_})),
            "DeleterType::Delete must be noexcept"
        );
        deleter_.Delete(SnapshotHandle<T>{record, free_list_});
    }

    // Covers current_ writes, free_list_.Pop, retired_list_
    MutexType mutex_{};
    impl::SnapshotRecordFreeList<T> free_list_;
    impl::SnapshotRecordRetiredList<T> retired_list_;
    // Must be placed after 'free_list_' to force sync cleanup before
    // the destruction of free_list_.
    DeleterType deleter_{};
    // Must be placed after 'free_list_' and 'deleter_' so that if
    // the initialization of current_ throws, it can be disposed properly.
    std::atomic<impl::SnapshotRecord<T>*> current_;
};
553
554} // namespace rcu
555
556USERVER_NAMESPACE_END