6#include <userver/cache/base_postgres_cache_fwd.hpp>
12#include <unordered_map>
14#include <fmt/format.h>
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/utils/void_t.hpp>
33#include <userver/yaml_config/merge_schemas.hpp>
35USERVER_NAMESPACE_BEGIN
130namespace pg_cache::detail {
133using ValueType =
typename T::ValueType;
135inline constexpr bool kHasValueType = meta::IsDetected<ValueType, T>;
138using RawValueTypeImpl =
typename T::RawValueType;
140inline constexpr bool kHasRawValueType = meta::IsDetected<RawValueTypeImpl, T>;
142using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
144template <
typename PostgreCachePolicy>
145auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
146 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
147 return Convert(std::move(raw),
formats::
parse::
To<ValueType<PostgreCachePolicy>>());
149 return std::move(raw);
155using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
157inline constexpr bool kHasName = meta::IsDetected<HasNameImpl, T>;
161using HasQueryImpl =
decltype(T::kQuery);
163inline constexpr bool kHasQuery = meta::IsDetected<HasQueryImpl, T>;
167using HasGetQueryImpl =
decltype(T::GetQuery());
169inline constexpr bool kHasGetQuery = meta::IsDetected<HasGetQueryImpl, T>;
173using HasWhere =
decltype(T::kWhere);
175inline constexpr bool kHasWhere = meta::IsDetected<HasWhere, T>;
179using HasOrderBy =
decltype(T::kOrderBy);
181inline constexpr bool kHasOrderBy = meta::IsDetected<HasOrderBy, T>;
185using HasUpdatedField =
decltype(T::kUpdatedField);
187inline constexpr bool kHasUpdatedField = meta::IsDetected<HasUpdatedField, T>;
190using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
192inline constexpr bool kWantIncrementalUpdates = meta::IsDetected<WantIncrementalUpdates, T>;
196using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
198inline constexpr bool kHasKeyMember = meta::IsDetected<KeyMemberTypeImpl, T>;
200using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
204using SizeMethodInvokeResultImpl =
decltype(std::declval<T>().size());
206inline constexpr bool kHasSizeMethod = meta::IsDetected<SizeMethodInvokeResultImpl, T> &&
207 std::is_convertible_v<SizeMethodInvokeResultImpl<T>, std::size_t>;
211using InsertOrAssignMethodInvokeResultImpl =
decltype(std::declval<
typename T::CacheContainer>().insert_or_assign(
212 std::declval<KeyMemberTypeImpl<T>>(),
213 std::declval<ValueType<T>>()
216inline constexpr bool kHasInsertOrAssignMethod = meta::IsDetected<InsertOrAssignMethodInvokeResultImpl, T>;
220using CacheInsertOrAssignFunctionInvokeResultImpl =
decltype(CacheInsertOrAssign(
221 std::declval<
typename T::CacheContainer&>(),
222 std::declval<ValueType<T>>(),
223 std::declval<KeyMemberTypeImpl<T>>()
226inline constexpr bool kHasCacheInsertOrAssignFunction =
227 meta::IsDetected<CacheInsertOrAssignFunctionInvokeResultImpl, T>;
230template <
typename T,
typename = USERVER_NAMESPACE::
utils::void_t<>>
231struct DataCacheContainer {
233 meta::kIsStdHashable<KeyMemberType<T>>,
234 "With default CacheContainer, key type must be std::hash-able"
237 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
241struct DataCacheContainer<T, USERVER_NAMESPACE::
utils::void_t<
typename T::CacheContainer>> {
242 static_assert(kHasSizeMethod<
typename T::CacheContainer>,
"Custom CacheContainer must provide `size` method");
244 kHasInsertOrAssignMethod<T> || kHasCacheInsertOrAssignFunction<T>,
245 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
246 "one or CacheInsertOrAssign function"
249 using type =
typename T::CacheContainer;
253using DataCacheContainerType =
typename DataCacheContainer<T>::type;
258inline constexpr bool kIsContainerCopiedByElement =
259 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
263CopyContainer(
const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
tracing::ScopeTime& scope) {
264 if constexpr (kIsContainerCopiedByElement<T>) {
265 auto copy = std::make_unique<T>();
266 if constexpr (meta::kIsReservable<T>) {
267 copy->reserve(container.size());
271 for (
const auto& kv : container) {
277 return std::make_unique<T>(container);
281template <
typename Container,
typename Value,
typename KeyMember,
typename... Args>
282void CacheInsertOrAssign(Container& container, Value&& value,
const KeyMember& key_member, Args&&... ) {
284 static_assert(
sizeof...(Args) == 0);
286 auto key = std::invoke(key_member, value);
287 container.insert_or_assign(std::move(key), std::forward<Value>(value));
291using HasOnWritesDoneImpl =
decltype(std::declval<T&>().OnWritesDone());
294void OnWritesDone(T& container) {
295 if constexpr (meta::IsDetected<HasOnWritesDoneImpl, T>) {
296 container.OnWritesDone();
301using HasCustomUpdatedImpl =
decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
304inline constexpr bool kHasCustomUpdated = meta::IsDetected<HasCustomUpdatedImpl, T>;
307using UpdatedFieldTypeImpl =
typename T::UpdatedFieldType;
309inline constexpr bool kHasUpdatedFieldType = meta::IsDetected<UpdatedFieldTypeImpl, T>;
311using UpdatedFieldType = meta::DetectedOr<storages::
postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
314constexpr bool CheckUpdatedFieldType() {
315 if constexpr (kHasUpdatedFieldType<T>) {
316#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
318 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
319 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
320 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
321 "Invalid UpdatedFieldType, must be either TimePointTz or "
323 "or (legacy) system_clock::time_point"
327 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointTz> ||
328 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointWithoutTz> ||
329 kHasCustomUpdated<T>,
330 "Invalid UpdatedFieldType, must be either TimePointTz or "
336 !kWantIncrementalUpdates<T>,
337 "UpdatedFieldType must be explicitly specified when using "
338 "incremental updates"
346using HasClusterHostTypeImpl =
decltype(T::kClusterHostType);
349constexpr storages::
postgres::ClusterHostTypeFlags ClusterHostType() {
350 if constexpr (meta::IsDetected<HasClusterHostTypeImpl, T>) {
351 return T::kClusterHostType;
359using HasMayReturnNull =
decltype(T::kMayReturnNull);
362constexpr bool MayReturnNull() {
363 if constexpr (meta::IsDetected<HasMayReturnNull, T>) {
364 return T::kMayReturnNull;
370template <
typename PostgreCachePolicy>
371struct PolicyChecker {
373 static_assert(kHasName<PostgreCachePolicy>,
"The PosgreSQL cache policy must contain a static member `kName`");
374 static_assert(kHasValueType<PostgreCachePolicy>,
"The PosgreSQL cache policy must define a type alias `ValueType`");
376 kHasKeyMember<PostgreCachePolicy>,
377 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
378 "with a pointer to a data or a function member with the object's key"
381 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
382 "The PosgreSQL cache policy must contain a static data member "
383 "`kQuery` with a select statement or a static member function "
384 "`GetQuery` returning the query"
387 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
388 "The PosgreSQL cache policy must define `kQuery` or "
389 "`GetQuery`, not both"
392 kHasUpdatedField<PostgreCachePolicy>,
393 "The PosgreSQL cache policy must contain a static member "
394 "`kUpdatedField`. If you don't want to use incremental updates, "
395 "please set its value to `nullptr`"
397 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
400 ClusterHostType<PostgreCachePolicy>() & storages::
postgres::kClusterHostRolesMask,
401 "Cluster host role must be specified for caching component, "
402 "please be more specific"
405 static storages::
postgres::Query GetQuery() {
406 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
407 return PostgreCachePolicy::GetQuery();
409 return PostgreCachePolicy::kQuery;
416inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
417inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
418inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
419inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
420inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
422inline constexpr std::string_view kCopyStage =
"copy_data";
423inline constexpr std::string_view kFetchStage =
"fetch";
424inline constexpr std::string_view kParseStage =
"parse";
426inline constexpr std::size_t kDefaultChunkSize = 1000;
427inline constexpr std::chrono::milliseconds kDefaultSleepBetweenChunks{0};
435template <
typename PostgreCachePolicy>
436class PostgreCache
final :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
439 using PolicyType = PostgreCachePolicy;
440 using ValueType = pg_cache::detail::ValueType<PolicyType>;
441 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
442 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
443 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
444 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
445 using BaseType =
typename PolicyCheckerType::BaseType;
448 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
449 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
450 constexpr static auto kName = PolicyType::kName;
452 PostgreCache(
const ComponentConfig&,
const ComponentContext&);
453 ~PostgreCache()
override;
455 static yaml_config::Schema GetStaticConfigSchema();
458 using CachedData = std::unique_ptr<DataType>;
460 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update,
const DataType& cache)
const;
464 const std::chrono::system_clock::time_point& last_update,
465 const std::chrono::system_clock::time_point& now,
466 cache::UpdateStatisticsScope& stats_scope
469 bool MayReturnNull()
const override;
474 CachedData& data_cache,
475 cache::UpdateStatisticsScope& stats_scope,
479 static storages::
postgres::Query GetAllQuery();
480 static storages::
postgres::Query GetDeltaQuery();
481 static std::string GetWhereClause();
482 static std::string GetDeltaWhereClause();
483 static std::string GetOrderByClause();
485 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
487 std::vector<storages::
postgres::ClusterPtr> clusters_;
489 const std::chrono::system_clock::duration correction_;
490 const std::chrono::milliseconds full_update_timeout_;
491 const std::chrono::milliseconds incremental_update_timeout_;
492 const std::size_t chunk_size_;
493 const std::chrono::milliseconds sleep_between_chunks_;
494 std::size_t cpu_relax_iterations_parse_{0};
495 std::size_t cpu_relax_iterations_copy_{0};
498template <
typename PostgreCachePolicy>
499inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
501template <
typename PostgreCachePolicy>
502PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
const ComponentContext& context)
503 : BaseType{config, context},
504 correction_{ParseCorrection(config)},
505 full_update_timeout_{
506 config
["full-update-op-timeout"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultFullUpdateTimeout
)},
507 incremental_update_timeout_{config
["incremental-update-op-timeout"].As<std
::chrono
::milliseconds
>(
508 pg_cache::detail::kDefaultIncrementalUpdateTimeout
510 chunk_size_{config
["chunk-size"].As<size_t
>(pg_cache::detail::kDefaultChunkSize
)},
511 sleep_between_chunks_{
512 config
["sleep-between-chunks"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultSleepBetweenChunks
)} {
515 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
516 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
520 throw std::logic_error(
521 "Incremental update support is requested in config but no update field "
522 "name is specified in traits of '" +
526 if (correction_.count() < 0) {
527 throw std::logic_error(
528 "Refusing to set forward (negative) update correction requested in "
534 const auto pg_alias = config
["pgcomponent"].As<std
::string
>("");
535 if (pg_alias.empty()) {
540 clusters_.resize(shard_count);
541 for (size_t i = 0; i < shard_count; ++i) {
548 this->StartPeriodicUpdates();
551template <
typename PostgreCachePolicy>
552PostgreCache<PostgreCachePolicy>::~PostgreCache() {
553 this->StopPeriodicUpdates();
556template <
typename PostgreCachePolicy>
557std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
558 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
559 return fmt::format(FMT_COMPILE(
"where {}"), PostgreCachePolicy::kWhere);
565template <
typename PostgreCachePolicy>
566std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
567 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
569 FMT_COMPILE(
"where ({}) and {} >= $1"), PostgreCachePolicy::kWhere, PostgreCachePolicy::kUpdatedField
572 return fmt::format(FMT_COMPILE(
"where {} >= $1"), PostgreCachePolicy::kUpdatedField);
576template <
typename PostgreCachePolicy>
577std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
578 if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
579 return fmt::format(FMT_COMPILE(
"order by {}"), PostgreCachePolicy::kOrderBy);
585template <
typename PostgreCachePolicy>
586storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
587 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
588 return fmt::format(
"{} {} {}", query
.GetStatementView(), GetWhereClause(), GetOrderByClause());
591template <
typename PostgreCachePolicy>
592storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
593 if constexpr (kIncrementalUpdates) {
594 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
596 fmt::format(
"{} {} {}", query
.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
600 return GetAllQuery();
604template <
typename PostgreCachePolicy>
605std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
const ComponentConfig& config) {
606 static constexpr std::string_view kUpdateCorrection =
"update-correction";
607 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
609 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>(0
);
611 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>();
615template <
typename PostgreCachePolicy>
616typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
617 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
618 const DataType& cache
620 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
621 return PostgreCachePolicy::GetLastKnownUpdated(cache);
623 return UpdatedFieldType{last_update - correction_};
627template <
typename PostgreCachePolicy>
628void PostgreCache<PostgreCachePolicy>::Update(
630 const std::chrono::system_clock::time_point& last_update,
631 const std::chrono::system_clock::time_point& ,
632 cache::UpdateStatisticsScope& stats_scope
635 if constexpr (!kIncrementalUpdates) {
639 const std::chrono::milliseconds timeout =
644 auto data_cache = GetDataSnapshot(type, scope);
645 [[maybe_unused]]
const auto old_size = data_cache->size();
647 scope.Reset(std::string{pg_cache::detail::kFetchStage});
651 for (
auto& cluster : clusters_) {
652 if (chunk_size_ > 0) {
653 auto trx = cluster->Begin(
654 kClusterHostTypeFlags,
656 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
658 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
660 scope.Reset(std::string{pg_cache::detail::kFetchStage});
661 auto res = portal.Fetch(chunk_size_);
664 scope.Reset(std::string{pg_cache::detail::kParseStage});
665 CacheResults(res, data_cache, stats_scope, scope);
666 changes += res.Size();
667 if (sleep_between_chunks_.count() > 0) {
674 auto res = has_parameter ? cluster->Execute(
675 kClusterHostTypeFlags,
676 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
678 GetLastUpdated(last_update, *data_cache)
681 kClusterHostTypeFlags,
682 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
687 scope.Reset(std::string{pg_cache::detail::kParseStage});
688 CacheResults(res, data_cache, stats_scope, scope);
689 changes += res.Size();
695 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
697 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
698 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
699 cpu_relax_iterations_copy_ =
static_cast<std::size_t>(
700 static_cast<
double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval)
702 LOG_TRACE() <<
"Elapsed time for copying " << kName <<
" " << elapsed_copy.count() <<
" for " << changes
703 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
710 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
711 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
712 cpu_relax_iterations_parse_ =
static_cast<std::size_t>(
713 static_cast<
double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval)
715 LOG_TRACE() <<
"Elapsed time for parsing " << kName <<
" " << elapsed_parse.count() <<
" for " << changes
716 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
722 pg_cache::detail::OnWritesDone(*data_cache);
724 this->Set(std::move(data_cache));
730template <
typename PostgreCachePolicy>
731bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
732 return pg_cache::detail::MayReturnNull<PolicyType>();
735template <
typename PostgreCachePolicy>
736void PostgreCache<PostgreCachePolicy>::CacheResults(
738 CachedData& data_cache,
739 cache::UpdateStatisticsScope& stats_scope,
742 auto values = res.AsSetOf<RawValueType>(storages::
postgres::kRowTag);
744 for (
auto p = values.begin(); p != values.end(); ++p) {
747 using pg_cache::detail::CacheInsertOrAssign;
749 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p), PostgreCachePolicy::kKeyMember
751 }
catch (
const std::exception& e) {
753 LOG_ERROR() <<
"Error parsing data row in cache '" << kName <<
"' to '"
754 <<
compiler::GetTypeName<ValueType>() <<
"': " << e.what();
759template <
typename PostgreCachePolicy>
760typename PostgreCache<PostgreCachePolicy>::CachedData
763 auto data =
this->Get();
765 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
768 return std::make_unique<DataType>();
773std::string GetPostgreCacheSchema();
777template <
typename PostgreCachePolicy>
778yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
779 using ParentType =
typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
780 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
785namespace utils::impl::projected_set {
787template <
typename Set,
typename Value,
typename KeyMember>
788void CacheInsertOrAssign(Set& set, Value&& value,
const KeyMember& ) {
789 DoInsert(set, std::forward<Value>(value));