userver: userver/rcu/rcu.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <memory>
9#include <optional>
10#include <utility>
11
12// TODO remove extra includes
13#include <list>
14#include <unordered_set>
15#include <userver/compiler/thread_local.hpp>
16
17#include <userver/concurrent/impl/asymmetric_fence.hpp>
18#include <userver/concurrent/impl/intrusive_hooks.hpp>
19#include <userver/concurrent/impl/intrusive_stack.hpp>
20#include <userver/concurrent/impl/striped_read_indicator.hpp>
21#include <userver/engine/async.hpp>
22#include <userver/engine/mutex.hpp>
23#include <userver/logging/log.hpp>
24#include <userver/rcu/fwd.hpp>
25#include <userver/utils/assert.hpp>
26#include <userver/utils/impl/wait_token_storage.hpp>
27
28USERVER_NAMESPACE_BEGIN
29
30/// @brief Read-Copy-Update
31///
32/// @see Based on ideas from
33/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
34/// with modified API
35namespace rcu {
36
37namespace impl {
38
39template <typename T>
40struct SnapshotRecord final {
41 std::optional<T> data;
42 concurrent::impl::StripedReadIndicator indicator;
43 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
44 SnapshotRecord* next_retired{nullptr};
45};
46
47// Used instead of concurrent::impl::MemberHook to avoid instantiating
48// SnapshotRecord<T> ahead of time.
49template <typename T>
50struct FreeListHookGetter {
51 auto& operator()(SnapshotRecord<T>& node) const noexcept {
52 return node.free_list_hook;
53 }
54};
55
56template <typename T>
57using SnapshotRecordFreeList =
58 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>>;
59
60template <typename T>
61class SnapshotRecordRetiredList final {
62 public:
63 SnapshotRecordRetiredList() = default;
64
65 bool IsEmpty() const noexcept { return head_ == nullptr; }
66
67 void Push(SnapshotRecord<T>& record) noexcept {
68 record.next_retired = head_;
69 head_ = &record;
70 }
71
72 template <typename Predicate, typename Disposer>
73 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
74 SnapshotRecord<T>** ptr_to_current = &head_;
75
76 while (*ptr_to_current != nullptr) {
77 SnapshotRecord<T>* const current = *ptr_to_current;
78
79 if (predicate(*current)) {
80 *ptr_to_current = std::exchange(current->next_retired, nullptr);
81 disposer(*current);
82 } else {
83 ptr_to_current = &current->next_retired;
84 }
85 }
86 }
87
88 private:
89 SnapshotRecord<T>* head_{nullptr};
90};
91
92} // namespace impl
93
94/// Default Rcu traits.
95/// - `MutexType` is a writer's mutex type that has to be used to protect
96/// structure on update
97template <typename T>
98struct DefaultRcuTraits {
99 using MutexType = engine::Mutex;
100};
101
102/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
103/// operator->() to do something with the stored value. Once created,
104/// ReadablePtr references the same immutable value: if Variable's value is
105/// changed during ReadablePtr lifetime, it will not affect value referenced by
106/// ReadablePtr.
107template <typename T, typename RcuTraits>
108class [[nodiscard]] ReadablePtr final {
109 public:
110 explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
111 auto* record = ptr.current_.load();
112
113 while (true) {
114 // Lock 'record', which may or may not be 'current_' by the time we got
115 // there.
116 lock_ = record->indicator.Lock();
117
118 // seq_cst is required for indicator.Lock in the following case.
119 //
120 // Reader thread point-of-view:
121 // 1. [reader] load current_
122 // 2. [reader] indicator.Lock
123 // 3. [reader] load current_
124 // 4. [writer] store current_
125 // 5. [writer] indicator.IsFree
126 //
127 // Given seq_cst only on (3), (4), and (5), the writer can see
128 // (2) after (5). In this case the reader will think that it has
129 // successfully taken the lock, which is false.
130 //
131 // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
132 // somewhat expensive, but this is a well-known cost of hazard pointers.
133 //
134 // The seq_cst cost can be mitigated by utilizing asymmetric fences.
135 // This asymmetric fence effectively grants std::memory_order_seq_cst
136 // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
137 // in (5). The technique is taken from Folly HazPtr.
138 concurrent::impl::AsymmetricThreadFenceLight();
139
140 // Is the record we locked 'current_'? If so, congratulations, we are
141 // holding a lock to 'current_'.
142 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
143 if (new_current == record) break;
144
145 // 'current_' changed, try again
146 record = new_current;
147 }
148
149 ptr_ = &*record->data;
150 }
151
152 ReadablePtr(ReadablePtr&& other) noexcept = default;
153 ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
154 ReadablePtr(const ReadablePtr& other) = default;
155 ReadablePtr& operator=(const ReadablePtr& other) = default;
156 ~ReadablePtr() = default;
157
158 const T* Get() const& {
159 UASSERT(ptr_);
160 return ptr_;
161 }
162
163 const T* Get() && { return GetOnRvalue(); }
164
165 const T* operator->() const& { return Get(); }
166 const T* operator->() && { return GetOnRvalue(); }
167
168 const T& operator*() const& { return *Get(); }
169 const T& operator*() && { return *GetOnRvalue(); }
170
171 private:
172 const T* GetOnRvalue() {
173 static_assert(!sizeof(T),
174 "Don't use temporary ReadablePtr, store it to a variable");
175 std::abort();
176 }
177
178 const T* ptr_;
179 concurrent::impl::StripedReadIndicatorLock lock_;
180};
181
182/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
183/// reference to a to-be-changed value and allows one to mutate the value (e.g.
184/// add items to std::unordered_map). Changed value is not visible to readers
185/// until explicit store by Commit. Only a single writer may own a WritablePtr
186/// associated with the same Variable, so WritablePtr creates a critical
187/// section. This critical section doesn't affect readers, so a slow writer
188/// doesn't block readers.
189/// @note you may not pass WritablePtr between coroutines as it owns
190/// engine::Mutex, which must be unlocked in the same coroutine that was used to
191/// lock the mutex.
192template <typename T, typename RcuTraits>
193class [[nodiscard]] WritablePtr final {
194 public:
195 /// @cond
196 // For internal use only. Use `var.StartWrite()` instead
197 explicit WritablePtr(Variable<T, RcuTraits>& var)
198 : var_(var),
199 lock_(var.mutex_),
200 record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
201
202 // For internal use only. Use `var.Emplace(args...)` instead
203 template <typename... Args>
204 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
205 Args&&... initial_value_args)
206 : var_(var),
207 lock_(var.mutex_),
208 record_(
209 &var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
210 /// @endcond
211
212 WritablePtr(WritablePtr&& other) noexcept
213 : var_(other.var_),
214 lock_(std::move(other.lock_)),
215 record_(std::exchange(other.record_, nullptr)) {}
216
217 ~WritablePtr() {
218 if (record_) {
219 // TODO should DeleteSnapshot instead?
220 var_.DeleteSnapshotSync(*record_);
221 }
222 }
223
224 /// Store the changed value in Variable. After Commit() the value becomes
225 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
226 /// referencing the stored value, not an old value).
227 void Commit() {
228 UASSERT(record_ != nullptr);
229 var_.DoAssign(*std::exchange(record_, nullptr), lock_);
230 lock_.unlock();
231 }
232
233 T* Get() & {
234 UASSERT(record_ != nullptr);
235 return &*record_->data;
236 }
237
238 T* Get() && { return GetOnRvalue(); }
239
240 T* operator->() & { return Get(); }
241 T* operator->() && { return GetOnRvalue(); }
242
243 T& operator*() & { return *Get(); }
244 T& operator*() && { return *GetOnRvalue(); }
245
246 private:
247 [[noreturn]] static T* GetOnRvalue() {
248 static_assert(!sizeof(T),
249 "Don't use temporary WritablePtr, store it to a variable");
250 std::abort();
251 }
252
253 Variable<T, RcuTraits>& var_;
254 std::unique_lock<typename RcuTraits::MutexType> lock_;
255 impl::SnapshotRecord<T>* record_;
256};
257
/// @brief Can be passed to `rcu::Variable` as the first argument to customize
/// whether old values should be destroyed asynchronously.
enum class DestructionType {
  /// Old values are destroyed in place by the writer that retires them.
  kSync,
  /// Old values are destroyed in a separate detached task.
  kAsync,
};
261
262/// @ingroup userver_concurrency userver_containers
263///
264/// @brief Read-Copy-Update variable
265///
266/// @see Based on ideas from
267/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
268/// with modified API.
269///
270/// A variable with MT-access pattern "very often reads, seldom writes". It is
271/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
272/// and uses the obtained value as long as it wants to. On write, one obtains a
273/// WritablePtr<T> with a copy of the last version of the value, makes some
274/// changes to it, and commits the result to update current variable value (does
275/// Read-Copy-Update). Old version of the value is not freed on update, it will
276/// be eventually freed when a subsequent writer identifies that nobody works
277/// with this version.
278///
279/// @note There is no way to create a "null" `Variable`.
280///
281/// ## Example usage:
282///
283/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
284///
285/// @see @ref scripts/docs/en/userver/synchronization.md
286template <typename T, typename RcuTraits>
287class Variable final {
288 public:
289 using MutexType = typename RcuTraits::MutexType;
290
291 /// Create a new `Variable` with an in-place constructed initial value.
292 /// Asynchronous destruction is enabled by default.
293 /// @param initial_value_args arguments passed to the constructor of the
294 /// initial value
295 template <typename... Args>
296 // TODO make explicit
297 Variable(Args&&... initial_value_args)
298 : destruction_type_(std::is_trivially_destructible_v<T> ||
299 std::is_same_v<T, std::string> ||
300 !std::is_same_v<MutexType, engine::Mutex>
301 ? DestructionType::kSync
302 : DestructionType::kAsync),
303 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
304
305 /// Create a new `Variable` with an in-place constructed initial value.
306 /// @param destruction_type controls whether destruction of old values should
307 /// be performed asynchronously
308 /// @param initial_value_args arguments passed to the constructor of the
309 /// initial value
310 template <typename... Args>
311 explicit Variable(DestructionType destruction_type,
312 Args&&... initial_value_args)
313 : destruction_type_(destruction_type),
314 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
315
316 Variable(const Variable&) = delete;
317 Variable(Variable&&) = delete;
318 Variable& operator=(const Variable&) = delete;
319 Variable& operator=(Variable&&) = delete;
320
321 ~Variable() {
322 {
323 auto* record = current_.load();
324 UASSERT_MSG(record->indicator.IsFree(),
325 "RCU variable is destroyed while being used");
326 delete record;
327 }
328
329 retired_list_.RemoveAndDisposeIf(
330 [](impl::SnapshotRecord<T>&) { return true; },
331 [](impl::SnapshotRecord<T>& record) {
332 UASSERT_MSG(record.indicator.IsFree(),
333 "RCU variable is destroyed while being used");
334 delete &record;
335 });
336
337 if (destruction_type_ == DestructionType::kAsync) {
338 wait_token_storage_.WaitForAllTokens();
339 }
340
341 free_list_.DisposeUnsafe(
342 [](impl::SnapshotRecord<T>& record) { delete &record; });
343 }
344
345 /// Obtain a smart pointer which can be used to read the current value.
346 ReadablePtr<T, RcuTraits> Read() const {
347 return ReadablePtr<T, RcuTraits>(*this);
348 }
349
350 /// Obtain a copy of contained value.
351 T ReadCopy() const {
352 auto ptr = Read();
353 return *ptr;
354 }
355
356 /// Obtain a smart pointer that will *copy* the current value. The pointer can
357 /// be used to make changes to the value and to set the `Variable` to the
358 /// changed value.
359 WritablePtr<T, RcuTraits> StartWrite() {
360 return WritablePtr<T, RcuTraits>(*this);
361 }
362
363 /// Obtain a smart pointer to a newly in-place constructed value, but does
364 /// not replace the current one yet (in contrast with regular `Emplace`).
365 template <typename... Args>
366 WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
367 return WritablePtr<T, RcuTraits>(*this, std::in_place,
368 std::forward<Args>(args)...);
369 }
370
371 /// Replaces the `Variable`'s value with the provided one.
372 void Assign(T new_value) {
373 WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value))
374 .Commit();
375 }
376
377 /// Replaces the `Variable`'s value with an in-place constructed one.
378 template <typename... Args>
379 void Emplace(Args&&... args) {
380 WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...)
381 .Commit();
382 }
383
384 void Cleanup() {
385 std::unique_lock lock(mutex_, std::try_to_lock);
386 if (!lock.owns_lock()) {
387 // Someone is already assigning to the RCU. They will call ScanRetireList
388 // in the process.
389 return;
390 }
391 ScanRetiredList(lock);
392 }
393
394 private:
395 friend class ReadablePtr<T, RcuTraits>;
396 friend class WritablePtr<T, RcuTraits>;
397
398 void DoAssign(impl::SnapshotRecord<T>& new_snapshot,
399 std::unique_lock<MutexType>& lock) {
400 UASSERT(lock.owns_lock());
401
402 // Note: exchange RMW operation would not give any benefits here.
403 auto* const old_snapshot = current_.load();
404 current_.store(&new_snapshot, std::memory_order_seq_cst);
405
406 UASSERT(old_snapshot);
407 retired_list_.Push(*old_snapshot);
408 ScanRetiredList(lock);
409 }
410
411 template <typename... Args>
412 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
413 auto* const free_list_record = free_list_.TryPop();
414 auto& record =
415 free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
416 UASSERT(!record.data);
417
418 try {
419 record.data.emplace(std::forward<Args>(args)...);
420 } catch (...) {
421 if (free_list_record) {
422 free_list_.Push(record);
423 } else {
424 // Important to delete this way in the rcu::Variable's constructor,
425 // otherwise we'd have to clean up free_list_ there.
426 delete &record;
427 }
428 throw;
429 }
430
431 return record;
432 }
433
434 void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
435 UASSERT(lock.owns_lock());
436 if (retired_list_.IsEmpty()) return;
437
438 concurrent::impl::AsymmetricThreadFenceHeavy();
439
440 retired_list_.RemoveAndDisposeIf(
441 [](impl::SnapshotRecord<T>& record) {
442 return record.indicator.IsFree();
443 },
444 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); });
445 }
446
447 void DeleteSnapshot(impl::SnapshotRecord<T>& record) {
448 switch (destruction_type_) {
449 case DestructionType::kSync:
450 DeleteSnapshotSync(record);
451 break;
452 case DestructionType::kAsync:
453 engine::CriticalAsyncNoSpan([this, &record,
454 token = wait_token_storage_.GetToken()] {
455 DeleteSnapshotSync(record);
456 }).Detach();
457 break;
458 }
459 }
460
461 void DeleteSnapshotSync(impl::SnapshotRecord<T>& record) noexcept {
462 record.data.reset();
463 free_list_.Push(record);
464 }
465
466 const DestructionType destruction_type_;
467 // Covers current_ writes, free_list_.Pop, retired_list_
468 MutexType mutex_;
469 impl::SnapshotRecordFreeList<T> free_list_;
470 impl::SnapshotRecordRetiredList<T> retired_list_;
471 std::atomic<impl::SnapshotRecord<T>*> current_;
472 utils::impl::WaitTokenStorage wait_token_storage_;
473};
474
475} // namespace rcu
476
477USERVER_NAMESPACE_END