6#include <userver/cache/base_postgres_cache_fwd.hpp>
12#include <unordered_map>
14#include <fmt/format.h>
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/utils/void_t.hpp>
33#include <userver/yaml_config/merge_schemas.hpp>
35USERVER_NAMESPACE_BEGIN
126namespace pg_cache::detail {
129using ValueType =
typename T::ValueType;
131concept HasValueType =
requires {
typename T::ValueType; };
134struct RawValueTypeImpl : std::type_identity<ValueType<T>> {};
137concept HasRawValueType =
requires {
typename T::RawValueType; };
139template <HasRawValueType T>
140struct RawValueTypeImpl<T> : std::type_identity<
typename T::RawValueType> {};
143using RawValueType =
typename RawValueTypeImpl<T>::type;
145template <
typename PostgreCachePolicy>
146auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
147 if constexpr (HasRawValueType<PostgreCachePolicy>) {
148 return Convert(std::move(raw), formats::
parse::To<ValueType<PostgreCachePolicy>>());
150 return std::move(raw);
156concept HasName =
requires {
157 requires !std::string_view { T::kName }
163concept HasQuery =
requires { T::kQuery; };
167concept HasGetQuery =
requires { T::GetQuery(); };
171concept HasWhere =
requires { T::kWhere; };
175concept HasOrderBy =
requires { T::kOrderBy; };
179concept HasTag =
requires { T::kTag; };
181template <
typename PostgreCachePolicy>
183 if constexpr (HasTag<PostgreCachePolicy>) {
184 return PostgreCachePolicy::kTag;
192concept HasUpdatedField =
requires { T::kUpdatedField; };
195concept WantIncrementalUpdates =
requires {
197 std::integral_constant<
bool, !std::string_view{T::kUpdatedField}.empty()>{}
198 } -> std::same_as<std::true_type>;
203concept HasKeyMember =
requires { T::kKeyMember; } && std::invocable<
decltype(T::kKeyMember), ValueType<T>>;
206using KeyMemberType = std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
210concept HasSizeMethod =
requires(T t) {
213 } -> std::convertible_to<std::size_t>;
218concept HasInsertOrAssignMethod =
requires(
typename T::CacheContainer& c, KeyMemberType<T> key, ValueType<T> val) {
219 c.insert_or_assign(std::move(key), std::move(val));
224concept HasCacheInsertOrAssignFunction =
225 requires(
typename T::CacheContainer& c, ValueType<T> val, KeyMemberType<T> key) {
226 CacheInsertOrAssign(c, std::move(val), std::move(key));
230template <
typename T,
typename = USERVER_NAMESPACE::
utils::void_t<>>
231struct DataCacheContainer {
233 meta::IsStdHashable<KeyMemberType<T>>,
234 "With default CacheContainer, key type must be std::hash-able"
237 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
241struct DataCacheContainer<T, USERVER_NAMESPACE::
utils::void_t<
typename T::CacheContainer>> {
242 static_assert(HasSizeMethod<
typename T::CacheContainer>,
"Custom CacheContainer must provide `size` method");
244 HasInsertOrAssignMethod<T> || HasCacheInsertOrAssignFunction<T>,
245 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
246 "one or CacheInsertOrAssign function"
249 using type =
typename T::CacheContainer;
253using DataCacheContainerType =
typename DataCacheContainer<T>::type;
258inline constexpr bool kIsContainerCopiedByElement =
259 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
262std::unique_ptr<T> CopyContainer(
264 [[maybe_unused]] std::size_t cpu_relax_iterations,
267 if constexpr (kIsContainerCopiedByElement<T>) {
268 auto copy = std::make_unique<T>();
269 if constexpr (meta::kIsReservable<T>) {
270 copy->reserve(container.size());
274 for (
const auto& kv : container) {
280 return std::make_unique<T>(container);
284template <
typename Container,
typename Value,
typename KeyMember,
typename... Args>
285void CacheInsertOrAssign(Container& container, Value&& value,
const KeyMember& key_member, Args&&... ) {
287 static_assert(
sizeof...(Args) == 0);
289 auto key = std::invoke(key_member, value);
290 container.insert_or_assign(std::move(key), std::forward<Value>(value));
294void OnWritesDone(T& container) {
295 if constexpr (
requires(T& c) { c.OnWritesDone(); }) {
296 container.OnWritesDone();
301concept HasCustomUpdated =
requires(
const DataCacheContainerType<T>& cache) { T::GetLastKnownUpdated(cache); };
304struct UpdatedFieldTypeImpl : std::type_identity<storages::
postgres::TimePointTz> {};
307concept HasUpdatedFieldType =
requires {
typename T::UpdatedFieldType; };
309template <HasUpdatedFieldType T>
310struct UpdatedFieldTypeImpl<T> : std::type_identity<
typename T::UpdatedFieldType> {};
313using UpdatedFieldType =
typename UpdatedFieldTypeImpl<T>::type;
316constexpr bool CheckUpdatedFieldType() {
317 if constexpr (HasUpdatedFieldType<T>) {
318#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
320 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
321 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
322 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePoint> || HasCustomUpdated<T>,
323 "Invalid UpdatedFieldType, must be either TimePointTz or "
325 "or (legacy) system_clock::time_point"
329 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointTz> ||
330 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointWithoutTz> ||
332 "Invalid UpdatedFieldType, must be either TimePointTz or "
338 !WantIncrementalUpdates<T>,
339 "UpdatedFieldType must be explicitly specified when using "
340 "incremental updates"
348constexpr storages::
postgres::ClusterHostTypeFlags ClusterHostType() {
349 if constexpr (
requires { T::kClusterHostType; }) {
350 return T::kClusterHostType;
358constexpr bool MayReturnNull() {
359 if constexpr (
requires { T::kMayReturnNull; }) {
360 return T::kMayReturnNull;
366template <
typename PostgreCachePolicy>
367struct PolicyChecker {
369 static_assert(HasName<PostgreCachePolicy>,
"The PosgreSQL cache policy must contain a static member `kName`");
370 static_assert(HasValueType<PostgreCachePolicy>,
"The PosgreSQL cache policy must define a type alias `ValueType`");
372 HasKeyMember<PostgreCachePolicy>,
373 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
374 "with a pointer to a data or a function member with the object's key"
377 HasQuery<PostgreCachePolicy> || HasGetQuery<PostgreCachePolicy>,
378 "The PosgreSQL cache policy must contain a static data member "
379 "`kQuery` with a select statement or a static member function "
380 "`GetQuery` returning the query"
383 !(HasQuery<PostgreCachePolicy> && HasGetQuery<PostgreCachePolicy>),
384 "The PosgreSQL cache policy must define `kQuery` or "
385 "`GetQuery`, not both"
388 HasUpdatedField<PostgreCachePolicy>,
389 "The PosgreSQL cache policy must contain a static member "
390 "`kUpdatedField`. If you don't want to use incremental updates, "
391 "please set its value to `nullptr`"
393 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
396 ClusterHostType<PostgreCachePolicy>() & storages::
postgres::kClusterHostRolesMask,
397 "Cluster host role must be specified for caching component, "
398 "please be more specific"
401 static storages::
postgres::Query GetQuery() {
402 if constexpr (HasGetQuery<PostgreCachePolicy>) {
403 return PostgreCachePolicy::GetQuery();
405 return PostgreCachePolicy::kQuery;
412inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
413inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
414inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
415inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
416inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
418inline constexpr std::string_view kCopyStage =
"copy_data";
419inline constexpr std::string_view kFetchStage =
"fetch";
420inline constexpr std::string_view kParseStage =
"parse";
422inline constexpr std::size_t kDefaultChunkSize = 1000;
423inline constexpr std::chrono::milliseconds kDefaultSleepBetweenChunks{0};
431template <
typename PostgreCachePolicy>
432class PostgreCache
final :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
435 using PolicyType = PostgreCachePolicy;
436 using ValueType = pg_cache::detail::ValueType<PolicyType>;
437 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
438 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
439 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
440 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
441 using BaseType =
typename PolicyCheckerType::BaseType;
444 constexpr static bool kIncrementalUpdates = pg_cache::detail::WantIncrementalUpdates<PolicyType>;
445 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
446 constexpr static auto kName = PolicyType::kName;
448 PostgreCache(
const ComponentConfig&,
const ComponentContext&);
450 static yaml_config::Schema GetStaticConfigSchema();
453 using CachedData = std::unique_ptr<DataType>;
455 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update,
const DataType& cache)
const;
459 const std::chrono::system_clock::time_point& last_update,
460 const std::chrono::system_clock::time_point& now,
461 cache::UpdateStatisticsScope& stats_scope
464 bool MayReturnNull()
const override;
469 CachedData& data_cache,
470 cache::UpdateStatisticsScope& stats_scope,
474 static storages::
postgres::Query GetAllQuery();
475 static storages::
postgres::Query GetDeltaQuery();
476 static std::string GetWhereClause();
477 static std::string GetDeltaWhereClause();
478 static std::string GetOrderByClause();
480 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
482 std::vector<storages::
postgres::ClusterPtr> clusters_;
484 const std::chrono::system_clock::duration correction_;
485 const std::chrono::milliseconds full_update_timeout_;
486 const std::chrono::milliseconds incremental_update_timeout_;
487 const std::size_t chunk_size_;
488 const std::chrono::milliseconds sleep_between_chunks_;
489 std::size_t cpu_relax_iterations_parse_{0};
490 std::size_t cpu_relax_iterations_copy_{0};
493template <
typename PostgreCachePolicy>
494inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
496template <
typename PostgreCachePolicy>
497PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
const ComponentContext& context)
498 : BaseType{config, context},
499 correction_{ParseCorrection(config)},
500 full_update_timeout_{
501 config
["full-update-op-timeout"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultFullUpdateTimeout
)
503 incremental_update_timeout_{
504 config
["incremental-update-op-timeout"]
505 .As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultIncrementalUpdateTimeout
)
507 chunk_size_{config
["chunk-size"].As<size_t
>(pg_cache::detail::kDefaultChunkSize
)},
508 sleep_between_chunks_{
509 config
["sleep-between-chunks"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultSleepBetweenChunks
)
514 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
515 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
519 throw std::logic_error(
520 "Incremental update support is requested in config but no update field "
521 "name is specified in traits of '" +
525 if (correction_.count() < 0) {
526 throw std::logic_error(
527 "Refusing to set forward (negative) update correction requested in "
533 const auto pg_alias = config
["pgcomponent"].As<std
::string
>("");
534 if (pg_alias.empty()) {
539 clusters_.resize(shard_count);
540 for (size_t i = 0; i < shard_count; ++i) {
549template <
typename PostgreCachePolicy>
550std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
551 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
552 return fmt::format(FMT_COMPILE(
"where {}"), PostgreCachePolicy::kWhere);
558template <
typename PostgreCachePolicy>
559std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
560 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
562 FMT_COMPILE(
"where ({}) and {} >= $1"),
563 PostgreCachePolicy::kWhere,
564 PostgreCachePolicy::kUpdatedField
567 return fmt::format(FMT_COMPILE(
"where {} >= $1"), PostgreCachePolicy::kUpdatedField);
571template <
typename PostgreCachePolicy>
572std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
573 if constexpr (pg_cache::detail::HasOrderBy<PostgreCachePolicy>) {
574 return fmt::format(FMT_COMPILE(
"order by {}"), PostgreCachePolicy::kOrderBy);
580template <
typename PostgreCachePolicy>
581storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
582 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
583 return fmt::format(
"{} {} {}", query
.GetStatementView(), GetWhereClause(), GetOrderByClause());
586template <
typename PostgreCachePolicy>
587storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
588 if constexpr (kIncrementalUpdates) {
589 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
591 fmt::format(
"{} {} {}", query
.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
595 return GetAllQuery();
599template <
typename PostgreCachePolicy>
600std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
const ComponentConfig& config) {
601 static constexpr std::string_view kUpdateCorrection =
"update-correction";
602 if (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy> ||
605 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>(0
);
607 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>();
611template <
typename PostgreCachePolicy>
612typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
613 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
614 const DataType& cache
616 if constexpr (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy>) {
617 return PostgreCachePolicy::GetLastKnownUpdated(cache);
619 return UpdatedFieldType{last_update - correction_};
623template <
typename PostgreCachePolicy>
624void PostgreCache<PostgreCachePolicy>::Update(
626 const std::chrono::system_clock::time_point& last_update,
627 const std::chrono::system_clock::time_point& ,
628 cache::UpdateStatisticsScope& stats_scope
631 if constexpr (!kIncrementalUpdates) {
635 const std::chrono::milliseconds
640 auto data_cache = GetDataSnapshot(type, scope);
641 [[maybe_unused]]
const auto old_size = data_cache->size();
643 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
647 for (
auto& cluster : clusters_) {
648 if (chunk_size_ > 0) {
649 auto trx = cluster->Begin(
650 kClusterHostTypeFlags,
652 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
654 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
656 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
657 auto res = portal.Fetch(chunk_size_);
660 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
661 CacheResults(res, data_cache, stats_scope, scope);
662 changes += res.Size();
663 if (sleep_between_chunks_.count() > 0) {
673 kClusterHostTypeFlags,
674 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
676 GetLastUpdated(last_update, *data_cache)
679 kClusterHostTypeFlags,
680 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
685 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
686 CacheResults(res, data_cache, stats_scope, scope);
687 changes += res.Size();
693 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
695 const auto elapsed_copy = scope
.ElapsedTotal(std::string{pg_cache::detail::kCopyStage}
);
696 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
697 cpu_relax_iterations_copy_ =
static_cast<
698 std::size_t>(
static_cast<
double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
700 <<
"Elapsed time for copying " << kName <<
" " << elapsed_copy.count() <<
" for " << changes
701 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
708 const auto elapsed_parse = scope
.ElapsedTotal(std::string{pg_cache::detail::kParseStage}
);
709 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
710 cpu_relax_iterations_parse_ =
static_cast<
711 std::size_t>(
static_cast<
double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
713 <<
"Elapsed time for parsing " << kName <<
" " << elapsed_parse.count() <<
" for " << changes
714 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
720 pg_cache::detail::OnWritesDone(*data_cache);
722 this->Set(std::move(data_cache));
728template <
typename PostgreCachePolicy>
729bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
730 return pg_cache::detail::MayReturnNull<PolicyType>();
733template <
typename PostgreCachePolicy>
734void PostgreCache<PostgreCachePolicy>::CacheResults(
736 CachedData& data_cache,
737 cache::UpdateStatisticsScope& stats_scope,
740 auto values = res.AsSetOf<RawValueType>(pg_cache::detail::GetTag<PostgreCachePolicy>());
742 for (
auto p = values.begin(); p != values.end(); ++p) {
745 using pg_cache::detail::CacheInsertOrAssign;
748 pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
749 PostgreCachePolicy::kKeyMember
751 }
catch (
const std::exception& e) {
754 <<
"Error parsing data row in cache '" << kName <<
"' to '" <<
compiler::GetTypeName<ValueType>()
755 <<
"': " << e.what();
760template <
typename PostgreCachePolicy>
761typename PostgreCache<PostgreCachePolicy>::CachedData PostgreCache<
764 auto data =
this->Get();
766 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
769 return std::make_unique<DataType>();
774std::string GetPostgreCacheSchema();
778template <
typename PostgreCachePolicy>
779yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
780 using ParentType =
typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
781 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
786namespace utils::impl::projected_set {
788template <
typename Set,
typename Value,
typename KeyMember>
789void CacheInsertOrAssign(Set& set, Value&& value,
const KeyMember& ) {
790 DoInsert(set, std::forward<Value>(value));