userver: userver/rcu/rcu.hpp Source File
Loading...
Searching...
No Matches
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <optional>
9#include <utility>
10
11#include <userver/concurrent/impl/asymmetric_fence.hpp>
12#include <userver/concurrent/impl/intrusive_hooks.hpp>
13#include <userver/concurrent/impl/intrusive_stack.hpp>
14#include <userver/concurrent/impl/striped_read_indicator.hpp>
15#include <userver/engine/async.hpp>
16#include <userver/engine/mutex.hpp>
17#include <userver/rcu/fwd.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/impl/wait_token_storage.hpp>
20
21USERVER_NAMESPACE_BEGIN
22
23/// @brief Read-Copy-Update
24///
25/// @see Based on ideas from
26/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
27/// with modified API
28namespace rcu {
29
30namespace impl {
31
32template <typename T>
33struct SnapshotRecord final {
34 std::optional<T> data;
35 concurrent::impl::StripedReadIndicator indicator;
36 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
37 SnapshotRecord* next_retired{nullptr};
38};
39
40// Used instead of concurrent::impl::MemberHook to avoid instantiating
41// SnapshotRecord<T> ahead of time.
42template <typename T>
43struct FreeListHookGetter {
44 auto& operator()(SnapshotRecord<T>& node) const noexcept {
45 return node.free_list_hook;
46 }
47};
48
49template <typename T>
50using SnapshotRecordFreeList =
51 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>>;
52
53template <typename T>
54class SnapshotRecordRetiredList final {
55 public:
56 SnapshotRecordRetiredList() = default;
57
58 bool IsEmpty() const noexcept { return head_ == nullptr; }
59
60 void Push(SnapshotRecord<T>& record) noexcept {
61 record.next_retired = head_;
62 head_ = &record;
63 }
64
65 template <typename Predicate, typename Disposer>
66 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
67 SnapshotRecord<T>** ptr_to_current = &head_;
68
69 while (*ptr_to_current != nullptr) {
70 SnapshotRecord<T>* const current = *ptr_to_current;
71
72 if (predicate(*current)) {
73 *ptr_to_current = std::exchange(current->next_retired, nullptr);
74 disposer(*current);
75 } else {
76 ptr_to_current = &current->next_retired;
77 }
78 }
79 }
80
81 private:
82 SnapshotRecord<T>* head_{nullptr};
83};
84
85} // namespace impl
86
87/// Default Rcu traits.
88/// - `MutexType` is a writer's mutex type that has to be used to protect
89/// structure on update
90template <typename T>
91struct DefaultRcuTraits {
92 using MutexType = engine::Mutex;
93};
94
95/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
96/// operator->() to do something with the stored value. Once created,
97/// ReadablePtr references the same immutable value: if Variable's value is
98/// changed during ReadablePtr lifetime, it will not affect value referenced by
99/// ReadablePtr.
100template <typename T, typename RcuTraits>
101class [[nodiscard]] ReadablePtr final {
102 public:
103 explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
104 auto* record = ptr.current_.load();
105
106 while (true) {
107 // Lock 'record', which may or may not be 'current_' by the time we got
108 // there.
109 lock_ = record->indicator.Lock();
110
111 // seq_cst is required for indicator.Lock in the following case.
112 //
113 // Reader thread point-of-view:
114 // 1. [reader] load current_
115 // 2. [reader] indicator.Lock
116 // 3. [reader] load current_
117 // 4. [writer] store current_
118 // 5. [writer] indicator.IsFree
119 //
120 // Given seq_cst only on (3), (4), and (5), the writer can see
121 // (2) after (5). In this case the reader will think that it has
122 // successfully taken the lock, which is false.
123 //
124 // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
125 // somewhat expensive, but this is a well-known cost of hazard pointers.
126 //
127 // The seq_cst cost can be mitigated by utilizing asymmetric fences.
128 // This asymmetric fence effectively grants std::memory_order_seq_cst
129 // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
130 // in (5). The technique is taken from Folly HazPtr.
131 concurrent::impl::AsymmetricThreadFenceLight();
132
133 // Is the record we locked 'current_'? If so, congratulations, we are
134 // holding a lock to 'current_'.
135 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
136 if (new_current == record) break;
137
138 // 'current_' changed, try again
139 record = new_current;
140 }
141
142 ptr_ = &*record->data;
143 }
144
145 ReadablePtr(ReadablePtr&& other) noexcept = default;
146 ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
147 ReadablePtr(const ReadablePtr& other) = default;
148 ReadablePtr& operator=(const ReadablePtr& other) = default;
149 ~ReadablePtr() = default;
150
151 const T* Get() const& {
152 UASSERT(ptr_);
153 return ptr_;
154 }
155
156 const T* Get() && { return GetOnRvalue(); }
157
158 const T* operator->() const& { return Get(); }
159 const T* operator->() && { return GetOnRvalue(); }
160
161 const T& operator*() const& { return *Get(); }
162 const T& operator*() && { return *GetOnRvalue(); }
163
164 private:
165 const T* GetOnRvalue() {
166 static_assert(!sizeof(T),
167 "Don't use temporary ReadablePtr, store it to a variable");
168 std::abort();
169 }
170
171 const T* ptr_;
172 concurrent::impl::StripedReadIndicatorLock lock_;
173};
174
175/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
176/// reference to a to-be-changed value and allows one to mutate the value (e.g.
177/// add items to std::unordered_map). Changed value is not visible to readers
178/// until explicit store by Commit. Only a single writer may own a WritablePtr
179/// associated with the same Variable, so WritablePtr creates a critical
180/// section. This critical section doesn't affect readers, so a slow writer
181/// doesn't block readers.
182/// @note you may not pass WritablePtr between coroutines as it owns
183/// engine::Mutex, which must be unlocked in the same coroutine that was used to
184/// lock the mutex.
185template <typename T, typename RcuTraits>
186class [[nodiscard]] WritablePtr final {
187 public:
188 /// @cond
189 // For internal use only. Use `var.StartWrite()` instead
190 explicit WritablePtr(Variable<T, RcuTraits>& var)
191 : var_(var),
192 lock_(var.mutex_),
193 record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
194
195 // For internal use only. Use `var.Emplace(args...)` instead
196 template <typename... Args>
197 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
198 Args&&... initial_value_args)
199 : var_(var),
200 lock_(var.mutex_),
201 record_(
202 &var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
203 /// @endcond
204
205 WritablePtr(WritablePtr&& other) noexcept
206 : var_(other.var_),
207 lock_(std::move(other.lock_)),
208 record_(std::exchange(other.record_, nullptr)) {}
209
210 ~WritablePtr() {
211 if (record_) {
212 // TODO should DeleteSnapshot instead?
213 var_.DeleteSnapshotSync(*record_);
214 }
215 }
216
217 /// Store the changed value in Variable. After Commit() the value becomes
218 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
219 /// referencing the stored value, not an old value).
220 void Commit() {
221 UASSERT(record_ != nullptr);
222 var_.DoAssign(*std::exchange(record_, nullptr), lock_);
223 lock_.unlock();
224 }
225
226 T* Get() & {
227 UASSERT(record_ != nullptr);
228 return &*record_->data;
229 }
230
231 T* Get() && { return GetOnRvalue(); }
232
233 T* operator->() & { return Get(); }
234 T* operator->() && { return GetOnRvalue(); }
235
236 T& operator*() & { return *Get(); }
237 T& operator*() && { return *GetOnRvalue(); }
238
239 private:
240 [[noreturn]] static T* GetOnRvalue() {
241 static_assert(!sizeof(T),
242 "Don't use temporary WritablePtr, store it to a variable");
243 std::abort();
244 }
245
246 Variable<T, RcuTraits>& var_;
247 std::unique_lock<typename RcuTraits::MutexType> lock_;
248 impl::SnapshotRecord<T>* record_;
249};
250
251/// @brief Can be passed to `rcu::Variable` as the first argument to customize
252/// whether old values should be destroyed asynchronously.
253enum class DestructionType { kSync, kAsync };
254
255/// @ingroup userver_concurrency userver_containers
256///
257/// @brief Read-Copy-Update variable
258///
259/// @see Based on ideas from
260/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
261/// with modified API.
262///
263/// A variable with MT-access pattern "very often reads, seldom writes". It is
264/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
265/// and uses the obtained value as long as it wants to. On write, one obtains a
266/// WritablePtr<T> with a copy of the last version of the value, makes some
267/// changes to it, and commits the result to update current variable value (does
268/// Read-Copy-Update). Old version of the value is not freed on update, it will
269/// be eventually freed when a subsequent writer identifies that nobody works
270/// with this version.
271///
272/// @note There is no way to create a "null" `Variable`.
273///
274/// ## Example usage:
275///
276/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
277///
278/// @see @ref scripts/docs/en/userver/synchronization.md
279template <typename T, typename RcuTraits>
280class Variable final {
281 public:
282 using MutexType = typename RcuTraits::MutexType;
283
284 /// Create a new `Variable` with an in-place constructed initial value.
285 /// Asynchronous destruction is enabled by default.
286 /// @param initial_value_args arguments passed to the constructor of the
287 /// initial value
288 template <typename... Args>
289 // TODO make explicit
290 Variable(Args&&... initial_value_args)
291 : destruction_type_(std::is_trivially_destructible_v<T> ||
292 std::is_same_v<T, std::string> ||
293 !std::is_same_v<MutexType, engine::Mutex>
294 ? DestructionType::kSync
295 : DestructionType::kAsync),
296 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
297
298 /// Create a new `Variable` with an in-place constructed initial value.
299 /// @param destruction_type controls whether destruction of old values should
300 /// be performed asynchronously
301 /// @param initial_value_args arguments passed to the constructor of the
302 /// initial value
303 template <typename... Args>
304 explicit Variable(DestructionType destruction_type,
305 Args&&... initial_value_args)
306 : destruction_type_(destruction_type),
307 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
308
309 Variable(const Variable&) = delete;
310 Variable(Variable&&) = delete;
311 Variable& operator=(const Variable&) = delete;
312 Variable& operator=(Variable&&) = delete;
313
314 ~Variable() {
315 {
316 auto* record = current_.load();
317 UASSERT_MSG(record->indicator.IsFree(),
318 "RCU variable is destroyed while being used");
319 delete record;
320 }
321
322 retired_list_.RemoveAndDisposeIf(
323 [](impl::SnapshotRecord<T>&) { return true; },
324 [](impl::SnapshotRecord<T>& record) {
325 UASSERT_MSG(record.indicator.IsFree(),
326 "RCU variable is destroyed while being used");
327 delete &record;
328 });
329
330 if (destruction_type_ == DestructionType::kAsync) {
331 wait_token_storage_.WaitForAllTokens();
332 }
333
334 free_list_.DisposeUnsafe(
335 [](impl::SnapshotRecord<T>& record) { delete &record; });
336 }
337
338 /// Obtain a smart pointer which can be used to read the current value.
339 ReadablePtr<T, RcuTraits> Read() const {
340 return ReadablePtr<T, RcuTraits>(*this);
341 }
342
343 /// Obtain a copy of contained value.
344 T ReadCopy() const {
345 auto ptr = Read();
346 return *ptr;
347 }
348
349 /// Obtain a smart pointer that will *copy* the current value. The pointer can
350 /// be used to make changes to the value and to set the `Variable` to the
351 /// changed value.
352 WritablePtr<T, RcuTraits> StartWrite() {
353 return WritablePtr<T, RcuTraits>(*this);
354 }
355
356 /// Obtain a smart pointer to a newly in-place constructed value, but does
357 /// not replace the current one yet (in contrast with regular `Emplace`).
358 template <typename... Args>
359 WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
360 return WritablePtr<T, RcuTraits>(*this, std::in_place,
361 std::forward<Args>(args)...);
362 }
363
364 /// Replaces the `Variable`'s value with the provided one.
365 void Assign(T new_value) {
366 WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value))
367 .Commit();
368 }
369
370 /// Replaces the `Variable`'s value with an in-place constructed one.
371 template <typename... Args>
372 void Emplace(Args&&... args) {
373 WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...)
374 .Commit();
375 }
376
377 void Cleanup() {
378 std::unique_lock lock(mutex_, std::try_to_lock);
379 if (!lock.owns_lock()) {
380 // Someone is already assigning to the RCU. They will call ScanRetireList
381 // in the process.
382 return;
383 }
384 ScanRetiredList(lock);
385 }
386
387 private:
388 friend class ReadablePtr<T, RcuTraits>;
389 friend class WritablePtr<T, RcuTraits>;
390
391 void DoAssign(impl::SnapshotRecord<T>& new_snapshot,
392 std::unique_lock<MutexType>& lock) {
393 UASSERT(lock.owns_lock());
394
395 // Note: exchange RMW operation would not give any benefits here.
396 auto* const old_snapshot = current_.load();
397 current_.store(&new_snapshot, std::memory_order_seq_cst);
398
399 UASSERT(old_snapshot);
400 retired_list_.Push(*old_snapshot);
401 ScanRetiredList(lock);
402 }
403
404 template <typename... Args>
405 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
406 auto* const free_list_record = free_list_.TryPop();
407 auto& record =
408 free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
409 UASSERT(!record.data);
410
411 try {
412 record.data.emplace(std::forward<Args>(args)...);
413 } catch (...) {
414 if (free_list_record) {
415 free_list_.Push(record);
416 } else {
417 // Important to delete this way in the rcu::Variable's constructor,
418 // otherwise we'd have to clean up free_list_ there.
419 delete &record;
420 }
421 throw;
422 }
423
424 return record;
425 }
426
427 void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
428 UASSERT(lock.owns_lock());
429 if (retired_list_.IsEmpty()) return;
430
431 concurrent::impl::AsymmetricThreadFenceHeavy();
432
433 retired_list_.RemoveAndDisposeIf(
434 [](impl::SnapshotRecord<T>& record) {
435 return record.indicator.IsFree();
436 },
437 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); });
438 }
439
440 void DeleteSnapshot(impl::SnapshotRecord<T>& record) {
441 switch (destruction_type_) {
442 case DestructionType::kSync:
443 DeleteSnapshotSync(record);
444 break;
445 case DestructionType::kAsync:
446 engine::CriticalAsyncNoSpan([this, &record,
447 token = wait_token_storage_.GetToken()] {
448 DeleteSnapshotSync(record);
449 }).Detach();
450 break;
451 }
452 }
453
454 void DeleteSnapshotSync(impl::SnapshotRecord<T>& record) noexcept {
455 record.data.reset();
456 free_list_.Push(record);
457 }
458
459 const DestructionType destruction_type_;
460 // Covers current_ writes, free_list_.Pop, retired_list_
461 MutexType mutex_;
462 impl::SnapshotRecordFreeList<T> free_list_;
463 impl::SnapshotRecordRetiredList<T> retired_list_;
464 std::atomic<impl::SnapshotRecord<T>*> current_;
465 utils::impl::WaitTokenStorage wait_token_storage_;
466};
467
468} // namespace rcu
469
470USERVER_NAMESPACE_END