9#include <unordered_set>
11#include <userver/engine/async.hpp>
12#include <userver/engine/mutex.hpp>
13#include <userver/logging/log.hpp>
14#include <userver/rcu/fwd.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/impl/wait_token_storage.hpp>
18USERVER_NAMESPACE_BEGIN
37template <
typename T,
typename RcuTraits>
38struct HazardPointerRecord final {
44 static inline T*
const kUsed =
reinterpret_cast<T*>(1);
46 explicit HazardPointerRecord(
const Variable<T, RcuTraits>& owner)
49 std::atomic<T*> ptr = kUsed;
50 const Variable<T, RcuTraits>& owner;
51 std::atomic<HazardPointerRecord*> next{
nullptr};
54 void Release() { ptr =
nullptr; }
57template <
typename T,
typename RcuTraits>
59 impl::HazardPointerRecord<T, RcuTraits>* hp{
nullptr};
60 const Variable<T, RcuTraits>* variable{
nullptr};
62 uint64_t variable_epoch{0};
65template <
typename T,
typename RcuTraits>
66thread_local CachedData<T, RcuTraits> cache;
68uint64_t GetNextEpoch()
noexcept;
76struct DefaultRcuTraits {
77 using MutexType = engine::Mutex;
85template <
typename T,
typename RcuTraits>
86class [[nodiscard]] ReadablePtr final {
88 explicit ReadablePtr(
const Variable<T, RcuTraits>& ptr)
89 : hp_record_(&ptr.MakeHazardPointer()) {
96 t_ptr_ = ptr.GetCurrent();
98 hp_record_->ptr.store(t_ptr_);
99 }
while (t_ptr_ != ptr.GetCurrent());
102 ReadablePtr(ReadablePtr<T, RcuTraits>&& other)
noexcept
103 : t_ptr_(other.t_ptr_), hp_record_(other.hp_record_) {
104 other.t_ptr_ =
nullptr;
107 ReadablePtr& operator=(ReadablePtr<T, RcuTraits>&& other)
noexcept {
123 UASSERT_MSG(
this != &other,
"Self assignment to RCU variable");
124 if (
this == &other) {
131 hp_record_->Release();
137 hp_record_ = other.hp_record_;
138 t_ptr_ = other.t_ptr_;
142 other.t_ptr_ =
nullptr;
150 ReadablePtr(
const ReadablePtr<T, RcuTraits>& other)
151 : ReadablePtr(other.hp_record_->owner) {}
153 ReadablePtr& operator=(
const ReadablePtr<T, RcuTraits>& other) {
154 if (
this != &other) *
this = ReadablePtr<T, RcuTraits>{other};
160 UASSERT(hp_record_ !=
nullptr);
161 hp_record_->Release();
164 const T* Get()
const& {
169 const T* Get() && {
return GetOnRvalue(); }
171 const T* operator->()
const& {
return Get(); }
172 const T* operator->() && {
return GetOnRvalue(); }
174 const T& operator*()
const& {
return *Get(); }
175 const T& operator*() && {
return *GetOnRvalue(); }
178 const T* GetOnRvalue() {
179 static_assert(!
sizeof(T),
180 "Don't use temporary ReadablePtr, store it to a variable");
192 impl::HazardPointerRecord<T, RcuTraits>* hp_record_;
205template <
typename T,
typename RcuTraits>
206class [[nodiscard]] WritablePtr final {
213 LOG_TRACE() <<
"Start writing ptr=" << ptr_.get();
217 template <
typename... Args>
219 Args&&... initial_value_args)
223 LOG_TRACE() <<
"Start writing ptr=" << ptr_.get()
224 <<
" with custom initial value";
227 WritablePtr(WritablePtr<T, RcuTraits>&& other)
noexcept
229 lock_(std::move(other.lock_)),
230 ptr_(std::move(other.ptr_)) {
231 LOG_TRACE() <<
"Continue writing ptr=" << ptr_.get();
236 LOG_TRACE() <<
"Stop writing ptr=" << ptr_.get();
245 LOG_TRACE() <<
"Committing ptr=" << ptr_.get();
247 std::unique_ptr<T> old_ptr(var_.current_.exchange(ptr_.release()));
248 var_.Retire(std::move(old_ptr), lock_);
257 T* Get() && {
return GetOnRvalue(); }
259 T* operator->() & {
return Get(); }
260 T* operator->() && {
return GetOnRvalue(); }
262 T& operator*() & {
return *Get(); }
263 T& operator*() && {
return *GetOnRvalue(); }
267 static_assert(!
sizeof(T),
268 "Don't use temporary WritablePtr, store it to a variable");
272 Variable<T, RcuTraits>& var_;
273 std::unique_lock<
typename RcuTraits::MutexType> lock_;
274 std::unique_ptr<T> ptr_;
305template <
typename T,
typename RcuTraits>
306class Variable final {
308 using MutexType =
typename RcuTraits::MutexType;
314 template <
typename... Args>
316 : destruction_type_(std::is_trivially_destructible_v<T> ||
317 std::is_same_v<T, std::string> ||
318 !std::is_same_v<MutexType, engine::Mutex>
321 epoch_(impl::GetNextEpoch()),
322 current_(
new T(std::forward<Args>(initial_value_args)...)) {}
329 template <
typename... Args>
331 : destruction_type_(destruction_type),
332 epoch_(impl::GetNextEpoch()),
333 current_(
new T(std::forward<Args>(initial_value_args)...)) {}
335 Variable(
const Variable&) =
delete;
336 Variable(Variable&&) =
delete;
337 Variable& operator=(
const Variable&) =
delete;
338 Variable& operator=(Variable&&) =
delete;
341 delete current_.load();
343 auto* hp = hp_record_head_.load();
345 auto* next = hp->next.load();
347 "RCU variable is destroyed while being used");
354 wait_token_storage_.WaitForAllTokens();
359 ReadablePtr<T, RcuTraits>
Read()
const {
360 return ReadablePtr<T, RcuTraits>(*
this);
373 return WritablePtr<T, RcuTraits>(*
this);
378 template <
typename... Args>
380 return WritablePtr<T, RcuTraits>(*
this, std::in_place,
381 std::forward<Args>(args)...);
386 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::move(new_value))
391 template <
typename... Args>
393 WritablePtr<T, RcuTraits>(*
this, std::in_place, std::forward<Args>(args)...)
398 std::unique_lock lock(mutex_, std::try_to_lock);
399 if (!lock.owns_lock()) {
400 LOG_TRACE() <<
"Not cleaning up, someone else is holding the mutex lock";
405 ScanRetiredList(CollectHazardPtrs(lock));
409 T* GetCurrent()
const {
return current_.load(); }
411 impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerCached()
const {
412 auto& cache = impl::cache<T, RcuTraits>;
415 if (hp && cache.variable ==
this && cache.variable_epoch == epoch_) {
416 if (hp->ptr.load() ==
nullptr &&
417 hp->ptr.compare_exchange_strong(
418 ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
426 impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerFast()
const {
429 auto* hp = hp_record_head_.load();
432 if (hp->ptr.load() ==
nullptr &&
433 hp->ptr.compare_exchange_strong(
434 t_ptr, impl::HazardPointerRecord<T, RcuTraits>::kUsed)) {
443 impl::HazardPointerRecord<T, RcuTraits>& MakeHazardPointer()
const {
444 auto* hp = MakeHazardPointerCached();
446 hp = MakeHazardPointerFast();
448 if (!hp) hp = MakeHazardPointerSlow();
450 auto& cache = impl::cache<T, RcuTraits>;
452 cache.variable =
this;
453 cache.variable_epoch = epoch_;
459 impl::HazardPointerRecord<T, RcuTraits>* MakeHazardPointerSlow()
const {
461 auto hp =
new impl::HazardPointerRecord<T, RcuTraits>(*
this);
462 impl::HazardPointerRecord<T, RcuTraits>* old_hp =
nullptr;
464 old_hp = hp_record_head_.load();
466 }
while (!hp_record_head_.compare_exchange_strong(old_hp, hp));
470 void Retire(std::unique_ptr<T> old_ptr, std::unique_lock<MutexType>& lock) {
471 LOG_TRACE() <<
"Retiring ptr=" << old_ptr.get();
472 auto hazard_ptrs = CollectHazardPtrs(lock);
474 if (hazard_ptrs.count(old_ptr.get()) > 0) {
476 LOG_TRACE() <<
"Not retire, still used ptr=" << old_ptr.get();
477 retire_list_head_.push_back(std::move(old_ptr));
479 LOG_TRACE() <<
"Retire, not used ptr=" << old_ptr.get();
480 DeleteAsync(std::move(old_ptr));
483 ScanRetiredList(hazard_ptrs);
488 void ScanRetiredList(
const std::unordered_set<T*>& hazard_ptrs) {
489 for (
auto rit = retire_list_head_.begin();
490 rit != retire_list_head_.end();) {
491 auto current = rit++;
492 if (hazard_ptrs.count(current->get()) == 0) {
494 DeleteAsync(std::move(*current));
495 retire_list_head_.erase(current);
502 std::unordered_set<T*> CollectHazardPtrs(std::unique_lock<MutexType>&) {
503 std::unordered_set<T*> hazard_ptrs;
506 for (
auto* hp = hp_record_head_.load(); hp; hp = hp->next) {
507 hazard_ptrs.insert(hp->ptr.load());
512 void DeleteAsync(std::unique_ptr<T> ptr) {
513 switch (destruction_type_) {
514 case DestructionType::kSync:
518 engine::CriticalAsyncNoSpan([ptr = std::move(ptr),
519 token = wait_token_storage_
520 .GetToken()]()
mutable {
529 const uint64_t epoch_;
531 mutable std::atomic<impl::HazardPointerRecord<T, RcuTraits>*> hp_record_head_{
536 std::atomic<T*> current_;
537 std::list<std::unique_ptr<T>> retire_list_head_;
538 utils::impl::WaitTokenStorage wait_token_storage_;
540 friend class ReadablePtr<T, RcuTraits>;
541 friend class WritablePtr<T, RcuTraits>;