11#include <userver/concurrent/impl/asymmetric_fence.hpp>
12#include <userver/concurrent/impl/intrusive_hooks.hpp>
13#include <userver/concurrent/impl/intrusive_stack.hpp>
14#include <userver/concurrent/impl/striped_read_indicator.hpp>
15#include <userver/engine/async.hpp>
16#include <userver/engine/mutex.hpp>
17#include <userver/rcu/fwd.hpp>
18#include <userver/utils/assert.hpp>
19#include <userver/utils/impl/wait_token_storage.hpp>
21USERVER_NAMESPACE_BEGIN
33struct SnapshotRecord final {
34 std::optional<T> data;
35 concurrent::impl::StripedReadIndicator indicator;
36 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
37 SnapshotRecord* next_retired{
nullptr};
43struct FreeListHookGetter {
44 auto& operator()(SnapshotRecord<T>& node)
const noexcept {
45 return node.free_list_hook;
50using SnapshotRecordFreeList =
51 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>>;
54class SnapshotRecordRetiredList final {
56 SnapshotRecordRetiredList() =
default;
58 bool IsEmpty()
const noexcept {
return head_ ==
nullptr; }
60 void Push(SnapshotRecord<T>& record)
noexcept {
61 record.next_retired = head_;
65 template <
typename Predicate,
typename Disposer>
66 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
67 SnapshotRecord<T>** ptr_to_current = &head_;
69 while (*ptr_to_current !=
nullptr) {
70 SnapshotRecord<T>*
const current = *ptr_to_current;
72 if (predicate(*current)) {
73 *ptr_to_current = std::exchange(current->next_retired,
nullptr);
76 ptr_to_current = ¤t->next_retired;
82 SnapshotRecord<T>* head_{
nullptr};
91struct DefaultRcuTraits {
92 using MutexType = engine::Mutex;
// Read-side handle for a `Variable<T, RcuTraits>`. The visible constructor
// code locks the read indicator of the snapshot record loaded from
// `ptr.current_` and finishes by pointing `ptr_` at the data of that record.
// NOTE(review): this listing is extraction residue - every line carries a
// fused original line number, statements are split at keywords, and the
// numbering has gaps (e.g. 109 -> 131), so parts of the class (loop header,
// lvalue Get() body, `ptr_` declaration, closing braces) are not visible.
// Restore the block from the upstream file before relying on it.
100template <
typename T,
typename RcuTraits>
101class [[nodiscard]] ReadablePtr final {
// Takes a read lock on the snapshot that is current in `ptr`.
103 explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr) {
104 auto* record = ptr.current_.load();
// Mark `record` as being read; the returned lock object is kept in `lock_`.
109 lock_ = record->indicator.Lock();
// Light asymmetric fence; Variable::ScanRetiredList issues the heavy one.
131 concurrent::impl::AsymmetricThreadFenceLight();
// Re-read `current_` (seq_cst) after the fence. If it still equals `record`,
// the `break` leaves the enclosing loop (its header is not visible here);
// otherwise the newer record becomes the candidate.
135 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
136 if (new_current == record)
break;
139 record = new_current;
// Cache a direct pointer to the value stored in the selected record.
142 ptr_ = &*record->data;
// Move and copy operations are all defaulted.
145 ReadablePtr(ReadablePtr&& other)
noexcept =
default;
146 ReadablePtr& operator=(ReadablePtr&& other)
noexcept =
default;
147 ReadablePtr(
const ReadablePtr& other) =
default;
148 ReadablePtr& operator=(
const ReadablePtr& other) =
default;
149 ~ReadablePtr() =
default;
// Accessors come in pairs: the `const&` overloads serve named objects, the
// `&&` overloads forward to GetOnRvalue(). The body of the lvalue Get() is
// not visible in this listing.
151 const T* Get()
const& {
156 const T* Get() && {
return GetOnRvalue(); }
158 const T* operator->()
const& {
return Get(); }
159 const T* operator->() && {
return GetOnRvalue(); }
161 const T& operator*()
const& {
return *Get(); }
162 const T& operator*() && {
return *GetOnRvalue(); }
// `!sizeof(T)` is false for every complete T, so instantiating this function
// is a compile-time error carrying the message below.
165 const T* GetOnRvalue() {
166 static_assert(!
sizeof(T),
167 "Don't use temporary ReadablePtr, store it to a variable");
// Lock object returned by `indicator.Lock()` in the constructor.
172 concurrent::impl::StripedReadIndicatorLock lock_;
// Write-side handle for a `Variable<T, RcuTraits>`. It keeps a reference to
// the variable, a `std::unique_lock` over `RcuTraits::MutexType`, and a
// pointer to a snapshot record obtained from `Variable::EmplaceSnapshot`.
// NOTE(review): this listing is extraction residue - every line carries a
// fused original line number, statements are split at keywords, and the
// numbering has gaps, so initializer lists and several function headers are
// not visible. Restore the block from the upstream file before relying on it.
185template <
typename T,
typename RcuTraits>
186class [[nodiscard]] WritablePtr final {
// Creates the new snapshot from the value held by the current snapshot of
// `var` (the dereferenced current data is passed to EmplaceSnapshot). The
// initializers for `var_` and `lock_` are not visible here.
190 explicit WritablePtr(Variable<T, RcuTraits>& var)
193 record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
// In-place variant: the new snapshot is built from the forwarded arguments.
196 template <
typename... Args>
197 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
198 Args&&... initial_value_args)
202 &var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
// Move constructor: takes over the lock and the record, leaving
// `other.record_` null.
205 WritablePtr(WritablePtr&& other)
noexcept
207 lock_(std::move(other.lock_)),
208 record_(std::exchange(other.record_,
nullptr)) {}
// Hands the record back to the variable without publishing it.
// NOTE(review): the enclosing function header is not visible - presumably
// the destructor, guarded by a null check on `record_`; confirm upstream.
213 var_.DeleteSnapshotSync(*record_);
// Publishes the record through Variable::DoAssign and clears `record_`.
// NOTE(review): the enclosing function header is not visible - presumably
// the commit operation; confirm upstream.
222 var_.DoAssign(*std::exchange(record_,
nullptr), lock_);
// Pointer to the value inside the pending record (enclosing header not
// visible; presumably the lvalue Get()).
228 return &*record_->data;
// The `&` overloads serve named objects; the `&&` overloads forward to
// GetOnRvalue().
231 T* Get() && {
return GetOnRvalue(); }
233 T* operator->() & {
return Get(); }
234 T* operator->() && {
return GetOnRvalue(); }
236 T& operator*() & {
return *Get(); }
237 T& operator*() && {
return *GetOnRvalue(); }
// `!sizeof(T)` is false for every complete T, so instantiating this function
// is a compile-time error carrying the message below.
240 [[noreturn]]
static T* GetOnRvalue() {
241 static_assert(!
sizeof(T),
242 "Don't use temporary WritablePtr, store it to a variable");
// The variable being modified.
246 Variable<T, RcuTraits>& var_;
// Lock over the mutex type of the traits; passed to Variable::DoAssign.
247 std::unique_lock<
typename RcuTraits::MutexType> lock_;
// Pending snapshot record; null after it was published or moved from.
248 impl::SnapshotRecord<T>* record_;
// RCU variable: keeps an atomic pointer to the current snapshot record, a
// free list of reusable records, a list of retired records, and a wait token
// storage for asynchronous deletions. Copying and moving are deleted.
// NOTE(review): this listing is extraction residue - every line carries a
// fused original line number, statements are split at keywords, and the
// numbering has gaps, so many function headers, `case` labels, braces and the
// declarations of `mutex_` and `destruction_type_` are not visible. Restore
// the block from the upstream file before relying on it.
279template <
typename T,
typename RcuTraits>
280class Variable final {
282 using MutexType =
typename RcuTraits::MutexType;
// Constructor taking only the initial value arguments (header not visible).
// `destruction_type_` is selected by a condition that is true when T is
// trivially destructible, T is std::string, or MutexType is not
// engine::Mutex; the two alternatives being selected are not visible here.
// `current_` starts out pointing at a freshly emplaced snapshot.
288 template <
typename... Args>
291 : destruction_type_(std::is_trivially_destructible_v<T> ||
292 std::is_same_v<T, std::string> ||
293 !std::is_same_v<MutexType, engine::Mutex>
296 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
// Constructor with an explicitly supplied destruction type (header only
// partly visible).
303 template <
typename... Args>
305 Args&&... initial_value_args)
306 : destruction_type_(destruction_type),
307 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
309 Variable(
const Variable&) =
delete;
310 Variable(Variable&&) =
delete;
311 Variable& operator=(
const Variable&) =
delete;
312 Variable& operator=(Variable&&) =
delete;
// Teardown sequence (header not visible; the message text indicates the
// destructor): inspect the current record, dispose every retired record
// (the predicate always returns true), wait for all outstanding tokens, then
// delete each record left on the free list. The checks that use the message
// strings are not visible here.
316 auto* record = current_.load();
318 "RCU variable is destroyed while being used");
322 retired_list_.RemoveAndDisposeIf(
323 [](impl::SnapshotRecord<T>&) {
return true; },
324 [](impl::SnapshotRecord<T>& record) {
326 "RCU variable is destroyed while being used");
331 wait_token_storage_.WaitForAllTokens();
334 free_list_.DisposeUnsafe(
335 [](impl::SnapshotRecord<T>& record) {
delete &record; });
// Returns a read handle for the current value.
339 ReadablePtr<T, RcuTraits>
Read()
const {
340 return ReadablePtr<T, RcuTraits>(*
this);
// Returns a write handle seeded from the current value (header not visible).
353 return WritablePtr<T, RcuTraits>(*
this);
// Returns a write handle whose value is built in place from `args` (header
// not visible).
358 template <
typename... Args>
360 return WritablePtr<T, RcuTraits>(*
this, std::in_place,
361 std::forward<Args>(args)...);
// Builds a temporary write handle from `new_value`; the call made on that
// temporary is not visible here.
366 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value))
// Same as above, with the value built in place from `args`.
371 template <
typename... Args>
373 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...)
// Non-blocking cleanup (header not visible): try to take `mutex_`; the body
// of the not-owned branch is not visible; then scan the retired list.
378 std::unique_lock lock(mutex_, std::try_to_lock);
379 if (!lock.owns_lock()) {
384 ScanRetiredList(lock);
// Both handle types access the private members of this class.
388 friend class ReadablePtr<T, RcuTraits>;
389 friend class WritablePtr<T, RcuTraits>;
// Publishes `new_snapshot` as the current record (seq_cst store), pushes the
// previous record onto the retired list and scans that list.
391 void DoAssign(impl::SnapshotRecord<T>& new_snapshot,
392 std::unique_lock<MutexType>& lock) {
396 auto*
const old_snapshot = current_.load();
397 current_.store(&new_snapshot, std::memory_order_seq_cst);
400 retired_list_.Push(*old_snapshot);
401 ScanRetiredList(lock);
// Produces a record holding a value built from `args`: a record popped from
// the free list is reused when available, otherwise a new one is allocated.
404 template <
typename... Args>
405 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
406 auto*
const free_list_record = free_list_.TryPop();
408 free_list_record ? *free_list_record : *
new impl::SnapshotRecord<T>{};
412 record.data.emplace(std::forward<Args>(args)...);
// Returns a reused record to the free list.
// NOTE(review): the surrounding context is not visible - presumably the
// failure path taken when `emplace` throws; confirm upstream.
414 if (free_list_record) {
415 free_list_.Push(record);
// Disposes every retired record whose read indicator reports IsFree().
// Returns immediately when the retired list is empty; otherwise a heavy
// asymmetric fence is issued before the indicators are inspected.
427 void ScanRetiredList(std::unique_lock<MutexType>& lock)
noexcept {
429 if (retired_list_.IsEmpty())
return;
431 concurrent::impl::AsymmetricThreadFenceHeavy();
433 retired_list_.RemoveAndDisposeIf(
434 [](impl::SnapshotRecord<T>& record) {
435 return record.indicator.IsFree();
437 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); });
// Dispatches on `destruction_type_`: one branch calls DeleteSnapshotSync
// directly, the other schedules it through engine::CriticalAsyncNoSpan. The
// scheduled lambda captures `this`, the record by reference and a wait token
// from `wait_token_storage_`. The `case` labels are not visible here.
440 void DeleteSnapshot(impl::SnapshotRecord<T>& record) {
441 switch (destruction_type_) {
443 DeleteSnapshotSync(record);
446 engine::CriticalAsyncNoSpan([
this, &record,
447 token = wait_token_storage_.GetToken()] {
448 DeleteSnapshotSync(record);
// Puts the record on the free list for reuse; any statement before the push
// is not visible here.
454 void DeleteSnapshotSync(impl::SnapshotRecord<T>& record)
noexcept {
456 free_list_.Push(record);
// Records available for reuse by EmplaceSnapshot.
462 impl::SnapshotRecordFreeList<T> free_list_;
// Records replaced by DoAssign and not yet disposed.
463 impl::SnapshotRecordRetiredList<T> retired_list_;
// The record that Read() hands out.
464 std::atomic<impl::SnapshotRecord<T>*> current_;
// Tokens held by scheduled deletions; waited on during teardown.
465 utils::impl::WaitTokenStorage wait_token_storage_;