14#include <unordered_set> 
   15#include <userver/compiler/thread_local.hpp> 
   17#include <userver/concurrent/impl/asymmetric_fence.hpp> 
   18#include <userver/concurrent/impl/intrusive_hooks.hpp> 
   19#include <userver/concurrent/impl/intrusive_stack.hpp> 
   20#include <userver/concurrent/impl/striped_read_indicator.hpp> 
   21#include <userver/engine/async.hpp> 
   22#include <userver/engine/mutex.hpp> 
   23#include <userver/logging/log.hpp> 
   24#include <userver/rcu/fwd.hpp> 
   25#include <userver/utils/assert.hpp> 
   26#include <userver/utils/impl/wait_token_storage.hpp> 
   28USERVER_NAMESPACE_BEGIN
 
   40struct SnapshotRecord final {
 
   41  std::optional<T> data;
 
   42  concurrent::impl::StripedReadIndicator indicator;
 
   43  concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
 
   44  SnapshotRecord* next_retired{
nullptr};
 
   50struct FreeListHookGetter {
 
   51  auto& operator()(SnapshotRecord<T>& node) 
const noexcept {
 
   52    return node.free_list_hook;
 
   57using SnapshotRecordFreeList =
 
   58    concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>>;
 
   61class SnapshotRecordRetiredList final {
 
   63  SnapshotRecordRetiredList() = 
default;
 
   65  bool IsEmpty() 
const noexcept { 
return head_ == 
nullptr; }
 
   67  void Push(SnapshotRecord<T>& record) 
noexcept {
 
   68    record.next_retired = head_;
 
   72  template <
typename Predicate, 
typename Disposer>
 
   73  void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
 
   74    SnapshotRecord<T>** ptr_to_current = &head_;
 
   76    while (*ptr_to_current != 
nullptr) {
 
   77      SnapshotRecord<T>* 
const current = *ptr_to_current;
 
   79      if (predicate(*current)) {
 
   80        *ptr_to_current = std::exchange(current->next_retired, 
nullptr);
 
   83        ptr_to_current = ¤t->next_retired;
 
   89  SnapshotRecord<T>* head_{
nullptr};
 
   98struct DefaultRcuTraits {
 
   99  using MutexType = engine::Mutex;
 
  107template <
typename T, 
typename RcuTraits>
 
  108class [[nodiscard]] ReadablePtr final {
 
  110  explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr) {
 
  111    auto* record = ptr.current_.load();
 
  116      lock_ = record->indicator.Lock();
 
  138      concurrent::impl::AsymmetricThreadFenceLight();
 
  142      auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
 
  143      if (new_current == record) 
break;
 
  146      record = new_current;
 
  149    ptr_ = &*record->data;
 
  152  ReadablePtr(ReadablePtr&& other) 
noexcept = 
default;
 
  153  ReadablePtr& operator=(ReadablePtr&& other) 
noexcept = 
default;
 
  154  ReadablePtr(
const ReadablePtr& other) = 
default;
 
  155  ReadablePtr& operator=(
const ReadablePtr& other) = 
default;
 
  156  ~ReadablePtr() = 
default;
 
  158  const T* Get() 
const& {
 
  163  const T* Get() && { 
return GetOnRvalue(); }
 
  165  const T* operator->() 
const& { 
return Get(); }
 
  166  const T* operator->() && { 
return GetOnRvalue(); }
 
  168  const T& operator*() 
const& { 
return *Get(); }
 
  169  const T& operator*() && { 
return *GetOnRvalue(); }
 
  172  const T* GetOnRvalue() {
 
  173    static_assert(!
sizeof(T),
 
  174                  "Don't use temporary ReadablePtr, store it to a variable");
 
  179  concurrent::impl::StripedReadIndicatorLock lock_;
 
  192template <
typename T, 
typename RcuTraits>
 
  193class [[nodiscard]] WritablePtr final {
 
  197  explicit WritablePtr(Variable<T, RcuTraits>& var)
 
  200        record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
 
  203  template <
typename... Args>
 
  204  WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
 
  205              Args&&... initial_value_args)
 
  209            &var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
 
  212  WritablePtr(WritablePtr&& other) 
noexcept 
  214        lock_(std::move(other.lock_)),
 
  215        record_(std::exchange(other.record_, 
nullptr)) {}
 
  220      var_.DeleteSnapshotSync(*record_);
 
  229    var_.DoAssign(*std::exchange(record_, 
nullptr), lock_);
 
  235    return &*record_->data;
 
  238  T* Get() && { 
return GetOnRvalue(); }
 
  240  T* operator->() & { 
return Get(); }
 
  241  T* operator->() && { 
return GetOnRvalue(); }
 
  243  T& operator*() & { 
return *Get(); }
 
  244  T& operator*() && { 
return *GetOnRvalue(); }
 
  247  [[noreturn]] 
static T* GetOnRvalue() {
 
  248    static_assert(!
sizeof(T),
 
  249                  "Don't use temporary WritablePtr, store it to a variable");
 
  253  Variable<T, RcuTraits>& var_;
 
  254  std::unique_lock<
typename RcuTraits::MutexType> lock_;
 
  255  impl::SnapshotRecord<T>* record_;
 
  286template <
typename T, 
typename RcuTraits>
 
  287class Variable final {
 
  289  using MutexType = 
typename RcuTraits::MutexType;
 
  295  template <
typename... Args>
 
  298      : destruction_type_(std::is_trivially_destructible_v<T> ||
 
  299                                  std::is_same_v<T, std::string> ||
 
  300                                  !std::is_same_v<MutexType, engine::Mutex>
 
  303        current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
 
  310  template <
typename... Args>
 
  312                    Args&&... initial_value_args)
 
  313      : destruction_type_(destruction_type),
 
  314        current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
 
  316  Variable(
const Variable&) = 
delete;
 
  317  Variable(Variable&&) = 
delete;
 
  318  Variable& operator=(
const Variable&) = 
delete;
 
  319  Variable& operator=(Variable&&) = 
delete;
 
  323      auto* record = current_.load();
 
  325                  "RCU variable is destroyed while being used");
 
  329    retired_list_.RemoveAndDisposeIf(
 
  330        [](impl::SnapshotRecord<T>&) { 
return true; },
 
  331        [](impl::SnapshotRecord<T>& record) {
 
  333                      "RCU variable is destroyed while being used");
 
  338      wait_token_storage_.WaitForAllTokens();
 
  341    free_list_.DisposeUnsafe(
 
  342        [](impl::SnapshotRecord<T>& record) { 
delete &record; });
 
  346  ReadablePtr<T, RcuTraits> 
Read() 
const {
 
  347    return ReadablePtr<T, RcuTraits>(*
this);
 
  360    return WritablePtr<T, RcuTraits>(*
this);
 
  365  template <
typename... Args>
 
  367    return WritablePtr<T, RcuTraits>(*
this, std::in_place,
 
  368                                     std::forward<Args>(args)...);
 
  373    WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value))
 
  378  template <
typename... Args>
 
  380    WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...)
 
  385    std::unique_lock lock(mutex_, std::try_to_lock);
 
  386    if (!lock.owns_lock()) {
 
  391    ScanRetiredList(lock);
 
  395  friend class ReadablePtr<T, RcuTraits>;
 
  396  friend class WritablePtr<T, RcuTraits>;
 
  398  void DoAssign(impl::SnapshotRecord<T>& new_snapshot,
 
  399                std::unique_lock<MutexType>& lock) {
 
  403    auto* 
const old_snapshot = current_.load();
 
  404    current_.store(&new_snapshot, std::memory_order_seq_cst);
 
  407    retired_list_.Push(*old_snapshot);
 
  408    ScanRetiredList(lock);
 
  411  template <
typename... Args>
 
  412  [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
 
  413    auto* 
const free_list_record = free_list_.TryPop();
 
  415        free_list_record ? *free_list_record : *
new impl::SnapshotRecord<T>{};
 
  419      record.data.emplace(std::forward<Args>(args)...);
 
  421      if (free_list_record) {
 
  422        free_list_.Push(record);
 
  434  void ScanRetiredList(std::unique_lock<MutexType>& lock) 
noexcept {
 
  436    if (retired_list_.IsEmpty()) 
return;
 
  438    concurrent::impl::AsymmetricThreadFenceHeavy();
 
  440    retired_list_.RemoveAndDisposeIf(
 
  441        [](impl::SnapshotRecord<T>& record) {
 
  442          return record.indicator.IsFree();
 
  444        [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); });
 
  447  void DeleteSnapshot(impl::SnapshotRecord<T>& record) {
 
  448    switch (destruction_type_) {
 
  450        DeleteSnapshotSync(record);
 
  453        engine::CriticalAsyncNoSpan([
this, &record,
 
  454                                     token = wait_token_storage_.GetToken()] {
 
  455          DeleteSnapshotSync(record);
 
  461  void DeleteSnapshotSync(impl::SnapshotRecord<T>& record) 
noexcept {
 
  463    free_list_.Push(record);
 
  469  impl::SnapshotRecordFreeList<T> free_list_;
 
  470  impl::SnapshotRecordRetiredList<T> retired_list_;
 
  471  std::atomic<impl::SnapshotRecord<T>*> current_;
 
  472  utils::impl::WaitTokenStorage wait_token_storage_;