6#include <userver/cache/base_postgres_cache_fwd.hpp>
12#include <unordered_map>
14#include <fmt/format.h>
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
34USERVER_NAMESPACE_BEGIN
38namespace pg_cache::detail {
41using ValueType =
typename T::ValueType;
43concept HasValueType =
requires {
typename T::ValueType; };
46struct RawValueTypeImpl : std::type_identity<ValueType<T>> {};
49concept HasRawValueType =
requires {
typename T::RawValueType; };
51template <HasRawValueType T>
52struct RawValueTypeImpl<T> : std::type_identity<
typename T::RawValueType> {};
55using RawValueType =
typename RawValueTypeImpl<T>::type;
57template <
typename PostgreCachePolicy>
58auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
59 if constexpr (HasRawValueType<PostgreCachePolicy>) {
60 return Convert(std::move(raw), formats::
parse::To<ValueType<PostgreCachePolicy>>());
62 return std::move(raw);
68concept HasName =
requires {
69 requires !std::string_view { T::kName }
75concept HasQuery =
requires { T::kQuery; };
79concept HasGetQuery =
requires { T::GetQuery(); };
83concept HasWhere =
requires { T::kWhere; };
87concept HasOrderBy =
requires { T::kOrderBy; };
91concept HasTag =
requires { T::kTag; };
93template <
typename PostgreCachePolicy>
95 if constexpr (HasTag<PostgreCachePolicy>) {
96 return PostgreCachePolicy::kTag;
104concept HasUpdatedField =
requires { T::kUpdatedField; };
107concept WantIncrementalUpdates =
requires {
109 std::integral_constant<
bool, !std::string_view{T::kUpdatedField}.empty()>{}
110 } -> std::same_as<std::true_type>;
115concept HasKeyMember =
requires { T::kKeyMember; } && std::invocable<
decltype(T::kKeyMember), ValueType<T>>;
118using KeyMemberType = std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
122concept HasSizeMethod =
requires(T t) {
125 } -> std::convertible_to<std::size_t>;
130concept HasInsertOrAssignMethod =
requires(
typename T::CacheContainer& c, KeyMemberType<T> key, ValueType<T> val) {
131 c.insert_or_assign(std::move(key), std::move(val));
136concept HasCacheInsertOrAssignFunction =
137 requires(
typename T::CacheContainer& c, ValueType<T> val, KeyMemberType<T> key) {
138 CacheInsertOrAssign(c, std::move(val), std::move(key));
142concept PolicyHasCustomCacheContainer =
requires {
typename T::CacheContainer; };
146struct DataCacheContainer {
148 meta::IsStdHashable<KeyMemberType<T>>,
149 "With default CacheContainer, key type must be std::hash-able"
152 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
156requires PolicyHasCustomCacheContainer<T>
157struct DataCacheContainer<T> {
158 static_assert(HasSizeMethod<
typename T::CacheContainer>,
"Custom CacheContainer must provide `size` method");
160 HasInsertOrAssignMethod<T> || HasCacheInsertOrAssignFunction<T>,
161 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
162 "one or CacheInsertOrAssign function"
165 using type =
typename T::CacheContainer;
169using DataCacheContainerType =
typename DataCacheContainer<T>::type;
174inline constexpr bool kIsContainerCopiedByElement =
175 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
178std::unique_ptr<T> CopyContainer(
180 [[maybe_unused]] std::size_t cpu_relax_iterations,
183 if constexpr (kIsContainerCopiedByElement<T>) {
184 auto copy = std::make_unique<T>();
185 if constexpr (meta::kIsReservable<T>) {
186 copy->reserve(container.size());
190 for (
const auto& kv : container) {
196 return std::make_unique<T>(container);
/// Default way of putting a value into a cache container.
///
/// The key is obtained by invoking `key_member` on `value`, then the pair is
/// stored with the container's `insert_or_assign`, so an existing entry with the
/// same key is overwritten.
///
/// @param container destination container; must provide `insert_or_assign(key, value)`
/// @param value value to store; forwarded into the container
/// @param key_member callable or pointer to member that yields the key of `value`
///
/// The trailing parameter pack must stay empty (enforced below). It presumably
/// exists only to make this overload a worse match than container-specific
/// overloads found via ADL — confirm before removing it.
template <typename Container, typename Value, typename KeyMember, typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*unused*/) {
    static_assert(sizeof...(Args) == 0);

    // Take the key by value before `value` is forwarded: afterwards `value` may be moved-from.
    using Key = std::decay_t<decltype(std::invoke(key_member, value))>;
    Key extracted_key = std::invoke(key_member, value);

    container.insert_or_assign(std::move(extracted_key), std::forward<Value>(value));
}
210void OnWritesDone(T& container) {
211 if constexpr (
requires(T& c) { c.OnWritesDone(); }) {
212 container.OnWritesDone();
217concept HasCustomUpdated =
requires(
const DataCacheContainerType<T>& cache) { T::GetLastKnownUpdated(cache); };
220struct UpdatedFieldTypeImpl : std::type_identity<storages::
postgres::TimePointTz> {};
223concept HasUpdatedFieldType =
requires {
typename T::UpdatedFieldType; };
225template <HasUpdatedFieldType T>
226struct UpdatedFieldTypeImpl<T> : std::type_identity<
typename T::UpdatedFieldType> {};
229using UpdatedFieldType =
typename UpdatedFieldTypeImpl<T>::type;
232constexpr bool CheckUpdatedFieldType() {
233 if constexpr (HasUpdatedFieldType<T>) {
234#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
236 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
237 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
238 std::is_same_v<
typename T::UpdatedFieldType, storages::postgres::TimePoint> || HasCustomUpdated<T>,
239 "Invalid UpdatedFieldType, must be either TimePointTz or "
241 "or (legacy) system_clock::time_point"
245 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointTz> ||
246 std::is_same_v<
typename T::UpdatedFieldType, storages::
postgres::TimePointWithoutTz> ||
248 "Invalid UpdatedFieldType, must be either TimePointTz or "
254 !WantIncrementalUpdates<T>,
255 "UpdatedFieldType must be explicitly specified when using "
256 "incremental updates"
264constexpr storages::
postgres::ClusterHostTypeFlags ClusterHostType() {
265 if constexpr (
requires { T::kClusterHostType; }) {
266 return T::kClusterHostType;
274constexpr bool MayReturnNull() {
275 if constexpr (
requires { T::kMayReturnNull; }) {
276 return T::kMayReturnNull;
282template <
typename PostgreCachePolicy>
283struct PolicyChecker {
285 static_assert(HasName<PostgreCachePolicy>,
"The PosgreSQL cache policy must contain a static member `kName`");
286 static_assert(HasValueType<PostgreCachePolicy>,
"The PosgreSQL cache policy must define a type alias `ValueType`");
288 HasKeyMember<PostgreCachePolicy>,
289 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
290 "with a pointer to a data or a function member with the object's key"
293 HasQuery<PostgreCachePolicy> || HasGetQuery<PostgreCachePolicy>,
294 "The PosgreSQL cache policy must contain a static data member "
295 "`kQuery` with a select statement or a static member function "
296 "`GetQuery` returning the query"
299 !(HasQuery<PostgreCachePolicy> && HasGetQuery<PostgreCachePolicy>),
300 "The PosgreSQL cache policy must define `kQuery` or "
301 "`GetQuery`, not both"
304 HasUpdatedField<PostgreCachePolicy>,
305 "The PosgreSQL cache policy must contain a static member "
306 "`kUpdatedField`. If you don't want to use incremental updates, "
307 "please set its value to `nullptr`"
309 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
312 ClusterHostType<PostgreCachePolicy>() & storages::
postgres::kClusterHostRolesMask,
313 "Cluster host role must be specified for caching component, "
314 "please be more specific"
317 static storages::
postgres::Query GetQuery() {
318 if constexpr (HasGetQuery<PostgreCachePolicy>) {
319 return PostgreCachePolicy::GetQuery();
321 return PostgreCachePolicy::kQuery;
/// Fallback for the `full-update-op-timeout` static config option.
inline constexpr auto kDefaultFullUpdateTimeout = std::chrono::minutes{1};
/// Fallback for the `incremental-update-op-timeout` static config option.
inline constexpr auto kDefaultIncrementalUpdateTimeout = std::chrono::seconds{1};
/// Second argument of every `CommandControl` built by the cache update
/// (judging by the name, a zero value switches the statement timeout off).
inline constexpr auto kStatementTimeoutOff = std::chrono::milliseconds{0};
/// Copy/parse stages that took longer than this get a CPU relax period assigned.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
/// Time slice by which the elapsed stage time is divided when deriving the relax period.
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};

/// Names of the timing stages reported by the cache update.
inline constexpr std::string_view kCopyStage{"copy_data"};
inline constexpr std::string_view kFetchStage{"fetch"};
inline constexpr std::string_view kParseStage{"parse"};

/// Fallback for the `chunk-size` static config option.
inline constexpr std::size_t kDefaultChunkSize{1000};
/// Fallback for the `sleep-between-chunks` static config option.
inline constexpr auto kDefaultSleepBetweenChunks = std::chrono::milliseconds{0};
347template <
typename PostgreCachePolicy>
348class PostgreCache
final :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
351 using PolicyType = PostgreCachePolicy;
352 using ValueType = pg_cache::detail::ValueType<PolicyType>;
353 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
354 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
355 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
356 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
357 using BaseType =
typename PolicyCheckerType::BaseType;
360 constexpr static bool kIncrementalUpdates = pg_cache::detail::WantIncrementalUpdates<PolicyType>;
361 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
362 constexpr static auto kName = PolicyType::kName;
364 PostgreCache(
const ComponentConfig&,
const ComponentContext&);
366 static yaml_config::Schema GetStaticConfigSchema();
369 using CachedData = std::unique_ptr<DataType>;
371 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update,
const DataType& cache)
const;
375 const std::chrono::system_clock::time_point& last_update,
376 const std::chrono::system_clock::time_point& now,
377 cache::UpdateStatisticsScope& stats_scope
380 bool MayReturnNull()
const override;
385 CachedData& data_cache,
386 cache::UpdateStatisticsScope& stats_scope,
390 static storages::
postgres::Query GetAllQuery();
391 static storages::
postgres::Query GetDeltaQuery();
392 static std::string GetWhereClause();
393 static std::string GetDeltaWhereClause();
394 static std::string GetOrderByClause();
396 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
398 std::vector<storages::
postgres::ClusterPtr> clusters_;
400 const std::chrono::system_clock::duration correction_;
401 const std::chrono::milliseconds full_update_timeout_;
402 const std::chrono::milliseconds incremental_update_timeout_;
403 const std::size_t chunk_size_;
404 const std::chrono::milliseconds sleep_between_chunks_;
405 std::size_t cpu_relax_iterations_parse_{0};
406 std::size_t cpu_relax_iterations_copy_{0};
409template <
typename PostgreCachePolicy>
410inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
412template <
typename PostgreCachePolicy>
413PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
const ComponentContext& context)
414 : BaseType{config, context},
415 correction_{ParseCorrection(config)},
416 full_update_timeout_{
417 config
["full-update-op-timeout"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultFullUpdateTimeout
)
419 incremental_update_timeout_{
420 config
["incremental-update-op-timeout"]
421 .As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultIncrementalUpdateTimeout
)
423 chunk_size_{config
["chunk-size"].As<size_t
>(pg_cache::detail::kDefaultChunkSize
)},
424 sleep_between_chunks_{
425 config
["sleep-between-chunks"].As<std
::chrono
::milliseconds
>(pg_cache::detail::kDefaultSleepBetweenChunks
)
430 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
431 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
435 throw std::logic_error(
436 "Incremental update support is requested in config but no update field "
437 "name is specified in traits of '" +
441 if (correction_.count() < 0) {
442 throw std::logic_error(
443 "Refusing to set forward (negative) update correction requested in "
449 const auto pg_alias = config
["pgcomponent"].As<std
::string
>("");
450 if (pg_alias.empty()) {
455 clusters_.resize(shard_count);
456 for (size_t i = 0; i < shard_count; ++i) {
465template <
typename PostgreCachePolicy>
466std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
467 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
468 return fmt::format(FMT_COMPILE(
"where {}"), PostgreCachePolicy::kWhere);
474template <
typename PostgreCachePolicy>
475std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
476 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
478 FMT_COMPILE(
"where ({}) and {} >= $1"),
479 PostgreCachePolicy::kWhere,
480 PostgreCachePolicy::kUpdatedField
483 return fmt::format(FMT_COMPILE(
"where {} >= $1"), PostgreCachePolicy::kUpdatedField);
487template <
typename PostgreCachePolicy>
488std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
489 if constexpr (pg_cache::detail::HasOrderBy<PostgreCachePolicy>) {
490 return fmt::format(FMT_COMPILE(
"order by {}"), PostgreCachePolicy::kOrderBy);
496template <
typename PostgreCachePolicy>
497storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
498 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
499 return fmt::format(
"{} {} {}", query
.GetStatementView(), GetWhereClause(), GetOrderByClause());
502template <
typename PostgreCachePolicy>
503storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
504 if constexpr (kIncrementalUpdates) {
505 const storages::
postgres::Query query = PolicyCheckerType::GetQuery();
507 fmt::format(
"{} {} {}", query
.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
511 return GetAllQuery();
515template <
typename PostgreCachePolicy>
516std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
const ComponentConfig& config) {
517 static constexpr std::string_view kUpdateCorrection =
"update-correction";
518 if (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy> ||
521 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>(0
);
523 return config
[kUpdateCorrection
].As<std
::chrono
::milliseconds
>();
527template <
typename PostgreCachePolicy>
528typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
529 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
530 const DataType& cache
532 if constexpr (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy>) {
533 return PostgreCachePolicy::GetLastKnownUpdated(cache);
535 return UpdatedFieldType{last_update - correction_};
539template <
typename PostgreCachePolicy>
540void PostgreCache<PostgreCachePolicy>::Update(
542 const std::chrono::system_clock::time_point& last_update,
543 const std::chrono::system_clock::time_point& ,
544 cache::UpdateStatisticsScope& stats_scope
547 if constexpr (!kIncrementalUpdates) {
551 const std::chrono::milliseconds
556 auto data_cache = GetDataSnapshot(type, scope);
557 [[maybe_unused]]
const auto old_size = data_cache->size();
559 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
563 for (
auto& cluster : clusters_) {
564 if (chunk_size_ > 0) {
565 auto trx = cluster->Begin(
566 kClusterHostTypeFlags,
568 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
570 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
572 scope
.Reset(std::string{pg_cache::detail::kFetchStage}
);
573 auto res = portal.Fetch(chunk_size_);
576 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
577 CacheResults(res, data_cache, stats_scope, scope);
578 changes += res.Size();
579 if (sleep_between_chunks_.count() > 0) {
589 kClusterHostTypeFlags,
590 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
592 GetLastUpdated(last_update, *data_cache)
595 kClusterHostTypeFlags,
596 pg::
CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
601 scope
.Reset(std::string{pg_cache::detail::kParseStage}
);
602 CacheResults(res, data_cache, stats_scope, scope);
603 changes += res.Size();
609 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
611 const auto elapsed_copy = scope
.ElapsedTotal(std::string{pg_cache::detail::kCopyStage}
);
612 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
613 cpu_relax_iterations_copy_ =
static_cast<
614 std::size_t>(
static_cast<
double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
616 <<
"Elapsed time for copying " << kName <<
" " << elapsed_copy.count() <<
" for " << changes
617 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
624 const auto elapsed_parse = scope
.ElapsedTotal(std::string{pg_cache::detail::kParseStage}
);
625 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
626 cpu_relax_iterations_parse_ =
static_cast<
627 std::size_t>(
static_cast<
double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
629 <<
"Elapsed time for parsing " << kName <<
" " << elapsed_parse.count() <<
" for " << changes
630 <<
" data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
636 pg_cache::detail::OnWritesDone(*data_cache);
638 this->Set(std::move(data_cache));
644template <
typename PostgreCachePolicy>
645bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
646 return pg_cache::detail::MayReturnNull<PolicyType>();
649template <
typename PostgreCachePolicy>
650void PostgreCache<PostgreCachePolicy>::CacheResults(
652 CachedData& data_cache,
653 cache::UpdateStatisticsScope& stats_scope,
656 auto values = res.AsSetOf<RawValueType>(pg_cache::detail::GetTag<PostgreCachePolicy>());
658 for (
auto p = values.begin(); p != values.end(); ++p) {
661 using pg_cache::detail::CacheInsertOrAssign;
664 pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
665 PostgreCachePolicy::kKeyMember
667 }
catch (
const std::exception& e) {
670 <<
"Error parsing data row in cache '" << kName <<
"' to '" <<
compiler::GetTypeName<ValueType>()
671 <<
"': " << e.what();
676template <
typename PostgreCachePolicy>
677typename PostgreCache<PostgreCachePolicy>::CachedData PostgreCache<
680 auto data =
this->Get();
682 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
685 return std::make_unique<DataType>();
690std::string GetPostgreCacheSchema();
694template <
typename PostgreCachePolicy>
695yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
696 using ParentType =
typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
697 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
702namespace utils::impl::projected_set {
704template <
typename Set,
typename Value,
typename KeyMember>
705void CacheInsertOrAssign(Set& set, Value&& value,
const KeyMember& ) {
706 DoInsert(set, std::forward<Value>(value));