6#include <userver/cache/base_postgres_cache_fwd.hpp>
12#include <unordered_map>
14#include <fmt/format.h>
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/utils/void_t.hpp>
33#include <userver/yaml_config/merge_schemas.hpp>
35USERVER_NAMESPACE_BEGIN
126namespace pg_cache::detail {
// Alias for the policy's nested `ValueType`.
129using ValueType =
typename T::ValueType;
// True when `T::ValueType` is detected via meta::IsDetected.
131inline constexpr bool kHasValueType = meta::IsDetected<ValueType, T>;
// Alias for the policy's optional nested `RawValueType`.
134using RawValueTypeImpl =
typename T::RawValueType;
// True when `T::RawValueType` is detected via meta::IsDetected.
136inline constexpr bool kHasRawValueType = meta::IsDetected<RawValueTypeImpl, T>;
// `T::RawValueType` when the policy declares it, otherwise `ValueType<T>`.
138using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
// Turns a raw row value into the value that is stored in the cache.
// When the policy declares `RawValueType`, the raw value is passed to
// `Convert(..., formats::parse::To<ValueType>())`; the other visible return
// hands the raw value back as-is (moved).
// NOTE(review): the `else` line and closing braces are not visible in this excerpt.
140template <
typename PostgreCachePolicy>
141auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
142 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
143 return Convert(std::move(raw), formats::
parse::To<ValueType<PostgreCachePolicy>>());
145 return std::move(raw);
// Detection helpers for members of the cache policy `T`.
// Each `kHas*` flag is true when the matching alias is well-formed for `T`
// (checked through meta::IsDetected).

// Well-formed only if `T::kName`, viewed as std::string_view, is not empty.
151using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
153inline constexpr bool kHasName = meta::IsDetected<HasNameImpl, T>;
// Static data member `T::kQuery`.
157using HasQueryImpl =
decltype(T::kQuery);
159inline constexpr bool kHasQuery = meta::IsDetected<HasQueryImpl, T>;
// Static member function `T::GetQuery()` callable without arguments.
163using HasGetQueryImpl =
decltype(T::GetQuery());
165inline constexpr bool kHasGetQuery = meta::IsDetected<HasGetQueryImpl, T>;
// Static member `T::kWhere`.
169using HasWhere =
decltype(T::kWhere);
171inline constexpr bool kHasWhere = meta::IsDetected<HasWhere, T>;
// Static member `T::kOrderBy`.
175using HasOrderBy =
decltype(T::kOrderBy);
177inline constexpr bool kHasOrderBy = meta::IsDetected<HasOrderBy, T>;
// Static member `T::kTag`.
181using HasTag =
decltype(T::kTag);
183inline constexpr bool kHasTag = meta::IsDetected<HasTag, T>;
// Returns the policy's `kTag` when the policy declares one.
// NOTE(review): the function header and the fallback branch are not visible in
// this excerpt; presumably this is `GetTag`, which is referenced later as
// `pg_cache::detail::GetTag<PostgreCachePolicy>()` — confirm against the full file.
185template <
typename PostgreCachePolicy>
187 if constexpr (kHasTag<PostgreCachePolicy>) {
188 return PostgreCachePolicy::kTag;
// Static member `T::kUpdatedField` exists.
196using HasUpdatedField =
decltype(T::kUpdatedField);
198inline constexpr bool kHasUpdatedField = meta::IsDetected<HasUpdatedField, T>;
// Well-formed only if `T::kUpdatedField`, viewed as std::string_view, is not empty.
201using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
203inline constexpr bool kWantIncrementalUpdates = meta::IsDetected<WantIncrementalUpdates, T>;
// Decayed result type of invoking `T::kKeyMember` on a `ValueType<T>`.
207using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
209inline constexpr bool kHasKeyMember = meta::IsDetected<KeyMemberTypeImpl, T>;
211using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
// Result type of calling `.size()` on a `T`; kHasSizeMethod additionally
// requires that result to be convertible to std::size_t.
215using SizeMethodInvokeResultImpl =
decltype(std::declval<T>().size());
217inline constexpr bool kHasSizeMethod =
218 meta::IsDetected<SizeMethodInvokeResultImpl, T> &&
219 std::is_convertible_v<SizeMethodInvokeResultImpl<T>, std::size_t>;
// Detects a member `insert_or_assign(key, value)` on the policy's `CacheContainer`.
223using InsertOrAssignMethodInvokeResultImpl =
224 decltype(std::declval<
typename T::CacheContainer>()
225 .insert_or_assign(std::declval<KeyMemberTypeImpl<T>>(), std::declval<ValueType<T>>()));
227inline constexpr bool kHasInsertOrAssignMethod = meta::IsDetected<InsertOrAssignMethodInvokeResultImpl, T>;
// Detects a free function `CacheInsertOrAssign(container&, value, key)` for the
// policy's `CacheContainer`.
// NOTE(review): the closing of the decltype and the start of the variable
// declaration below are not visible in this excerpt.
231using CacheInsertOrAssignFunctionInvokeResultImpl =
decltype(CacheInsertOrAssign(
232 std::declval<
typename T::CacheContainer&>(),
233 std::declval<ValueType<T>>(),
234 std::declval<KeyMemberTypeImpl<T>>()
238 kHasCacheInsertOrAssignFunction = meta::IsDetected<CacheInsertOrAssignFunctionInvokeResultImpl, T>;
// Selects the container type used to store cached data for policy `T`.
// Primary template: a std::unordered_map keyed by the key member type; the
// visible assertion text requires that key type to be std::hash-able.
241template <
typename T,
typename = USERVER_NAMESPACE::
utils::void_t<>>
242struct DataCacheContainer {
244 meta::kIsStdHashable<KeyMemberType<T>>,
245 "With default CacheContainer, key type must be std::hash-able"
248 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
// Specialization chosen when the policy declares `T::CacheContainer`: the
// custom container is used directly. It must provide `size` and either an
// `insert_or_assign` member or a `CacheInsertOrAssign` function.
252struct DataCacheContainer<T, USERVER_NAMESPACE::
utils::void_t<
typename T::CacheContainer>> {
253 static_assert(kHasSizeMethod<
typename T::CacheContainer>,
"Custom CacheContainer must provide `size` method");
255 kHasInsertOrAssignMethod<T> || kHasCacheInsertOrAssignFunction<T>,
256 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
257 "one or CacheInsertOrAssign function"
260 using type =
typename T::CacheContainer;
// Convenience alias for the selected container type.
264using DataCacheContainerType =
typename DataCacheContainer<T>::type;
// True for instantiations of std::unordered_map and std::map; these are the
// containers CopyContainer copies element by element.
269inline constexpr bool kIsContainerCopiedByElement =
270 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
// Produces a heap-allocated copy of `container`.
// For element-wise copied containers a fresh container is created, reserved to
// the source size when the type is reservable, and filled in a loop over the
// source elements; other containers are copy-constructed in one step.
// NOTE(review): the loop body (where `cpu_relax_iterations` is presumably
// consumed) and the remaining parameters are not visible in this excerpt.
273std::unique_ptr<T> CopyContainer(
275 [[maybe_unused]] std::size_t cpu_relax_iterations,
278 if constexpr (kIsContainerCopiedByElement<T>) {
279 auto copy = std::make_unique<T>();
280 if constexpr (meta::kIsReservable<T>) {
281 copy->reserve(container.size());
285 for (
const auto& kv : container) {
291 return std::make_unique<T>(container);
// Default insertion routine for cache containers.
// Computes the key by invoking `key_member` on `value`, then calls
// `container.insert_or_assign` with the moved key and the forwarded value.
// The trailing parameter pack must be empty (enforced by the static_assert).
295template <
typename Container,
typename Value,
typename KeyMember,
typename... Args>
296void CacheInsertOrAssign(Container& container, Value&& value,
const KeyMember& key_member, Args&&... ) {
298 static_assert(
sizeof...(Args) == 0);
// The key is extracted before `value` is forwarded into the container.
300 auto key = std::invoke(key_member, value);
301 container.insert_or_assign(std::move(key), std::forward<Value>(value));
// Detects a member `OnWritesDone()` callable on a `T&`.
305using HasOnWritesDoneImpl =
decltype(std::declval<T&>().OnWritesDone());
// Calls `container.OnWritesDone()` when the container type provides it;
// no other action is visible for containers without that member.
308void OnWritesDone(T& container) {
309 if constexpr (meta::IsDetected<HasOnWritesDoneImpl, T>) {
310 container.OnWritesDone();
// Detects a static `T::GetLastKnownUpdated(container)` taking the cache container.
315using HasCustomUpdatedImpl =
decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
318inline constexpr bool kHasCustomUpdated = meta::IsDetected<HasCustomUpdatedImpl, T>;
// Optional nested `T::UpdatedFieldType`.
321using UpdatedFieldTypeImpl =
typename T::UpdatedFieldType;
323inline constexpr bool kHasUpdatedFieldType = meta::IsDetected<UpdatedFieldTypeImpl, T>;
// `T::UpdatedFieldType` when declared, otherwise storages::postgres::TimePointTz.
325using UpdatedFieldType = meta::DetectedOr<storages::
postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
// Compile-time validation of the policy's `UpdatedFieldType`.
// When the policy declares the type, it must be TimePointTz or
// TimePointWithoutTz (plus TimePoint when USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
// is set), unless the policy provides a custom GetLastKnownUpdated.
// When the policy does not declare the type, the visible condition requires
// that incremental updates are not requested.
// NOTE(review): the `static_assert(` openers, the `#else`/`#endif` lines and
// the return statement are not visible in this excerpt.
328constexpr bool CheckUpdatedFieldType() {
329 if constexpr (kHasUpdatedFieldType<T>) {
330#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
332 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
333 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
334 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
335 "Invalid UpdatedFieldType, must be either TimePointTz or "
337 "or (legacy) system_clock::time_point"
341 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointTz> ||
342 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointWithoutTz> ||
343 kHasCustomUpdated<T>,
344 "Invalid UpdatedFieldType, must be either TimePointTz or "
350 !kWantIncrementalUpdates<T>,
351 "UpdatedFieldType must be explicitly specified when using "
352 "incremental updates"
// Detects a static member `T::kClusterHostType`.
360using HasClusterHostTypeImpl =
decltype(T::kClusterHostType);
// Returns the policy's `kClusterHostType` when it is declared.
// NOTE(review): the fallback value for policies without the member is not
// visible in this excerpt.
363constexpr storages::
postgres::ClusterHostTypeFlags ClusterHostType() {
364 if constexpr (meta::IsDetected<HasClusterHostTypeImpl, T>) {
365 return T::kClusterHostType;
// Detects a static member `T::kMayReturnNull`.
373using HasMayReturnNull =
decltype(T::kMayReturnNull);
// Returns the policy's `kMayReturnNull` when it is declared.
// NOTE(review): the fallback value for policies without the member is not
// visible in this excerpt.
376constexpr bool MayReturnNull() {
377 if constexpr (meta::IsDetected<HasMayReturnNull, T>) {
378 return T::kMayReturnNull;
// Compile-time validation of a PostgreSQL cache policy.
// The visible checks require: a non-empty `kName`, a `ValueType` alias, a
// `kKeyMember`, exactly one of `kQuery` / `GetQuery`, a `kUpdatedField`
// member, a valid `UpdatedFieldType`, and a cluster host type that includes a
// host role.
// NOTE(review): several `static_assert(` openers and the `BaseType` alias
// (used by PostgreCache) are not visible in this excerpt.
384template <
typename PostgreCachePolicy>
385struct PolicyChecker {
387 static_assert(kHasName<PostgreCachePolicy>,
"The PostgreSQL cache policy must contain a static member `kName`");
388 static_assert(kHasValueType<PostgreCachePolicy>,
"The PostgreSQL cache policy must define a type alias `ValueType`");
390 kHasKeyMember<PostgreCachePolicy>,
391 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
392 "with a pointer to a data or a function member with the object's key"
395 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
396 "The PostgreSQL cache policy must contain a static data member "
397 "`kQuery` with a select statement or a static member function "
398 "`GetQuery` returning the query"
401 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
402 "The PostgreSQL cache policy must define `kQuery` or "
403 "`GetQuery`, not both"
406 kHasUpdatedField<PostgreCachePolicy>,
407 "The PostgreSQL cache policy must contain a static member "
408 "`kUpdatedField`. If you don't want to use incremental updates, "
409 "please set its value to `nullptr`"
411 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
414 ClusterHostType<PostgreCachePolicy>() & storages::
postgres::kClusterHostRolesMask,
415 "Cluster host role must be specified for caching component, "
416 "please be more specific"
// Returns the policy's base query: `GetQuery()` when the policy provides it,
// otherwise `kQuery`.
419 static storages::
postgres::Query GetQuery() {
420 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
421 return PostgreCachePolicy::GetQuery();
423 return PostgreCachePolicy::kQuery;
// Defaults used by the PostgreCache constructor when the matching static
// config option is absent.
430inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
431inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
// Passed as the second CommandControl argument in PostgreCache::Update.
432inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
// Update compares the total copy/parse stage time against kCpuRelaxThreshold
// and divides it by kCpuRelaxInterval when recomputing the relax iteration counts.
433inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
434inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
// Stage names passed to the update scope (`Reset` / `ElapsedTotal`).
436inline constexpr std::string_view kCopyStage =
"copy_data";
437inline constexpr std::string_view kFetchStage =
"fetch";
438inline constexpr std::string_view kParseStage =
"parse";
// Defaults for the `chunk-size` and `sleep-between-chunks` options.
440inline constexpr std::size_t kDefaultChunkSize = 1000;
441inline constexpr std::chrono::milliseconds kDefaultSleepBetweenChunks{0};
// Caching component that loads its data from PostgreSQL according to the
// compile-time `PostgreCachePolicy`. The base class is taken from
// PolicyChecker<PostgreCachePolicy>::BaseType.
// NOTE(review): access specifiers and several declaration lines are not
// visible in this excerpt.
449template <
typename PostgreCachePolicy>
450class PostgreCache
final :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
// Type aliases derived from the policy.
453 using PolicyType = PostgreCachePolicy;
454 using ValueType = pg_cache::detail::ValueType<PolicyType>;
455 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
456 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
457 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
458 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
459 using BaseType =
typename PolicyCheckerType::BaseType;
// Compile-time properties taken from the policy.
462 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
463 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
464 constexpr static auto kName = PolicyType::kName;
466 PostgreCache(
const ComponentConfig&,
const ComponentContext&);
468 static yaml_config::Schema GetStaticConfigSchema();
// Owning pointer to a cache container snapshot.
471 using CachedData = std::unique_ptr<DataType>;
473 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update,
const DataType& cache)
const;
477 const std::chrono::system_clock::time_point& last_update,
478 const std::chrono::system_clock::time_point& now,
479 cache::UpdateStatisticsScope& stats_scope
482 bool MayReturnNull()
const override;
487 CachedData& data_cache,
488 cache::UpdateStatisticsScope& stats_scope,
// Query builders: the policy's base query combined with where / order-by clauses.
492 static storages::
postgres::Query GetAllQuery();
493 static storages::
postgres::Query GetDeltaQuery();
494 static std::string GetWhereClause();
495 static std::string GetDeltaWhereClause();
496 static std::string GetOrderByClause();
498 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
// One cluster pointer per shard (sized from the shard count in the constructor).
500 std::vector<storages::
postgres::ClusterPtr> clusters_;
// Settings read from the static config in the constructor.
502 const std::chrono::system_clock::duration correction_;
503 const std::chrono::milliseconds full_update_timeout_;
504 const std::chrono::milliseconds incremental_update_timeout_;
505 const std::size_t chunk_size_;
506 const std::chrono::milliseconds sleep_between_chunks_;
// Iteration counts between CPU relax points; recomputed in Update from the
// measured parse/copy stage times.
507 std::size_t cpu_relax_iterations_parse_{0};
508 std::size_t cpu_relax_iterations_copy_{0};
// Specialization of `kHasValidate` for every PostgreCache instantiation,
// fixed to `true`.
511template <
typename PostgreCachePolicy>
512inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
// Constructs the cache component from its static config.
// Reads `full-update-op-timeout`, `incremental-update-op-timeout`,
// `chunk-size` and `sleep-between-chunks` (each with a pg_cache::detail
// default), computes the update correction via ParseCorrection, and reads
// `pgcomponent` to locate the PostgreSQL component.
// Throws std::logic_error for the configuration errors whose messages are
// visible below (incremental updates without an update field; negative correction).
// NOTE(review): several statements (conditions guarding the throws, cluster
// lookup inside the shard loop) are not visible in this excerpt.
514template <
typename PostgreCachePolicy>
515PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
const ComponentContext& context)
516 : BaseType{config, context},
517 correction_{ParseCorrection(config)},
518 full_update_timeout_{
519 config
["full-update-op-timeout"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultFullUpdateTimeout
)
521 incremental_update_timeout_{
522 config
["incremental-update-op-timeout"]
523 .As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultIncrementalUpdateTimeout
)
525 chunk_size_{config
["chunk-size"].As<size_t
>(pg_cache::detail::kDefaultChunkSize
)},
526 sleep_between_chunks_{
527 config
["sleep-between-chunks"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultSleepBetweenChunks
)
532 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
533 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
537 throw std::logic_error(
538 "Incremental update support is requested in config but no update field "
539 "name is specified in traits of '" +
// A negative correction is rejected.
543 if (correction_.count() < 0) {
544 throw std::logic_error(
545 "Refusing to set forward (negative) update correction requested in "
551 const auto pg_alias = config
["pgcomponent"].As<std
::string
>("");
552 if (pg_alias.empty()) {
// One cluster slot per shard.
557 clusters_.resize(shard_count);
558 for (size_t i = 0; i < shard_count; ++i) {
// Builds the `where` clause for the full query: "where <kWhere>" when the
// policy declares `kWhere`.
// NOTE(review): the branch for policies without `kWhere` is not visible in
// this excerpt.
567template <
typename PostgreCachePolicy>
568std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
569 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
570 return fmt::format(FMT_COMPILE(
"where {}"), PostgreCachePolicy::kWhere);
// Builds the `where` clause for the incremental query. The updated field is
// compared with the first query parameter (`>= $1`); when the policy declares
// `kWhere`, that condition is parenthesized and AND-ed in front of it.
576template <
typename PostgreCachePolicy>
577std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
578 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
580 FMT_COMPILE(
"where ({}) and {} >= $1"),
581 PostgreCachePolicy::kWhere,
582 PostgreCachePolicy::kUpdatedField
585 return fmt::format(FMT_COMPILE(
"where {} >= $1"), PostgreCachePolicy::kUpdatedField);
// Builds the `order by` clause: "order by <kOrderBy>" when the policy
// declares `kOrderBy`.
// NOTE(review): the branch for policies without `kOrderBy` is not visible in
// this excerpt.
589template <
typename PostgreCachePolicy>
590std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
591 if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
592 return fmt::format(FMT_COMPILE(
"order by {}"), PostgreCachePolicy::kOrderBy);
// Builds the full-update query: the policy's base statement followed by the
// where clause and the order-by clause, separated by single spaces.
598template <
typename PostgreCachePolicy>
599storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
600 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
601 return fmt::format(
"{} {} {}", query
.GetStatementView(), GetWhereClause(), GetOrderByClause());
// Builds the incremental-update query. With incremental updates enabled the
// base statement is combined with the delta where clause and the order-by
// clause; the other visible return falls back to GetAllQuery().
// NOTE(review): the return/constructor wrapper around the formatted string and
// its trailing arguments are not visible in this excerpt.
604template <
typename PostgreCachePolicy>
605storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
606 if constexpr (kIncrementalUpdates) {
607 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
609 fmt::format(
"{} {} {}", query
.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
613 return GetAllQuery();
// Reads the `update-correction` option from the component config.
// In the first visible branch (taken at least when the policy provides a
// custom GetLastKnownUpdated) the option is optional and defaults to 0;
// in the other branch it is read without a default.
// NOTE(review): the rest of the condition is not visible in this excerpt.
617template <
typename PostgreCachePolicy>
618std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
const ComponentConfig& config) {
619 static constexpr std::string_view kUpdateCorrection =
"update-correction";
620 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
623 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>(0
);
625 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>();
// Computes the value passed as the query parameter of the incremental query.
// When the policy provides GetLastKnownUpdated, its result for the current
// cache contents is returned; the other visible return yields `last_update`
// shifted back by `correction_`.
629template <
typename PostgreCachePolicy>
630typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
631 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
632 const DataType& cache
634 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
635 return PostgreCachePolicy::GetLastKnownUpdated(cache);
637 return UpdatedFieldType{last_update - correction_};
// Performs one cache update.
// Visible flow: take a snapshot of the current data, then for every cluster
// either fetch rows in chunks of `chunk_size_` through a portal inside a
// transaction (when `chunk_size_ > 0`) or run the query in one go; each result
// set is parsed into the snapshot via CacheResults and counted in `changes`.
// Afterwards the CPU-relax iteration counts for the copy and parse stages are
// recomputed when the measured stage time exceeds kCpuRelaxThreshold, the
// container is notified through OnWritesDone, and the new data is published
// with Set().
// NOTE(review): several statements (update-type handling, timeout/query
// selection, the non-chunked Execute calls, log statement openers) are not
// visible in this excerpt.
641template <
typename PostgreCachePolicy>
642void PostgreCache<PostgreCachePolicy>::Update(
644 const std::chrono::system_clock::time_point& last_update,
645 const std::chrono::system_clock::time_point& ,
646 cache::UpdateStatisticsScope& stats_scope
649 if constexpr (!kIncrementalUpdates) {
653 const std::chrono::milliseconds
658 auto data_cache = GetDataSnapshot(type, scope);
// Size of the snapshot before new rows are applied; used for the copy-stage
// relax computation below.
659 [[maybe_unused]]
const auto old_size = data_cache->size();
661 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
665 for (
auto& cluster : clusters_) {
666 if (chunk_size_ > 0) {
667 auto trx = cluster->Begin(
668 kClusterHostTypeFlags,
670 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
672 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
674 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
675 auto res = portal.Fetch(chunk_size_);
678 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
679 CacheResults(res, data_cache, stats_scope, scope);
680 changes += res.Size();
681 if (sleep_between_chunks_.count() > 0) {
691 kClusterHostTypeFlags,
692 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
694 GetLastUpdated(last_update, *data_cache)
697 kClusterHostTypeFlags,
698 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
703 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
704 CacheResults(res, data_cache, stats_scope, scope);
705 changes += res.Size();
// Copy stage: the relax count is derived from the number of copied items
// (`old_size`) and the total copy time, so the log reports those same values.
711 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
713 const auto elapsed_copy = scope
.ElapsedTotal(std::string{pg_cache::detail::kCopyStage}
);
714 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
715 cpu_relax_iterations_copy_ =
static_cast<
716 std::size_t>(
static_cast<
double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
718 <<
"Elapsed time for copying " << kName <<
" " << elapsed_copy.count() <<
" for " << old_size
719 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_copy_
// Parse stage: the relax count is derived from the number of parsed rows
// (`changes`) and the total parse time.
726 const auto elapsed_parse = scope
.ElapsedTotal(std::string{pg_cache::detail::kParseStage}
);
727 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
728 cpu_relax_iterations_parse_ =
static_cast<
729 std::size_t>(
static_cast<
double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
731 <<
"Elapsed time for parsing " << kName <<
" " << elapsed_parse.count() <<
" for " << changes
732 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
738 pg_cache::detail::OnWritesDone(*data_cache);
740 this->Set(std::move(data_cache));
// Override that forwards to pg_cache::detail::MayReturnNull for the policy type.
746template <
typename PostgreCachePolicy>
747bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
748 return pg_cache::detail::MayReturnNull<PolicyType>();
// Parses a result set into `data_cache`.
// Rows are read as `RawValueType` (using the policy's tag), converted with
// ExtractValue and inserted through CacheInsertOrAssign, keyed by the policy's
// `kKeyMember`. A std::exception raised while handling a row is caught and
// reported in a log message naming the cache and the value type.
// NOTE(review): the remaining parameters, the `try` opener, the insertion call
// opener and any statistics/CPU-relax handling are not visible in this excerpt.
751template <
typename PostgreCachePolicy>
752void PostgreCache<PostgreCachePolicy>::CacheResults(
754 CachedData& data_cache,
755 cache::UpdateStatisticsScope& stats_scope,
758 auto values = res.AsSetOf<RawValueType>(pg_cache::detail::GetTag<PostgreCachePolicy>());
760 for (
auto p = values.begin(); p != values.end(); ++p) {
// Brings the default implementation into scope for the unqualified call.
763 using pg_cache::detail::CacheInsertOrAssign;
766 pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
767 PostgreCachePolicy::kKeyMember
769 }
catch (
const std::exception& e) {
772 <<
"Error parsing data row in cache '" << kName <<
"' to '" <<
compiler::GetTypeName<ValueType>()
773 <<
"': " << e.what();
// Produces the container that an update writes into.
// One visible path copies the currently published data with CopyContainer
// (using `cpu_relax_iterations_copy_`); the other returns a new empty container.
// NOTE(review): the rest of the signature and the conditions choosing between
// the two returns are not visible in this excerpt; presumably this is
// `GetDataSnapshot`, which Update calls — confirm against the full file.
778template <
typename PostgreCachePolicy>
779typename PostgreCache<PostgreCachePolicy>::CachedData PostgreCache<
782 auto data =
this->Get();
784 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
787 return std::make_unique<DataType>();
// Declaration only; GetStaticConfigSchema passes the returned string to
// yaml_config::MergeSchemas (called there as `impl::GetPostgreCacheSchema()`).
792std::string GetPostgreCacheSchema();
// Returns the static config schema of the component: the schema text from
// impl::GetPostgreCacheSchema() merged with the base component's schema via
// yaml_config::MergeSchemas.
796template <
typename PostgreCachePolicy>
797yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
798 using ParentType =
typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
799 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
804namespace utils::impl::projected_set {
// CacheInsertOrAssign overload declared in namespace utils::impl::projected_set:
// forwards `value` to DoInsert on `set`; the key-member argument is unused.
806template <
typename Set,
typename Value,
typename KeyMember>
807void CacheInsertOrAssign(Set& set, Value&& value,
const KeyMember& ) {
808 DoInsert(set, std::forward<Value>(value));