9#include <unordered_set> 
   11#include <userver/engine/async.hpp> 
   12#include <userver/engine/mutex.hpp> 
   13#include <userver/logging/log.hpp> 
   14#include <userver/rcu/fwd.hpp> 
   15#include <userver/utils/assert.hpp> 
   16#include <userver/utils/impl/wait_token_storage.hpp> 
   18USERVER_NAMESPACE_BEGIN
 
   37template <
typename T, 
typename RcuTraits>
 
   38struct HazardPointerRecord final {
 
   44  static inline T* 
const kUsed = 
reinterpret_cast<T*>(1);
 
   46  explicit HazardPointerRecord(
const Variable<T, RcuTraits>& owner)
 
   49  std::atomic<T*> ptr = kUsed;
 
   50  const Variable<T, RcuTraits>& owner;
 
   51  std::atomic<HazardPointerRecord*> next{
nullptr};
 
   54  void Release() { ptr = 
nullptr; }
 
   57template <
typename T, 
typename RcuTraits>
 
   59  impl::HazardPointerRecord<T, RcuTraits>* hp{
nullptr};
 
   60  const Variable<T, RcuTraits>* variable{
nullptr};
 
   62  uint64_t variable_epoch{0};
 
   65template <
typename T, 
typename RcuTraits>
 
   66thread_local CachedData<T, RcuTraits> cache;
 
   68uint64_t GetNextEpoch() 
noexcept;
 
   76struct DefaultRcuTraits {
 
   77  using MutexType = engine::Mutex;
 
   85template <
typename T, 
typename RcuTraits>
 
   86class [[nodiscard]] ReadablePtr final {
 
   88  explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr)
 
   89      : hp_record_(&ptr.MakeHazardPointer()) {
 
   96      t_ptr_ = ptr.GetCurrent();
 
   98      hp_record_->ptr.store(t_ptr_);
 
   99    } 
while (t_ptr_ != ptr.GetCurrent());
 
  102  ReadablePtr(ReadablePtr<T, RcuTraits>&& other) 
noexcept 
  103      : t_ptr_(other.t_ptr_), hp_record_(other.hp_record_) {
 
  104    other.t_ptr_ = 
nullptr;
 
  107  ReadablePtr& operator=(ReadablePtr<T, RcuTraits>&& other) 
noexcept {
 
  123    UASSERT_MSG(
this != &other, 
"Self assignment to RCU variable");
 
  124    if (
this == &other) {
 
  131      hp_record_->Release();
 
  137    hp_record_ = other.hp_record_;
 
  138    t_ptr_ = other.t_ptr_;
 
  142    other.t_ptr_ = 
nullptr;
 
  150  ReadablePtr(
const ReadablePtr<T, RcuTraits>& other)
 
  151      : ReadablePtr(other.hp_record_->owner) {}
 
  153  ReadablePtr& operator=(
const ReadablePtr<T, RcuTraits>& other) {
 
  154    if (
this != &other) *
this = ReadablePtr<T, RcuTraits>{other};
 
  160    UASSERT(hp_record_ != 
nullptr);
 
  161    hp_record_->Release();
 
  164  const T* Get() 
const& {
 
  169  const T* Get() && { 
return GetOnRvalue(); }
 
  171  const T* operator->() 
const& { 
return Get(); }
 
  172  const T* operator->() && { 
return GetOnRvalue(); }
 
  174  const T& operator*() 
const& { 
return *Get(); }
 
  175  const T& operator*() && { 
return *GetOnRvalue(); }
 
  178  const T* GetOnRvalue() {
 
  179    static_assert(!
sizeof(T),
 
  180                  "Don't use temporary ReadablePtr, store it to a variable");
 
  192  impl::HazardPointerRecord<T, RcuTraits>* hp_record_;
 
  205template <
typename T, 
typename RcuTraits>
 
  206class [[nodiscard]] WritablePtr final {
 
  213    LOG_TRACE() << 
"Start writing ptr=" << ptr_.get();
 
  217  template <
typename... Args>
 
  219              Args&&... initial_value_args)
 
  223    LOG_TRACE() << 
"Start writing ptr=" << ptr_.get()
 
  224                << 
" with custom initial value";
 
  227  WritablePtr(WritablePtr<T, RcuTraits>&& other) 
noexcept 
  229        lock_(std::move(other.lock_)),
 
  230        ptr_(std::move(other.ptr_)) {
 
  231    LOG_TRACE() << 
"Continue writing ptr=" << ptr_.get();
 
  236      LOG_TRACE() << 
"Stop writing ptr=" << ptr_.get();
 
  245    LOG_TRACE() << 
"Committing ptr=" << ptr_.get();
 
  247    std::unique_ptr<T> old_ptr(var_.current_.exchange(ptr_.release()));
 
  248    var_.Retire(std::move(old_ptr), lock_);
 
  257  T* Get() && { 
return GetOnRvalue(); }
 
  259  T* operator->() & { 
return Get(); }
 
  260  T* operator->() && { 
return GetOnRvalue(); }
 
  262  T& operator*() & { 
return *Get(); }
 
  263  T& operator*() && { 
return *GetOnRvalue(); }
 
  267    static_assert(!
sizeof(T),
 
  268                  "Don't use temporary WritablePtr, store it to a variable");
 
  272  Variable<T, RcuTraits>& var_;
 
  273  std::unique_lock<
typename RcuTraits::MutexType> lock_;
 
  274  std::unique_ptr<T> ptr_;
 
  305template <
typename T, 
typename RcuTraits>
 
  306class Variable final {
 
  308  using MutexType = 
typename RcuTraits::MutexType;
 
  314  template <
typename... Args>
 
  316      : destruction_type_(std::is_trivially_destructible_v<T> ||
 
  317                                  std::is_same_v<T, std::string> ||
 
  318                                  !std::is_same_v<MutexType, engine::Mutex>
 
  321        epoch_(impl::GetNextEpoch()),
 
  322        current_(
new T(std::forward<Args>(initial_value_args)...)) {}
 
  329  template <
typename... Args>
 
  331      : destruction_type_(destruction_type),
 
  332        epoch_(impl::GetNextEpoch()),
 
  333        current_(
new T(std::forward<Args>(initial_value_args)...)) {}
 
  335  Variable(
const Variable&) = 
delete;
 
  336  Variable(Variable&&) = 
delete;
 
  337  Variable& operator=(
const Variable&) = 
delete;
 
  338  Variable& operator=(Variable&&) = 
delete;
 
  341    delete current_.load();
 
  343    auto* hp = hp_record_head_.load();
 
  345      auto* next = hp->next.load();
 
  347                  "RCU variable is destroyed while being used");
 
  354      wait_token_storage_.WaitForAllTokens();
 
  359  ReadablePtr<T, RcuTraits> 
Read() 
const {
 
  360    return ReadablePtr<T, RcuTraits>(*
this);
 
  373    return WritablePtr<T, RcuTraits>(*
this);
 
  378  template <
typename... Args>
 
  380    return WritablePtr<T, RcuTraits>(*
this, std::in_place,
 
  381                                     std::forward<Args>(args)...);
 
  386    WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value))
 
  391  template <
typename... Args>
 
  393    WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...)
 
  398    std::unique_lock lock(mutex_, std::try_to_lock);
 
  399    if (!lock.owns_lock()) {
 
  400      LOG_TRACE() << 
"Not cleaning up, someone else is holding the mutex lock";
 
  405    ScanRetiredList(CollectHazardPtrs(lock));
 
  409  T* GetCurrent() 
const { 
return current_.load(); }
 
  411  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerCached() 
const {
 
  412    auto& cache = impl::cache<T, RcuTraits>;
 
  415    if (hp && cache.variable == 
this && cache.variable_epoch == epoch_) {
 
  416      if (hp->ptr.load() == 
nullptr &&
 
  417          hp->ptr.compare_exchange_strong(
 
  418              ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
 
  426  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerFast() 
const {
 
  429    auto* hp = hp_record_head_.load();
 
  432      if (hp->ptr.load() == 
nullptr &&
 
  433          hp->ptr.compare_exchange_strong(
 
  434              t_ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
 
  443  impl::HazardPointerRecord<T, RcuTraits>& MakeHazardPointer() 
const {
 
  444    auto* hp = MakeHazardPointerCached();
 
  446      hp = MakeHazardPointerFast();
 
  448      if (!hp) hp = MakeHazardPointerSlow();
 
  450      auto& cache = impl::cache<T, RcuTraits>;
 
  452      cache.variable = 
this;
 
  453      cache.variable_epoch = epoch_;
 
  459  impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerSlow() 
const {
 
  461    auto hp = 
new impl::HazardPointerRecord<T, RcuTraits>(*
this);
 
  462    impl::HazardPointerRecord<T, RcuTraits>* old_hp = 
nullptr;
 
  464      old_hp = hp_record_head_.load();
 
  466    } 
while (!hp_record_head_.compare_exchange_strong(old_hp, hp));
 
  470  void Retire(std::unique_ptr<T> old_ptr, std::unique_lock<MutexType>& lock) {
 
  471    LOG_TRACE() << 
"Retiring ptr=" << old_ptr.get();
 
  472    auto hazard_ptrs = CollectHazardPtrs(lock);
 
  474    if (hazard_ptrs.count(old_ptr.get()) > 0) {
 
  476      LOG_TRACE() << 
"Not retire, still used ptr=" << old_ptr.get();
 
  477      retire_list_head_.push_back(std::move(old_ptr));
 
  479      LOG_TRACE() << 
"Retire, not used ptr=" << old_ptr.get();
 
  480      DeleteAsync(std::move(old_ptr));
 
  483    ScanRetiredList(hazard_ptrs);
 
  488  void ScanRetiredList(
const std::unordered_set<T*>& hazard_ptrs) {
 
  489    for (
auto rit = retire_list_head_.begin();
 
  490         rit != retire_list_head_.end();) {
 
  491      auto current = rit++;
 
  492      if (hazard_ptrs.count(current->get()) == 0) {
 
  494        DeleteAsync(std::move(*current));
 
  495        retire_list_head_.erase(current);
 
  502  std::unordered_set<T*> CollectHazardPtrs(std::unique_lock<MutexType>&) {
 
  503    std::unordered_set<T*> hazard_ptrs;
 
  506    for (
auto* hp = hp_record_head_.load(); hp; hp = hp->next) {
 
  507      hazard_ptrs.insert(hp->ptr.load());
 
  512  void DeleteAsync(std::unique_ptr<T> ptr) {
 
  513    switch (destruction_type_) {
 
  514      case DestructionType::kSync:
 
  518        engine::CriticalAsyncNoSpan([ptr = std::move(ptr),
 
  519                                     token = wait_token_storage_
 
  520                                                 .GetToken()]() 
mutable {
 
  529  const uint64_t epoch_;
 
  531  mutable std::atomic<impl::HazardPointerRecord<T, RcuTraits>*> hp_record_head_{
 
  536  std::atomic<T*> current_;
 
  537  std::list<std::unique_ptr<T>> retire_list_head_;
 
  538  utils::impl::WaitTokenStorage wait_token_storage_;
 
  540  friend class ReadablePtr<T, RcuTraits>;
 
  541  friend class WritablePtr<T, RcuTraits>;