9#include <unordered_set>
11#include <userver/compiler/thread_local.hpp>
12#include <userver/engine/async.hpp>
13#include <userver/engine/mutex.hpp>
14#include <userver/logging/log.hpp>
15#include <userver/rcu/fwd.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/impl/wait_token_storage.hpp>
19USERVER_NAMESPACE_BEGIN
// A single hazard-pointer slot. Slots form a singly linked list through
// `next`; Variable keeps the list head in `hp_record_head_`.
// NOTE(review): this excerpt does not show the constructor's initializer/body
// or the closing brace of the struct.
38template <
typename T,
typename RcuTraits>
39struct HazardPointerRecord final {
// Sentinel value (address 1) meaning "slot is taken, no real pointer published
// yet". Variable::MakeHazardPointerCached/Fast CAS a free slot's `ptr` to it.
45 static inline T*
const kUsed =
reinterpret_cast<T*>(1);
// Takes the Variable this slot belongs to; presumably stored into the `owner`
// member below (the initializer is not visible here).
47 explicit HazardPointerRecord(
const Variable<T, RcuTraits>& owner)
// Pointer published by a reader. A newly constructed slot starts as kUsed.
50 std::atomic<T*> ptr = kUsed;
// Variable associated with this slot; ReadablePtr's copy constructor reads it.
51 const Variable<T, RcuTraits>& owner;
// Following slot in the list, nullptr when there is none.
52 std::atomic<HazardPointerRecord*> next{
nullptr};
// Stores nullptr into `ptr`, which is the state the MakeHazardPointer*
// functions test for before claiming a slot.
55 void Release() { ptr =
nullptr; }
// NOTE(review): the line that names this template is not visible in this
// excerpt. The fields match the uses of `impl::CachedData<T, RcuTraits>` in
// Variable::MakeHazardPointerCached / MakeHazardPointer — confirm.
58template <
typename T,
typename RcuTraits>
// Cached hazard-pointer slot; nullptr until something fills it in.
60 impl::HazardPointerRecord<T, RcuTraits>* hp{
nullptr};
// Variable the cached slot was obtained for; compared against `this` in
// Variable::MakeHazardPointerCached.
61 const Variable<T, RcuTraits>* variable{
nullptr};
// Epoch of that Variable; compared against Variable::epoch_ together with
// `variable` before the cached slot is reused.
63 uint64_t variable_epoch{0};
// Variable template holding a compiler::ThreadLocal built from a lambda that
// returns a default-initialized CachedData<T, RcuTraits>. It is accessed via
// `.Use()` in Variable::MakeHazardPointer.
// NOTE(review): per-thread semantics come from compiler::ThreadLocal, which is
// defined outside this file — confirm there.
66template <
typename T,
typename RcuTraits>
67inline compiler::ThreadLocal local_cached_data =
68 [] {
return CachedData<T, RcuTraits>{}; };
// Declaration only; the definition is not in this excerpt. Both Variable
// constructors use the result to initialize Variable::epoch_.
// NOTE(review): presumably each call yields a distinct value — confirm at the
// definition.
70uint64_t GetNextEpoch()
noexcept;
// Traits type supplying the mutex used by Variable and WritablePtr (both refer
// to `RcuTraits::MutexType`). The closing brace is not visible in this excerpt.
78struct DefaultRcuTraits {
// Mutex type selected by these traits.
79 using MutexType = engine::Mutex;
// Guard giving read access to the value of a Variable. While it exists, the
// pointer it read is published in a hazard-pointer slot of that Variable.
87template <
typename T,
typename RcuTraits>
88class [[nodiscard]] ReadablePtr final {
// Obtains a hazard-pointer slot from `ptr` and publishes the current value
// pointer in it.
90 explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr)
91 : hp_record_(&ptr.MakeHazardPointer()) {
// NOTE(review): the `do {` that opens the loop closed below is presumably on a
// line not visible in this excerpt.
// Read the current pointer, then store it into the slot.
98 t_ptr_ = ptr.GetCurrent();
100 hp_record_->ptr.store(t_ptr_);
// Re-read the current pointer; repeat the read+publish step while it differs
// from the value that was published.
101 }
while (t_ptr_ != ptr.GetCurrent());
// Move constructor: takes over both the value pointer and the hazard-pointer
// slot of `other`, then clears other's value pointer.
// NOTE(review): the remainder of the body is not visible in this excerpt.
104 ReadablePtr(ReadablePtr<T, RcuTraits>&& other)
noexcept
105 : t_ptr_(other.t_ptr_), hp_record_(other.hp_record_) {
106 other.t_ptr_ =
nullptr;
// Move assignment: releases the slot currently held, then takes over the slot
// and value pointer of `other`.
// NOTE(review): several lines of the body (including the self-assignment
// branch body, any guards around Release(), and the return) are not visible.
109 ReadablePtr& operator=(ReadablePtr<T, RcuTraits>&& other)
noexcept {
// Self-assignment is treated as a programming error (assert) ...
125 UASSERT_MSG(
this != &other,
"Self assignment to RCU variable");
// ... and is additionally checked with a regular branch.
126 if (
this == &other) {
// Give up the slot this object was holding.
133 hp_record_->Release();
// Adopt other's slot and value pointer.
139 hp_record_ = other.hp_record_;
140 t_ptr_ = other.t_ptr_;
// Clear other's value pointer after the transfer.
144 other.t_ptr_ =
nullptr;
// Copy constructor: delegates to the Variable-taking constructor using the
// Variable recorded in other's slot. The copy therefore obtains its own slot
// and reads the Variable's current pointer itself instead of copying
// other.t_ptr_.
152 ReadablePtr(
const ReadablePtr<T, RcuTraits>& other)
153 : ReadablePtr(other.hp_record_->owner) {}
// Copy assignment: unless assigning to itself, copy-constructs a temporary
// from `other` and move-assigns it into *this.
// NOTE(review): the return statement and closing brace are not visible here.
155 ReadablePtr& operator=(
const ReadablePtr<T, RcuTraits>& other) {
156 if (
this != &other) *
this = ReadablePtr<T, RcuTraits>{other};
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of ~ReadablePtr — confirm.
// Asserts that a slot is held, then releases it.
162 UASSERT(hp_record_ !=
nullptr);
163 hp_record_->Release();
// Accessors. The `const&` overloads work on lvalues; the `&&` overloads all
// route to GetOnRvalue(), whose static_assert rejects use on a temporary.
// NOTE(review): the body of `Get() const&` is not visible in this excerpt.
166 const T* Get()
const& {
171 const T* Get() && {
return GetOnRvalue(); }
// Member access forwards to Get().
173 const T* operator->()
const& {
return Get(); }
174 const T* operator->() && {
return GetOnRvalue(); }
// Dereference returns a reference to the object Get() points to.
176 const T& operator*()
const& {
return *Get(); }
177 const T& operator*() && {
return *GetOnRvalue(); }
// Target of all rvalue-qualified accessors. `!sizeof(T)` depends on T, so the
// assertion fires only when this function is instantiated, i.e. when an
// accessor is called on a temporary ReadablePtr.
// NOTE(review): the rest of the body is not visible in this excerpt.
180 const T* GetOnRvalue() {
181 static_assert(!
sizeof(T),
182 "Don't use temporary ReadablePtr, store it to a variable");
// Hazard-pointer slot held by this guard.
194 impl::HazardPointerRecord<T, RcuTraits>* hp_record_;
// Guard for modifying the value of a Variable: it holds a lock on the
// Variable's mutex type (`lock_`) and a privately owned T (`ptr_`) that the
// commit step swaps into Variable::current_.
207template <
typename T,
typename RcuTraits>
208class [[nodiscard]] WritablePtr final {
// NOTE(review): the constructor header and initializer list preceding this
// trace line are not visible in this excerpt.
215 LOG_TRACE() <<
"Start writing ptr=" << ptr_.get();
// Constructor variant taking arguments for the initial value (Variable passes
// `std::in_place` followed by forwarded arguments to it).
// NOTE(review): the first part of the signature and the initializer list are
// not visible in this excerpt.
219 template <
typename... Args>
221 Args&&... initial_value_args)
225 LOG_TRACE() <<
"Start writing ptr=" << ptr_.get()
226 <<
" with custom initial value";
// Move constructor: transfers the held lock and the pending value from
// `other`.
// NOTE(review): the first initializer (presumably for `var_`) is on a line
// not visible in this excerpt.
229 WritablePtr(WritablePtr<T, RcuTraits>&& other)
noexcept
231 lock_(std::move(other.lock_)),
232 ptr_(std::move(other.ptr_)) {
233 LOG_TRACE() <<
"Continue writing ptr=" << ptr_.get();
// NOTE(review): the enclosing function is not visible in this excerpt;
// presumably this trace belongs to ~WritablePtr — confirm.
238 LOG_TRACE() <<
"Stop writing ptr=" << ptr_.get();
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the commit operation — confirm.
247 LOG_TRACE() <<
"Committing ptr=" << ptr_.get();
// Atomically install the new value as the Variable's current pointer;
// ownership of ptr_ moves to the Variable and the previous value is wrapped in
// a unique_ptr.
249 std::unique_ptr<T> old_ptr(var_.current_.exchange(ptr_.release()));
// Hand the previous value to the Variable for reclamation, passing the lock
// this writer holds.
250 var_.Retire(std::move(old_ptr), lock_);
// Accessors. The `&` overloads forward to Get() on an lvalue; the `&&`
// overloads route to GetOnRvalue(), which is rejected at compile time.
// NOTE(review): the lvalue `Get()` overload is not visible in this excerpt.
259 T* Get() && {
return GetOnRvalue(); }
261 T* operator->() & {
return Get(); }
262 T* operator->() && {
return GetOnRvalue(); }
264 T& operator*() & {
return *Get(); }
265 T& operator*() && {
return *GetOnRvalue(); }
// NOTE(review): the enclosing function header is not visible; presumably
// WritablePtr::GetOnRvalue. `!sizeof(T)` depends on T, so the assertion fires
// only if the function is instantiated.
269 static_assert(!
sizeof(T),
270 "Don't use temporary WritablePtr, store it to a variable");
// Variable being modified.
274 Variable<T, RcuTraits>& var_;
// Lock on the traits' mutex type; passed to Variable::Retire on commit.
275 std::unique_lock<
typename RcuTraits::MutexType> lock_;
// Pending new value, owned by this guard until it is released into
// Variable::current_.
276 std::unique_ptr<T> ptr_;
// Holder of a value of type T. Readers obtain ReadablePtr guards, writers
// obtain WritablePtr guards; replaced values are kept until no hazard-pointer
// slot refers to them (see Retire / ScanRetiredList).
307template <
typename T,
typename RcuTraits>
308class Variable final {
// Mutex type taken from the traits parameter.
310 using MutexType =
typename RcuTraits::MutexType;
// Constructor that picks the destruction type itself.
// NOTE(review): the signature line and the two alternatives selected by the
// condition below are not visible in this excerpt.
316 template <
typename... Args>
// Compile-time condition: T is trivially destructible, or T is std::string,
// or the mutex type is not engine::Mutex.
318 : destruction_type_(std::is_trivially_destructible_v<T> ||
319 std::is_same_v<T, std::string> ||
320 !std::is_same_v<MutexType, engine::Mutex>
323 epoch_(impl::GetNextEpoch()),
// The initial value is heap-allocated from the forwarded arguments; the raw
// pointer is owned by this Variable.
324 current_(
new T(std::forward<Args>(initial_value_args)...)) {}
// Constructor taking the destruction type explicitly.
// NOTE(review): the signature line is not visible in this excerpt.
331 template <
typename... Args>
333 : destruction_type_(destruction_type),
334 epoch_(impl::GetNextEpoch()),
// The initial value is heap-allocated from the forwarded arguments.
335 current_(
new T(std::forward<Args>(initial_value_args)...)) {}
// Variable is neither copyable nor movable. Hazard-pointer slots hold a
// reference to their Variable (`owner`) and the thread-local cache compares
// its address.
337 Variable(
const Variable&) =
delete;
338 Variable(Variable&&) =
delete;
339 Variable& operator=(
const Variable&) =
delete;
340 Variable& operator=(Variable&&) =
delete;
// NOTE(review): the enclosing function header is not visible; presumably
// ~Variable. The loop structure, the asserted condition and any per-slot
// cleanup are also on lines not visible in this excerpt.
// Free the value currently installed.
343 delete current_.load();
// Start from the head of the hazard-pointer slot list; `next` is read from
// each slot. The message below belongs to a check whose condition is not
// visible here.
345 auto* hp = hp_record_head_.load();
347 auto* next = hp->next.load();
349 "RCU variable is destroyed while being used");
// Block until every token handed out by wait_token_storage_ is gone.
// DeleteAsync captures such a token in its asynchronous task.
356 wait_token_storage_.WaitForAllTokens();
// Returns a read guard for this Variable. The guard's constructor obtains a
// hazard-pointer slot and publishes the current value pointer in it.
361 ReadablePtr<T, RcuTraits>
Read()
const {
362 return ReadablePtr<T, RcuTraits>(*
this);
// NOTE(review): the headers of the functions below are not visible in this
// excerpt; the names in these comments are guesses — confirm.
// Presumably StartWrite(): returns a write guard built from this Variable
// alone.
375 return WritablePtr<T, RcuTraits>(*
this);
// Presumably StartWriteEmplace(): returns a write guard whose pending value is
// built in place from the forwarded arguments.
380 template <
typename... Args>
382 return WritablePtr<T, RcuTraits>(*
this, std::in_place,
383 std::forward<Args>(args)...);
// Presumably Assign(): builds a temporary write guard initialized by moving
// from `new_value`. What is then called on it is not visible here.
388 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value))
// Presumably Emplace(): same as above, with the value built from forwarded
// arguments.
393 template <
typename... Args>
395 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...)
// NOTE(review): the enclosing function header is not visible; presumably
// Cleanup(). The lines between the trace message and the scan (likely an early
// return) are also not visible.
// Try to take the mutex without blocking.
400 std::unique_lock lock(mutex_, std::try_to_lock);
401 if (!lock.owns_lock()) {
402 LOG_TRACE() <<
"Not cleaning up, someone else is holding the mutex lock";
// With the lock held: snapshot the published hazard pointers and delete every
// retired value that is not among them.
407 ScanRetiredList(CollectHazardPtrs(lock));
// Atomically loads the pointer to the currently installed value.
411 T* GetCurrent()
const {
return current_.load(); }
// Tries to reuse the slot remembered in `cache`.
// NOTE(review): the declarations of `hp` and `ptr`, and everything after the
// CAS (return values), are on lines not visible in this excerpt.
413 impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerCached(
414 impl::CachedData<T, RcuTraits>& cache)
const {
// The cached slot is considered only if it exists and was recorded for this
// very Variable (same address and same epoch).
417 if (hp && cache.variable ==
this && cache.variable_epoch == epoch_) {
// Plain load first, then CAS: the slot is claimed by replacing the expected
// value `ptr` with the kUsed sentinel.
418 if (hp->ptr.load() ==
nullptr &&
419 hp->ptr.compare_exchange_strong(
420 ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
// Tries to claim an existing slot from the list without allocating.
// NOTE(review): the loop over the list, the declaration of `t_ptr`, and the
// return statements are on lines not visible in this excerpt.
428 impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerFast()
const {
// Start at the head of the slot list.
431 auto* hp = hp_record_head_.load();
// A slot whose `ptr` is nullptr is free; claim it by CAS-ing the expected
// value `t_ptr` to the kUsed sentinel.
434 if (hp->ptr.load() ==
nullptr &&
435 hp->ptr.compare_exchange_strong(
436 t_ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
// Obtains a hazard-pointer slot for the caller, trying in order: the slot
// remembered in the thread-local cache, a free slot from the list, and finally
// a newly allocated slot.
// NOTE(review): the conditions guarding some steps, the update of `cache->hp`
// and the return statement are on lines not visible in this excerpt.
445 impl::HazardPointerRecord<T, RcuTraits>& MakeHazardPointer()
const {
446 auto cache = impl::local_cached_data<T, RcuTraits>.Use();
447 auto* hp = MakeHazardPointerCached(*cache);
449 hp = MakeHazardPointerFast();
// Allocate a new slot only when no existing one could be claimed.
451 if (!hp) hp = MakeHazardPointerSlow();
// Record which Variable (address + epoch) the cache entry now refers to.
454 cache->variable =
this;
455 cache->variable_epoch = epoch_;
// Allocates a new slot and pushes it at the head of the slot list. A new slot
// starts with `ptr == kUsed`, so it is already claimed for the caller.
// NOTE(review): the `do {` opener, the statement linking the new slot to the
// old head, and the return are on lines not visible in this excerpt.
461 impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerSlow()
const {
463 auto hp =
new impl::HazardPointerRecord<T, RcuTraits>(*
this);
464 impl::HazardPointerRecord<T, RcuTraits>* old_hp =
nullptr;
// Reload the head and retry until the CAS installs the new slot as head.
466 old_hp = hp_record_head_.load();
468 }
while (!hp_record_head_.compare_exchange_strong(old_hp, hp));
// Disposes of a value that has just been replaced. Called from WritablePtr
// with the writer's lock.
// NOTE(review): the `else` line separating the two outcomes and the closing
// braces are not visible in this excerpt.
472 void Retire(std::unique_ptr<T> old_ptr, std::unique_lock<MutexType>& lock) {
473 LOG_TRACE() <<
"Retiring ptr=" << old_ptr.get();
// Snapshot of all pointers currently published in hazard-pointer slots.
474 auto hazard_ptrs = CollectHazardPtrs(lock);
// Still published by some slot: keep the value in the retire list.
476 if (hazard_ptrs.count(old_ptr.get()) > 0) {
478 LOG_TRACE() <<
"Not retire, still used ptr=" << old_ptr.get();
479 retire_list_head_.push_back(std::move(old_ptr));
// Not published by any slot: hand it to DeleteAsync right away.
481 LOG_TRACE() <<
"Retire, not used ptr=" << old_ptr.get();
482 DeleteAsync(std::move(old_ptr));
// Also re-check previously retired values against the same snapshot.
485 ScanRetiredList(hazard_ptrs);
// Walks the retire list and disposes of every value whose pointer is absent
// from `hazard_ptrs`.
// NOTE(review): closing braces are not visible in this excerpt.
490 void ScanRetiredList(
const std::unordered_set<T*>& hazard_ptrs) {
491 for (
auto rit = retire_list_head_.begin();
492 rit != retire_list_head_.end();) {
// Advance before a possible erase so `rit` never refers to the erased node.
493 auto current = rit++;
494 if (hazard_ptrs.count(current->get()) == 0) {
// Move the value out to DeleteAsync, then drop the emptied list node.
496 DeleteAsync(std::move(*current));
497 retire_list_head_.erase(current);
// Builds the set of values currently stored in the hazard-pointer slots.
// The lock parameter is unnamed and unused in the visible body.
// NOTE(review): presumably it only forces callers to hold the mutex — confirm.
// The return statement is not visible in this excerpt.
504 std::unordered_set<T*> CollectHazardPtrs(std::unique_lock<MutexType>&) {
505 std::unordered_set<T*> hazard_ptrs;
// Every slot's value is inserted as-is, so the set may also contain nullptr
// (free slot) and the kUsed sentinel (claimed, nothing published yet).
508 for (
auto* hp = hp_record_head_.load(); hp; hp = hp->next) {
509 hazard_ptrs.insert(hp->ptr.load());
// Disposes of `ptr` according to destruction_type_.
// NOTE(review): the body of the kSync case, the label of the other case and
// the lambda body are on lines not visible in this excerpt.
514 void DeleteAsync(std::unique_ptr<T> ptr) {
515 switch (destruction_type_) {
516 case DestructionType::kSync:
// Asynchronous path: the task takes ownership of the value and holds a token
// from wait_token_storage_.
520 engine::CriticalAsyncNoSpan([ptr = std::move(ptr),
521 token = wait_token_storage_
522 .GetToken()]()
mutable {
// Value obtained from impl::GetNextEpoch() at construction; compared with the
// epoch stored in the thread-local cache.
531 const uint64_t epoch_;
// Head of the hazard-pointer slot list. `mutable` because the const
// MakeHazardPointer* functions push new slots onto it.
// NOTE(review): the initializer's closing line is not visible in this excerpt.
533 mutable std::atomic<impl::HazardPointerRecord<T, RcuTraits>*> hp_record_head_{
// Pointer to the currently installed value.
538 std::atomic<T*> current_;
// Replaced values that were still published in some slot when retired.
539 std::list<std::unique_ptr<T>> retire_list_head_;
// Source of the tokens captured by DeleteAsync tasks.
540 utils::impl::WaitTokenStorage wait_token_storage_;
// The guards use MakeHazardPointer/GetCurrent (ReadablePtr) and
// current_/Retire (WritablePtr).
542 friend class ReadablePtr<T, RcuTraits>;
543 friend class WritablePtr<T, RcuTraits>;