6#include <userver/cache/base_postgres_cache_fwd.hpp>
12#include <unordered_map>
14#include <fmt/format.h>
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
34USERVER_NAMESPACE_BEGIN
124namespace pg_cache::detail {
127using ValueType =
typename T::ValueType;
129inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
132using RawValueTypeImpl =
typename T::RawValueType;
134inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
136using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
138template <
typename PostgreCachePolicy>
139auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
140 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
141 return Convert(std::move(raw),
formats::
parse::
To<ValueType<PostgreCachePolicy>>());
143 return std::move(raw);
149using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
151inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
155using HasQueryImpl =
decltype(T::kQuery);
157inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
161using HasGetQueryImpl =
decltype(T::GetQuery());
163inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
167using HasWhere =
decltype(T::kWhere);
169inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
173using HasOrderBy =
decltype(T::kOrderBy);
175inline constexpr bool kHasOrderBy = meta::kIsDetected<HasOrderBy, T>;
179using HasUpdatedField =
decltype(T::kUpdatedField);
181inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
184using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
186inline constexpr bool kWantIncrementalUpdates = meta::kIsDetected<WantIncrementalUpdates, T>;
190using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
192inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
194using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
197template <
typename T,
typename = USERVER_NAMESPACE::
utils::void_t<>>
198struct DataCacheContainer {
200 meta::kIsStdHashable<KeyMemberType<T>>,
201 "With default CacheContainer, key type must be std::hash-able"
204 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
208struct DataCacheContainer<T, USERVER_NAMESPACE::
utils::void_t<
typename T::CacheContainer>> {
209 using type =
typename T::CacheContainer;
213using DataCacheContainerType =
typename DataCacheContainer<T>::type;
218inline constexpr bool kIsContainerCopiedByElement =
219 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
223CopyContainer(
const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
tracing::
ScopeTime& scope) {
224 if constexpr (kIsContainerCopiedByElement<T>) {
225 auto copy = std::make_unique<T>();
226 if constexpr (meta::kIsReservable<T>) {
227 copy->reserve(container.size());
231 for (
const auto& kv : container) {
237 return std::make_unique<T>(container);
241template <
typename Container,
typename Value,
typename KeyMember,
typename... Args>
242void CacheInsertOrAssign(Container& container, Value&& value,
const KeyMember& key_member, Args&&... ) {
244 static_assert(
sizeof...(Args) == 0);
246 auto key = std::invoke(key_member, value);
247 container.insert_or_assign(std::move(key), std::forward<Value>(value));
251using HasOnWritesDoneImpl =
decltype(std::declval<T&>().OnWritesDone());
254void OnWritesDone(T& container) {
255 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
256 container.OnWritesDone();
261using HasCustomUpdatedImpl =
decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
264inline constexpr bool kHasCustomUpdated = meta::kIsDetected<HasCustomUpdatedImpl, T>;
267using UpdatedFieldTypeImpl =
typename T::UpdatedFieldType;
269inline constexpr bool kHasUpdatedFieldType = meta::kIsDetected<UpdatedFieldTypeImpl, T>;
271using UpdatedFieldType = meta::DetectedOr<storages::
postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
274constexpr bool CheckUpdatedFieldType() {
275 if constexpr (kHasUpdatedFieldType<T>) {
276#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
278 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
279 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
280 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
281 "Invalid UpdatedFieldType, must be either TimePointTz or "
283 "or (legacy) system_clock::time_point"
287 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointTz> ||
288 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointWithoutTz> ||
289 kHasCustomUpdated<T>,
290 "Invalid UpdatedFieldType, must be either TimePointTz or "
296 !kWantIncrementalUpdates<T>,
297 "UpdatedFieldType must be explicitly specified when using "
298 "incremental updates"
306using HasClusterHostTypeImpl =
decltype(T::kClusterHostType);
309constexpr storages::
postgres::ClusterHostTypeFlags ClusterHostType() {
310 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
311 return T::kClusterHostType;
319using HasMayReturnNull =
decltype(T::kMayReturnNull);
322constexpr bool MayReturnNull() {
323 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
324 return T::kMayReturnNull;
330template <
typename PostgreCachePolicy>
331struct PolicyChecker {
333 static_assert(kHasName<PostgreCachePolicy>,
"The PosgreSQL cache policy must contain a static member `kName`");
334 static_assert(kHasValueType<PostgreCachePolicy>,
"The PosgreSQL cache policy must define a type alias `ValueType`");
336 kHasKeyMember<PostgreCachePolicy>,
337 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
338 "with a pointer to a data or a function member with the object's key"
341 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
342 "The PosgreSQL cache policy must contain a static data member "
343 "`kQuery` with a select statement or a static member function "
344 "`GetQuery` returning the query"
347 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
348 "The PosgreSQL cache policy must define `kQuery` or "
349 "`GetQuery`, not both"
352 kHasUpdatedField<PostgreCachePolicy>,
353 "The PosgreSQL cache policy must contain a static member "
354 "`kUpdatedField`. If you don't want to use incremental updates, "
355 "please set its value to `nullptr`"
357 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
360 ClusterHostType<PostgreCachePolicy>() & storages::
postgres::kClusterHostRolesMask,
361 "Cluster host role must be specified for caching component, "
362 "please be more specific"
365 static storages::
postgres::Query GetQuery() {
366 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
367 return PostgreCachePolicy::GetQuery();
369 return PostgreCachePolicy::kQuery;
376inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
377inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
378inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
379inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
380inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
382inline constexpr std::string_view kCopyStage =
"copy_data";
383inline constexpr std::string_view kFetchStage =
"fetch";
384inline constexpr std::string_view kParseStage =
"parse";
386inline constexpr std::size_t kDefaultChunkSize = 1000;
394template <
typename PostgreCachePolicy>
395class PostgreCache
final :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
398 using PolicyType = PostgreCachePolicy;
399 using ValueType = pg_cache::detail::ValueType<PolicyType>;
400 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
401 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
402 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
403 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
404 using BaseType =
typename PolicyCheckerType::BaseType;
407 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
408 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
409 constexpr static auto kName = PolicyType::kName;
411 PostgreCache(
const ComponentConfig&,
const ComponentContext&);
412 ~PostgreCache()
override;
414 static yaml_config::Schema GetStaticConfigSchema();
417 using CachedData = std::unique_ptr<DataType>;
419 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update,
const DataType& cache)
const;
423 const std::chrono::system_clock::time_point& last_update,
424 const std::chrono::system_clock::time_point& now,
425 cache::UpdateStatisticsScope& stats_scope
428 bool MayReturnNull()
const override;
433 CachedData& data_cache,
434 cache::UpdateStatisticsScope& stats_scope,
438 static storages::
postgres::Query GetAllQuery();
439 static storages::
postgres::Query GetDeltaQuery();
440 static std::string GetWhereClause();
441 static std::string GetDeltaWhereClause();
442 static std::string GetOrderByClause();
444 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
446 std::vector<storages::
postgres::ClusterPtr> clusters_;
448 const std::chrono::system_clock::duration correction_;
449 const std::chrono::milliseconds full_update_timeout_;
450 const std::chrono::milliseconds incremental_update_timeout_;
451 const std::size_t chunk_size_;
452 std::size_t cpu_relax_iterations_parse_{0};
453 std::size_t cpu_relax_iterations_copy_{0};
395class PostgreCache
final :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
…};
456template <
typename PostgreCachePolicy>
457inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
459template <
typename PostgreCachePolicy>
460PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
const ComponentContext& context)
461 : BaseType{config, context},
462 correction_{ParseCorrection(config)},
463 full_update_timeout_{
464 config
["full-update-op-timeout"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultFullUpdateTimeout
)},
465 incremental_update_timeout_{config
["incremental-update-op-timeout"].As<std
::chrono
::milliseconds
>(
466 pg_cache::detail::kDefaultIncrementalUpdateTimeout
468 chunk_size_{config
["chunk-size"].As<size_t
>(pg_cache::detail::kDefaultChunkSize
)} {
471 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
472 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
476 throw std::logic_error(
477 "Incremental update support is requested in config but no update field "
478 "name is specified in traits of '" +
479 config.Name() +
"' cache"
482 if (correction_.count() < 0) {
483 throw std::logic_error(
484 "Refusing to set forward (negative) update correction requested in "
486 config.Name() +
"' cache"
490 const auto pg_alias = config
["pgcomponent"].As<std
::string
>("");
491 if (pg_alias.empty()) {
496 clusters_.resize(shard_count);
497 for (size_t i = 0; i < shard_count; ++i) {
501 LOG_INFO() <<
"Cache " << kName <<
" full update query `" << GetAllQuery().Statement()
502 <<
"` incremental update query `" << GetDeltaQuery().Statement() <<
"`";
504 this->StartPeriodicUpdates();
507template <
typename PostgreCachePolicy>
508PostgreCache<PostgreCachePolicy>::~PostgreCache() {
509 this->StopPeriodicUpdates();
512template <
typename PostgreCachePolicy>
513std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
514 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
515 return fmt::format(FMT_COMPILE(
"where {}"), PostgreCachePolicy::kWhere);
521template <
typename PostgreCachePolicy>
522std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
523 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
525 FMT_COMPILE(
"where ({}) and {} >= $1"), PostgreCachePolicy::kWhere, PostgreCachePolicy::kUpdatedField
528 return fmt::format(FMT_COMPILE(
"where {} >= $1"), PostgreCachePolicy::kUpdatedField);
532template <
typename PostgreCachePolicy>
533std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
534 if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
535 return fmt::format(FMT_COMPILE(
"order by {}"), PostgreCachePolicy::kOrderBy);
541template <
typename PostgreCachePolicy>
542storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
543 storages::
postgres::Query query = PolicyCheckerType::GetQuery();
544 return fmt::format(
"{} {} {}", query.Statement(), GetWhereClause(), GetOrderByClause());
547template <
typename PostgreCachePolicy>
548storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
549 if constexpr (kIncrementalUpdates) {
550 storages::
postgres::Query query = PolicyCheckerType::GetQuery();
552 fmt::format(
"{} {} {}", query.Statement(), GetDeltaWhereClause(), GetOrderByClause()),
556 return GetAllQuery();
560template <
typename PostgreCachePolicy>
561std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
const ComponentConfig& config) {
562 static constexpr std::string_view kUpdateCorrection =
"update-correction";
563 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
565 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>(0
);
567 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>();
571template <
typename PostgreCachePolicy>
572typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
573 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
574 const DataType& cache
576 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
577 return PostgreCachePolicy::GetLastKnownUpdated(cache);
579 return UpdatedFieldType{last_update - correction_};
583template <
typename PostgreCachePolicy>
584void PostgreCache<PostgreCachePolicy>::Update(
586 const std::chrono::system_clock::time_point& last_update,
587 const std::chrono::system_clock::time_point& ,
588 cache::UpdateStatisticsScope& stats_scope
591 if constexpr (!kIncrementalUpdates) {
595 const std::chrono::milliseconds timeout =
600 auto data_cache = GetDataSnapshot(type, scope);
601 [[maybe_unused]]
const auto old_size = data_cache->size();
603 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
607 for (
auto& cluster : clusters_) {
608 if (chunk_size_ > 0) {
609 auto trx = cluster->Begin(
610 kClusterHostTypeFlags,
612 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
614 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
616 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
617 auto res = portal.Fetch(chunk_size_);
620 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
621 CacheResults(res, data_cache, stats_scope, scope);
622 changes += res.Size();
626 bool has_parameter = query.Statement().find(
'$') != std::string::npos;
627 auto res = has_parameter ? cluster->Execute(
628 kClusterHostTypeFlags,
629 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
631 GetLastUpdated(last_update, *data_cache)
634 kClusterHostTypeFlags,
635 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
640 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
641 CacheResults(res, data_cache, stats_scope, scope);
642 changes += res.Size();
648 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
650 const auto elapsed_copy = scope
.ElapsedTotal(std::string{pg_cache::detail::kCopyStage}
);
651 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
652 cpu_relax_iterations_copy_ =
static_cast<std::size_t>(
653 static_cast<
double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval)
655 LOG_TRACE() <<
"Elapsed time for copying " << kName <<
" " << elapsed_copy.count() <<
" for " << changes
656 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
663 const auto elapsed_parse = scope
.ElapsedTotal(std::string{pg_cache::detail::kParseStage}
);
664 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
665 cpu_relax_iterations_parse_ =
static_cast<std::size_t>(
666 static_cast<
double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval)
668 LOG_TRACE() <<
"Elapsed time for parsing " << kName <<
" " << elapsed_parse.count() <<
" for " << changes
669 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
675 pg_cache::detail::OnWritesDone(*data_cache);
677 this->Set(std::move(data_cache));
683template <
typename PostgreCachePolicy>
684bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
685 return pg_cache::detail::MayReturnNull<PolicyType>();
688template <
typename PostgreCachePolicy>
689void PostgreCache<PostgreCachePolicy>::CacheResults(
691 CachedData& data_cache,
692 cache::UpdateStatisticsScope& stats_scope,
695 auto values = res.AsSetOf<RawValueType>(storages::
postgres::kRowTag);
697 for (
auto p = values.begin(); p != values.end(); ++p) {
700 using pg_cache::detail::CacheInsertOrAssign;
702 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p), PostgreCachePolicy::kKeyMember
704 }
catch (
const std::exception& e) {
706 LOG_ERROR() <<
"Error parsing data row in cache '" << kName <<
"' to '"
707 <<
compiler::GetTypeName<ValueType>() <<
"': " << e.what();
712template <
typename PostgreCachePolicy>
713typename PostgreCache<PostgreCachePolicy>::CachedData
716 auto data =
this->Get();
718 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
721 return std::make_unique<DataType>();
726std::string GetPostgreCacheSchema();
730template <
typename PostgreCachePolicy>
731yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
732 using ParentType =
typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
733 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
738namespace utils::impl::projected_set {
740template <
typename Set,
typename Value,
typename KeyMember>
741void CacheInsertOrAssign(Set& set, Value&& value,
const KeyMember& ) {
742 DoInsert(set, std::forward<Value>(value));