14#include <unordered_set>
15#include <userver/compiler/thread_local.hpp>
17#include <userver/concurrent/impl/asymmetric_fence.hpp>
18#include <userver/concurrent/impl/intrusive_hooks.hpp>
19#include <userver/concurrent/impl/intrusive_stack.hpp>
20#include <userver/concurrent/impl/striped_read_indicator.hpp>
21#include <userver/engine/async.hpp>
22#include <userver/engine/mutex.hpp>
23#include <userver/logging/log.hpp>
24#include <userver/rcu/fwd.hpp>
25#include <userver/utils/assert.hpp>
26#include <userver/utils/impl/wait_token_storage.hpp>
28USERVER_NAMESPACE_BEGIN
40struct SnapshotRecord final {
41 std::optional<T> data;
42 concurrent::impl::StripedReadIndicator indicator;
43 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
44 SnapshotRecord* next_retired{
nullptr};
50struct FreeListHookGetter {
51 auto& operator()(SnapshotRecord<T>& node)
const noexcept {
52 return node.free_list_hook;
57using SnapshotRecordFreeList =
58 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>>;
61class SnapshotRecordRetiredList final {
63 SnapshotRecordRetiredList() =
default;
65 bool IsEmpty()
const noexcept {
return head_ ==
nullptr; }
67 void Push(SnapshotRecord<T>& record)
noexcept {
68 record.next_retired = head_;
72 template <
typename Predicate,
typename Disposer>
73 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
74 SnapshotRecord<T>** ptr_to_current = &head_;
76 while (*ptr_to_current !=
nullptr) {
77 SnapshotRecord<T>*
const current = *ptr_to_current;
79 if (predicate(*current)) {
80 *ptr_to_current = std::exchange(current->next_retired,
nullptr);
83 ptr_to_current = ¤t->next_retired;
89 SnapshotRecord<T>* head_{
nullptr};
98struct DefaultRcuTraits {
99 using MutexType = engine::Mutex;
107template <
typename T,
typename RcuTraits>
108class [[nodiscard]] ReadablePtr final {
110 explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr) {
111 auto* record = ptr.current_.load();
116 lock_ = record->indicator.Lock();
138 concurrent::impl::AsymmetricThreadFenceLight();
142 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
143 if (new_current == record)
break;
146 record = new_current;
149 ptr_ = &*record->data;
152 ReadablePtr(ReadablePtr&& other)
noexcept =
default;
153 ReadablePtr& operator=(ReadablePtr&& other)
noexcept =
default;
154 ReadablePtr(
const ReadablePtr& other) =
default;
155 ReadablePtr& operator=(
const ReadablePtr& other) =
default;
156 ~ReadablePtr() =
default;
158 const T* Get()
const& {
163 const T* Get() && {
return GetOnRvalue(); }
165 const T* operator->()
const& {
return Get(); }
166 const T* operator->() && {
return GetOnRvalue(); }
168 const T& operator*()
const& {
return *Get(); }
169 const T& operator*() && {
return *GetOnRvalue(); }
172 const T* GetOnRvalue() {
173 static_assert(!
sizeof(T),
174 "Don't use temporary ReadablePtr, store it to a variable");
179 concurrent::impl::StripedReadIndicatorLock lock_;
192template <
typename T,
typename RcuTraits>
193class [[nodiscard]] WritablePtr final {
197 explicit WritablePtr(Variable<T, RcuTraits>& var)
200 record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
203 template <
typename... Args>
204 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
205 Args&&... initial_value_args)
209 &var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
212 WritablePtr(WritablePtr&& other)
noexcept
214 lock_(std::move(other.lock_)),
215 record_(std::exchange(other.record_,
nullptr)) {}
220 var_.DeleteSnapshotSync(*record_);
229 var_.DoAssign(*std::exchange(record_,
nullptr), lock_);
235 return &*record_->data;
238 T* Get() && {
return GetOnRvalue(); }
240 T* operator->() & {
return Get(); }
241 T* operator->() && {
return GetOnRvalue(); }
243 T& operator*() & {
return *Get(); }
244 T& operator*() && {
return *GetOnRvalue(); }
247 [[noreturn]]
static T* GetOnRvalue() {
248 static_assert(!
sizeof(T),
249 "Don't use temporary WritablePtr, store it to a variable");
253 Variable<T, RcuTraits>& var_;
254 std::unique_lock<
typename RcuTraits::MutexType> lock_;
255 impl::SnapshotRecord<T>* record_;
286template <
typename T,
typename RcuTraits>
287class Variable final {
289 using MutexType =
typename RcuTraits::MutexType;
295 template <
typename... Args>
298 : destruction_type_(std::is_trivially_destructible_v<T> ||
299 std::is_same_v<T, std::string> ||
300 !std::is_same_v<MutexType, engine::Mutex>
303 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
310 template <
typename... Args>
312 Args&&... initial_value_args)
313 : destruction_type_(destruction_type),
314 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
316 Variable(
const Variable&) =
delete;
317 Variable(Variable&&) =
delete;
318 Variable& operator=(
const Variable&) =
delete;
319 Variable& operator=(Variable&&) =
delete;
323 auto* record = current_.load();
325 "RCU variable is destroyed while being used");
329 retired_list_.RemoveAndDisposeIf(
330 [](impl::SnapshotRecord<T>&) {
return true; },
331 [](impl::SnapshotRecord<T>& record) {
333 "RCU variable is destroyed while being used");
338 wait_token_storage_.WaitForAllTokens();
341 free_list_.DisposeUnsafe(
342 [](impl::SnapshotRecord<T>& record) {
delete &record; });
346 ReadablePtr<T, RcuTraits>
Read()
const {
347 return ReadablePtr<T, RcuTraits>(*
this);
360 return WritablePtr<T, RcuTraits>(*
this);
365 template <
typename... Args>
367 return WritablePtr<T, RcuTraits>(*
this, std::in_place,
368 std::forward<Args>(args)...);
373 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value))
378 template <
typename... Args>
380 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...)
385 std::unique_lock lock(mutex_, std::try_to_lock);
386 if (!lock.owns_lock()) {
391 ScanRetiredList(lock);
395 friend class ReadablePtr<T, RcuTraits>;
396 friend class WritablePtr<T, RcuTraits>;
398 void DoAssign(impl::SnapshotRecord<T>& new_snapshot,
399 std::unique_lock<MutexType>& lock) {
403 auto*
const old_snapshot = current_.load();
404 current_.store(&new_snapshot, std::memory_order_seq_cst);
407 retired_list_.Push(*old_snapshot);
408 ScanRetiredList(lock);
411 template <
typename... Args>
412 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
413 auto*
const free_list_record = free_list_.TryPop();
415 free_list_record ? *free_list_record : *
new impl::SnapshotRecord<T>{};
419 record.data.emplace(std::forward<Args>(args)...);
421 if (free_list_record) {
422 free_list_.Push(record);
434 void ScanRetiredList(std::unique_lock<MutexType>& lock)
noexcept {
436 if (retired_list_.IsEmpty())
return;
438 concurrent::impl::AsymmetricThreadFenceHeavy();
440 retired_list_.RemoveAndDisposeIf(
441 [](impl::SnapshotRecord<T>& record) {
442 return record.indicator.IsFree();
444 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); });
447 void DeleteSnapshot(impl::SnapshotRecord<T>& record) {
448 switch (destruction_type_) {
450 DeleteSnapshotSync(record);
453 engine::CriticalAsyncNoSpan([
this, &record,
454 token = wait_token_storage_.GetToken()] {
455 DeleteSnapshotSync(record);
461 void DeleteSnapshotSync(impl::SnapshotRecord<T>& record)
noexcept {
463 free_list_.Push(record);
469 impl::SnapshotRecordFreeList<T> free_list_;
470 impl::SnapshotRecordRetiredList<T> retired_list_;
471 std::atomic<impl::SnapshotRecord<T>*> current_;
472 utils::impl::WaitTokenStorage wait_token_storage_;