12#include <userver/concurrent/impl/asymmetric_fence.hpp>
13#include <userver/concurrent/impl/intrusive_hooks.hpp>
14#include <userver/concurrent/impl/intrusive_stack.hpp>
15#include <userver/concurrent/impl/striped_read_indicator.hpp>
16#include <userver/engine/async.hpp>
17#include <userver/engine/mutex.hpp>
18#include <userver/rcu/fwd.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/impl/wait_token_storage.hpp>
22USERVER_NAMESPACE_BEGIN
34struct SnapshotRecord
final {
35 std::optional<T> data;
36 concurrent::impl::StripedReadIndicator indicator;
37 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
38 SnapshotRecord* next_retired{
nullptr};
44struct FreeListHookGetter {
45 auto& operator()(SnapshotRecord<T>& node)
const noexcept {
return node.free_list_hook; }
49struct SnapshotRecordFreeList {
50 SnapshotRecordFreeList() =
default;
52 ~SnapshotRecordFreeList() {
53 list.DisposeUnsafe([](SnapshotRecord<T>& record) {
delete &record; });
56 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>> list;
60class SnapshotRecordRetiredList
final {
62 SnapshotRecordRetiredList() =
default;
64 bool IsEmpty()
const noexcept {
return head_ ==
nullptr; }
66 void Push(SnapshotRecord<T>& record)
noexcept {
67 record.next_retired = head_;
71 template <
typename Predicate,
typename Disposer>
72 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
73 SnapshotRecord<T>** ptr_to_current = &head_;
75 while (*ptr_to_current !=
nullptr) {
76 SnapshotRecord<T>*
const current = *ptr_to_current;
78 if (predicate(*current)) {
79 *ptr_to_current = std::exchange(current->next_retired,
nullptr);
82 ptr_to_current = ¤t->next_retired;
88 SnapshotRecord<T>* head_{
nullptr};
97class SnapshotHandle
final {
99 SnapshotHandle(SnapshotHandle&& other)
noexcept
100 : record_(std::exchange(other.record_,
nullptr)), free_list_(std::exchange(other.free_list_,
nullptr)) {}
103 if (record_ !=
nullptr) {
104 UASSERT(free_list_ !=
nullptr);
105 record_->data.reset();
106 free_list_->list.Push(*record_);
111 template <
typename ,
typename Traits>
112 friend class Variable;
114 template <
typename ,
typename Traits>
115 friend class WritablePtr;
117 explicit SnapshotHandle(impl::SnapshotRecord<T>& record, impl::SnapshotRecordFreeList<T>& free_list)
noexcept
118 : record_(&record), free_list_(&free_list) {}
120 impl::SnapshotRecord<T>* record_;
121 impl::SnapshotRecordFreeList<T>* free_list_;
126struct SyncDeleter
final {
127 template <
typename T>
128 void Delete(SnapshotHandle<T>&& handle)
noexcept {
129 [[maybe_unused]]
const auto for_deletion = std::move(handle);
135class AsyncDeleter
final {
137 ~AsyncDeleter() { wait_token_storage_.WaitForAllTokens(); }
139 template <
typename T>
140 void Delete(SnapshotHandle<T>&& handle)
noexcept {
141 if constexpr (std::is_trivially_destructible_v<T> || std::is_same_v<T, std::string>) {
142 SyncDeleter{}.Delete(std::move(handle));
145 engine::CriticalAsyncNoSpan(
147 [token = wait_token_storage_.GetToken(), handle = std::move(handle)]()
mutable {}
159 utils::impl::WaitTokenStorage wait_token_storage_;
172 using MutexType = engine::Mutex;
177 using DeleterType = AsyncDeleter;
186 using DeleterType = SyncDeleter;
196 using MutexType = std::mutex;
197 using DeleterType = SyncDeleter;
205template <
typename T,
typename RcuTraits>
206class [[nodiscard]] ReadablePtr
final {
208 explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr) {
209 auto* record = ptr.current_.load();
214 lock_ = record->indicator.Lock();
236 concurrent::impl::AsymmetricThreadFenceLight();
240 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
241 if (new_current == record)
break;
244 record = new_current;
247 ptr_ = &*record->data;
250 ReadablePtr(ReadablePtr&& other)
noexcept =
default;
251 ReadablePtr& operator=(ReadablePtr&& other)
noexcept =
default;
252 ReadablePtr(
const ReadablePtr& other) =
default;
253 ReadablePtr& operator=(
const ReadablePtr& other) =
default;
254 ~ReadablePtr() =
default;
256 const T* Get()
const& {
261 const T* Get() && {
return GetOnRvalue(); }
263 const T* operator->()
const& {
return Get(); }
264 const T* operator->() && {
return GetOnRvalue(); }
266 const T& operator*()
const& {
return *Get(); }
267 const T& operator*() && {
return *GetOnRvalue(); }
270 const T* GetOnRvalue() {
271 static_assert(!
sizeof(T),
"Don't use temporary ReadablePtr, store it to a variable");
276 concurrent::impl::StripedReadIndicatorLock lock_;
289template <
typename T,
typename RcuTraits>
290class [[nodiscard]] WritablePtr
final {
294 explicit WritablePtr(Variable<T, RcuTraits>& var)
295 : var_(var), lock_(var.mutex_), record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
298 template <
typename... Args>
299 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t, Args&&... initial_value_args)
300 : var_(var), lock_(var.mutex_), record_(&var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
303 WritablePtr(WritablePtr&& other)
noexcept
304 : var_(other.var_), lock_(std::move(other.lock_)), record_(std::exchange(other.record_,
nullptr)) {}
308 var_.DeleteSnapshot(*record_);
317 var_.DoAssign(*std::exchange(record_,
nullptr), lock_);
323 return &*record_->data;
326 T* Get() && {
return GetOnRvalue(); }
328 T* operator->() & {
return Get(); }
329 T* operator->() && {
return GetOnRvalue(); }
331 T& operator*() & {
return *Get(); }
332 T& operator*() && {
return *GetOnRvalue(); }
335 [[noreturn]]
static T* GetOnRvalue() {
336 static_assert(!
sizeof(T),
"Don't use temporary WritablePtr, store it to a variable");
340 Variable<T, RcuTraits>& var_;
341 std::unique_lock<
typename RcuTraits::MutexType> lock_;
342 impl::SnapshotRecord<T>* record_;
372template <
typename T,
typename RcuTraits>
373class Variable
final {
376 "RcuTraits should publicly inherit from rcu::DefaultRcuTraits"
380 using MutexType =
typename RcuTraits::MutexType;
381 using DeleterType =
typename RcuTraits::DeleterType;
386 template <
typename... Args>
388 Variable(Args&&... initial_value_args) : current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
390 Variable(
const Variable&) =
delete;
391 Variable(Variable&&) =
delete;
392 Variable& operator=(
const Variable&) =
delete;
393 Variable& operator=(Variable&&) =
delete;
397 auto* record = current_.load();
398 UASSERT_MSG(record->indicator.IsFree(),
"RCU variable is destroyed while being used");
402 retired_list_.RemoveAndDisposeIf(
403 [](impl::SnapshotRecord<T>&) {
return true; },
404 [](impl::SnapshotRecord<T>& record) {
405 UASSERT_MSG(record.indicator.IsFree(),
"RCU variable is destroyed while being used");
412 ReadablePtr<T, RcuTraits>
Read()
const {
return ReadablePtr<T, RcuTraits>(*
this); }
423 WritablePtr<T, RcuTraits>
StartWrite() {
return WritablePtr<T, RcuTraits>(*
this); }
427 template <
typename... Args>
429 return WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...);
433 void Assign(T new_value) { WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value)).Commit(); }
436 template <
typename... Args>
438 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...).Commit();
442 std::unique_lock lock(mutex_, std::try_to_lock);
443 if (!lock.owns_lock()) {
448 ScanRetiredList(lock);
452 friend class ReadablePtr<T, RcuTraits>;
453 friend class WritablePtr<T, RcuTraits>;
455 void DoAssign(impl::SnapshotRecord<T>& new_snapshot, std::unique_lock<MutexType>& lock) {
459 auto*
const old_snapshot = current_.load();
460 current_.store(&new_snapshot, std::memory_order_seq_cst);
463 retired_list_.Push(*old_snapshot);
464 ScanRetiredList(lock);
467 template <
typename... Args>
468 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
469 auto*
const free_list_record = free_list_.list.TryPop();
470 auto& record = free_list_record ? *free_list_record : *
new impl::SnapshotRecord<T>{};
474 record.data.emplace(std::forward<Args>(args)...);
476 free_list_.list.Push(record);
483 void ScanRetiredList(std::unique_lock<MutexType>& lock)
noexcept {
485 if (retired_list_.IsEmpty())
return;
487 concurrent::impl::AsymmetricThreadFenceHeavy();
489 retired_list_.RemoveAndDisposeIf(
490 [](impl::SnapshotRecord<T>& record) {
return record.indicator.IsFree(); },
491 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); }
495 void DeleteSnapshot(impl::SnapshotRecord<T>& record)
noexcept {
497 noexcept(deleter_.Delete(SnapshotHandle<T>{record, free_list_})),
"DeleterType::Delete must be noexcept"
499 deleter_.Delete(SnapshotHandle<T>{record, free_list_});
504 impl::SnapshotRecordFreeList<T> free_list_;
505 impl::SnapshotRecordRetiredList<T> retired_list_;
508 DeleterType deleter_{};
511 std::atomic<impl::SnapshotRecord<T>*> current_;