6#include <userver/cache/base_postgres_cache_fwd.hpp>
12#include <unordered_map>
14#include <fmt/format.h>
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
34USERVER_NAMESPACE_BEGIN
116namespace pg_cache::detail {
// Names the `ValueType` member type of the cache policy `T`.
119using ValueType =
typename T::ValueType;
// True when `T::ValueType` is detected through meta::kIsDetected.
121inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
// Names the `RawValueType` member type of the cache policy `T`.
124using RawValueTypeImpl =
typename T::RawValueType;
// True when `T::RawValueType` is detected through meta::kIsDetected.
126inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
// Resolved through meta::DetectedOr from `ValueType<T>` and
// `RawValueTypeImpl`; presumably `ValueType<T>` is the fallback when the
// policy has no `RawValueType` -- confirm against meta::DetectedOr.
128using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
130template <
typename PostgreCachePolicy>
131auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
132 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
133 return Convert(std::move(raw),
134 formats::parse::To<ValueType<PostgreCachePolicy>>());
136 return std::move(raw);
// Well-formed only when `T::kName` converts to a non-empty string_view.
142using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
144inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
// Detects a static member `T::kQuery`.
148using HasQueryImpl =
decltype(T::kQuery);
150inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
// Detects a static member function `T::GetQuery()` callable without arguments.
154using HasGetQueryImpl =
decltype(T::GetQuery());
156inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
// Detects a static member `T::kWhere`.
160using HasWhere =
decltype(T::kWhere);
162inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
// Detects a static member `T::kUpdatedField` (regardless of its value).
166using HasUpdatedField =
decltype(T::kUpdatedField);
168inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
// Well-formed only when `T::kUpdatedField` converts to a non-empty
// string_view; this is what switches incremental updates on.
171using WantIncrementalUpdates =
172 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
174inline constexpr bool kWantIncrementalUpdates =
175 meta::kIsDetected<WantIncrementalUpdates, T>;
// Decayed result type of invoking `T::kKeyMember` on a `ValueType<T>`.
179using KeyMemberTypeImpl =
180 std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
182inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
184using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
// Selects the container type that holds cached values for policy `T`.
// Primary template: an std::unordered_map keyed by the policy's key member
// type, which therefore has to be std::hash-able.
187template <
typename T,
typename = USERVER_NAMESPACE::utils::void_t<>>
188struct DataCacheContainer {
189 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
190 "With default CacheContainer, key type must be std::hash-able");
192 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
// Specialization chosen when the policy declares `T::CacheContainer`:
// that type is used as the container.
196struct DataCacheContainer<
197 T, USERVER_NAMESPACE::utils::void_t<
typename T::CacheContainer>> {
198 using type =
typename T::CacheContainer;
// Shorthand for the selected container type.
202using DataCacheContainerType =
typename DataCacheContainer<T>::type;
// True only for instantiations of std::unordered_map and std::map; these are
// the containers CopyContainer copies element by element.
207inline constexpr bool kIsContainerCopiedByElement =
208 meta::kIsInstantiationOf<std::unordered_map, T> ||
209 meta::kIsInstantiationOf<std::map, T>;
// Produces a heap-allocated copy of `container`.
// `cpu_relax_iterations` and `scope` are handed to utils::CpuRelax and are
// only used on the element-wise path.
212std::unique_ptr<T> CopyContainer(
213 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
214 tracing::ScopeTime& scope) {
215 if constexpr (kIsContainerCopiedByElement<T>) {
216 auto copy = std::make_unique<T>();
// Reserve up front when the container type supports it.
217 if constexpr (meta::kIsReservable<T>) {
218 copy->reserve(container.size());
221 utils::
CpuRelax relax
{cpu_relax_iterations
, &scope
};
// NOTE(review): the loop body is not visible here; presumably it relaxes
// the CPU and inserts `kv` into `copy` -- confirm.
222 for (
const auto& kv : container) {
// Any other container type is copied with its copy constructor.
228 return std::make_unique<T>(container);
/// Default way of placing a value into the cache container.
///
/// The key is obtained by invoking `key_member` on `value`; the value is then
/// inserted under that key, replacing any element already stored there.
/// The trailing parameter pack must be empty (enforced by the static_assert).
template <typename Container, typename Value, typename KeyMember,
          typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value,
                         const KeyMember& key_member, Args&&...) {
  static_assert(sizeof...(Args) == 0);

  // Take the key by value before `value` is forwarded: afterwards the key
  // must not refer into an object that may have been moved from.
  auto extracted_key = std::invoke(key_member, value);
  container.insert_or_assign(std::move(extracted_key),
                             std::forward<Value>(value));
}
244using HasOnWritesDoneImpl =
decltype(std::declval<T&>().OnWritesDone());
247void OnWritesDone(T& container) {
248 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
249 container.OnWritesDone();
// Detects a static `T::GetLastKnownUpdated(container)` taking the policy's
// cache container.
254using HasCustomUpdatedImpl =
255 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
258inline constexpr bool kHasCustomUpdated =
259 meta::kIsDetected<HasCustomUpdatedImpl, T>;
// Names the `UpdatedFieldType` member type of the policy.
262using UpdatedFieldTypeImpl =
typename T::UpdatedFieldType;
264inline constexpr bool kHasUpdatedFieldType =
265 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
// Resolved through meta::DetectedOr; presumably storages::postgres::TimePointTz
// is the fallback when the policy has no `UpdatedFieldType` -- confirm.
267using UpdatedFieldType =
268 meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
// Compile-time validation of the policy's `UpdatedFieldType`.
271constexpr bool CheckUpdatedFieldType() {
// An explicitly declared type must be TimePointTz or TimePoint, unless the
// policy supplies its own GetLastKnownUpdated (kHasCustomUpdated).
272 if constexpr (kHasUpdatedFieldType<T>) {
274 std::is_same_v<
typename T::UpdatedFieldType,
275 storages::postgres::TimePointTz> ||
276 std::is_same_v<
typename T::UpdatedFieldType,
277 storages::postgres::TimePoint> ||
278 kHasCustomUpdated<T>,
279 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
// Without a declared type, incremental updates must not be requested.
281 static_assert(!kWantIncrementalUpdates<T>,
282 "UpdatedFieldType must be explicitly specified when using "
283 "incremental updates");
// Detects a static member `T::kClusterHostType`.
290using HasClusterHostTypeImpl =
decltype(T::kClusterHostType);
// Host selection for the policy: the policy's own `kClusterHostType` when
// declared, otherwise storages::postgres::ClusterHostType::kSlave.
294 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
295 return T::kClusterHostType;
297 return storages::postgres::ClusterHostType::kSlave;
// Detects a static member `T::kMayReturnNull`.
303using HasMayReturnNull =
decltype(T::kMayReturnNull);
// Returns the policy's `kMayReturnNull` when the policy declares it.
// NOTE(review): the value returned for policies without the member is not
// visible here.
306constexpr bool MayReturnNull() {
307 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
308 return T::kMayReturnNull;
314template <
typename PostgreCachePolicy>
315struct PolicyChecker {
318 kHasName<PostgreCachePolicy>,
319 "The PosgreSQL cache policy must contain a static member `kName`");
321 kHasValueType<PostgreCachePolicy>,
322 "The PosgreSQL cache policy must define a type alias `ValueType`");
324 kHasKeyMember<PostgreCachePolicy>,
325 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
326 "with a pointer to a data or a function member with the object's key");
327 static_assert(kHasQuery<PostgreCachePolicy> ||
328 kHasGetQuery<PostgreCachePolicy>,
329 "The PosgreSQL cache policy must contain a static data member "
330 "`kQuery` with a select statement or a static member function "
331 "`GetQuery` returning the query");
332 static_assert(!(kHasQuery<PostgreCachePolicy> &&
333 kHasGetQuery<PostgreCachePolicy>),
334 "The PosgreSQL cache policy must define `kQuery` or "
335 "`GetQuery`, not both");
337 kHasUpdatedField<PostgreCachePolicy>,
338 "The PosgreSQL cache policy must contain a static member "
339 "`kUpdatedField`. If you don't want to use incremental updates, "
340 "please set its value to `nullptr`");
341 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
343 static_assert(ClusterHostType<PostgreCachePolicy>() &
344 storages::postgres::kClusterHostRolesMask,
345 "Cluster host role must be specified for caching component, "
346 "please be more specific");
349 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
350 return PostgreCachePolicy::GetQuery();
352 return PostgreCachePolicy::kQuery;
357 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
// Default for the `full-update-op-timeout` config option.
360inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
// Default for the `incremental-update-op-timeout` config option.
361inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
// Passed as the second CommandControl argument when running cache queries.
362inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
// A copy/parse stage that took longer than this gets CPU relaxing tuned.
363inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
// Divisor used to derive the number of iterations between CPU relaxations.
364inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
// Names of the tracing scope stages used during an update.
366inline constexpr std::string_view kCopyStage =
"copy_data";
367inline constexpr std::string_view kFetchStage =
"fetch";
368inline constexpr std::string_view kParseStage =
"parse";
// Default for the `chunk-size` config option.
370inline constexpr std::size_t kDefaultChunkSize = 1000;
// Caching component whose data is read from PostgreSQL as described by
// `PostgreCachePolicy`. The base class comes from PolicyChecker, so every
// instantiation also runs the policy's compile-time checks.
378template <
typename PostgreCachePolicy>
379class PostgreCache
final
380 :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
// Type aliases derived from the policy.
383 using PolicyType = PostgreCachePolicy;
384 using ValueType = pg_cache::detail::ValueType<PolicyType>;
385 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
386 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
387 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
388 using UpdatedFieldType =
389 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
390 using BaseType =
typename PolicyCheckerType::BaseType;
// Constants computed from the policy at compile time.
393 constexpr static bool kIncrementalUpdates =
394 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
395 constexpr static auto kClusterHostTypeFlags =
396 pg_cache::detail::ClusterHostType<PolicyType>();
397 constexpr static auto kName = PolicyType::kName;
399 PostgreCache(
const ComponentConfig&,
const ComponentContext&);
400 ~PostgreCache()
override;
402 static yaml_config::Schema GetStaticConfigSchema();
// Owning pointer to a cache container under construction.
405 using CachedData = std::unique_ptr<DataType>;
// Value used as the incremental query parameter.
407 UpdatedFieldType GetLastUpdated(
408 std::chrono::system_clock::time_point last_update,
409 const DataType& cache)
const;
// NOTE(review): the first line of this overriding declaration is not visible
// here; the parameters match the out-of-class definition of Update().
412 const std::chrono::system_clock::time_point& last_update,
413 const std::chrono::system_clock::time_point& now,
414 cache::UpdateStatisticsScope& stats_scope)
override;
416 bool MayReturnNull()
const override;
418 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
420 cache::UpdateStatisticsScope& stats_scope,
426 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
// One cluster per shard, filled in by the constructor.
428 std::vector<storages::postgres::ClusterPtr> clusters_;
// Settings read from the component config in the constructor.
430 const std::chrono::system_clock::duration correction_;
431 const std::chrono::milliseconds full_update_timeout_;
432 const std::chrono::milliseconds incremental_update_timeout_;
433 const std::size_t chunk_size_;
// Iteration counts handed to utils::CpuRelax; recalculated in Update().
434 std::size_t cpu_relax_iterations_parse_{0};
435 std::size_t cpu_relax_iterations_copy_{0};
// Sets kHasValidate to true for every PostgreCache instantiation.
// NOTE(review): the primary kHasValidate template is declared elsewhere;
// presumably it enables static config validation -- confirm.
438template <
typename PostgreCachePolicy>
439inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
// Builds the component: reads the update correction, operation timeouts and
// chunk size from `config`, validates the combination of settings, resolves
// one cluster per shard of the configured Postgres component, logs both
// queries and starts periodic updates.
// Throws std::logic_error when incremental updates are requested without
// policy support or when the correction is negative.
441template <
typename PostgreCachePolicy>
442PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
443 const ComponentContext& context)
444 : BaseType{config, context},
445 correction_{ParseCorrection(config)},
446 full_update_timeout_{
447 config[
"full-update-op-timeout"].As<std::chrono::milliseconds>(
448 pg_cache::detail::kDefaultFullUpdateTimeout)},
449 incremental_update_timeout_{
450 config[
"incremental-update-op-timeout"].As<std::chrono::milliseconds>(
451 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
452 chunk_size_{config[
"chunk-size"].As<size_t>(
453 pg_cache::detail::kDefaultChunkSize)} {
// NOTE(review): the check that produces this message is not visible here;
// per the text it concerns a non-zero 'chunk-size' without portal support.
456 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
457 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON.");
// Incremental updates need an update field in the policy.
// NOTE(review): the value GetAllowedUpdateTypes() is compared with is not
// visible here.
459 if (
this->GetAllowedUpdateTypes() ==
461 !kIncrementalUpdates) {
462 throw std::logic_error(
463 "Incremental update support is requested in config but no update field "
464 "name is specified in traits of '" +
465 config.Name() +
"' cache");
// A negative correction would move the update window forward; refuse it.
467 if (correction_.count() < 0) {
468 throw std::logic_error(
469 "Refusing to set forward (negative) update correction requested in "
471 config.Name() +
"' cache");
// The name of the Postgres component to read from is mandatory.
474 const auto pg_alias = config[
"pgcomponent"].As<std::string>(
"");
475 if (pg_alias.empty()) {
477 "No `pgcomponent` entry in configuration"};
// Keep a cluster handle for every shard of the Postgres component.
479 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
480 const auto shard_count = pg_cluster_comp.GetShardCount();
481 clusters_.resize(shard_count);
482 for (size_t i = 0; i < shard_count; ++i) {
483 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
486 LOG_INFO() <<
"Cache " << kName <<
" full update query `"
487 << GetAllQuery().Statement() <<
"` incremental update query `"
488 << GetDeltaQuery().Statement() <<
"`";
490 this->StartPeriodicUpdates();
// Stops the periodic updates started by the constructor.
493template <
typename PostgreCachePolicy>
494PostgreCache<PostgreCachePolicy>::~PostgreCache() {
495 this->StopPeriodicUpdates();
// NOTE(review): the signature and the definition of `query` are not visible
// here; judging by its callers this is presumably GetAllQuery().
// When the policy declares `kWhere`, the query statement is extended with
// " where <kWhere>".
498template <
typename PostgreCachePolicy>
501 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
502 return {fmt::format(
"{} where {}", query.Statement(),
503 PostgreCachePolicy::kWhere),
// Builds the query for incremental updates: the base statement filtered by
// `<kUpdatedField> >= $1`, combined with the policy's `kWhere` when present.
// Policies without incremental updates get the full query instead.
510template <
typename PostgreCachePolicy>
511storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
512 if constexpr (kIncrementalUpdates) {
// NOTE(review): the definition of `query` is not visible here.
515 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
// The policy's condition is parenthesized before the update-field filter
// is and-ed to it.
517 fmt::format(
"{} where ({}) and {} >= $1", query.Statement(),
518 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
521 return {fmt::format(
"{} where {} >= $1", query.Statement(),
522 PolicyType::kUpdatedField),
526 return GetAllQuery();
// Reads the `update-correction` option from the component config.
// On the first path the option is optional and defaults to 0; on the second
// it is read without a default.
530template <
typename PostgreCachePolicy>
531std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
532 const ComponentConfig& config) {
533 static constexpr std::string_view kUpdateCorrection =
"update-correction";
// NOTE(review): the second operand of this condition is not visible here.
534 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
536 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
538 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
542template <
typename PostgreCachePolicy>
543typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
544PostgreCache<PostgreCachePolicy>::GetLastUpdated(
545 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
546 const DataType& cache)
const {
547 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
548 return PostgreCachePolicy::GetLastKnownUpdated(cache);
550 return UpdatedFieldType{last_update - correction_};
554template <
typename PostgreCachePolicy>
555void PostgreCache<PostgreCachePolicy>::Update(
557 const std::chrono::system_clock::time_point& last_update,
558 const std::chrono::system_clock::time_point& ,
559 cache::UpdateStatisticsScope& stats_scope) {
561 if constexpr (!kIncrementalUpdates) {
566 const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
567 ? full_update_timeout_
568 : incremental_update_timeout_;
571 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
572 std::string{pg_cache::detail::kCopyStage});
573 auto data_cache = GetDataSnapshot(type, scope);
574 [[maybe_unused]]
const auto old_size = data_cache->size();
576 scope.Reset(std::string{pg_cache::detail::kFetchStage});
580 for (
auto& cluster : clusters_) {
581 if (chunk_size_ > 0) {
582 auto trx = cluster->Begin(
583 kClusterHostTypeFlags, pg::Transaction::RO,
584 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
586 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
588 scope.Reset(std::string{pg_cache::detail::kFetchStage});
589 auto res = portal.Fetch(chunk_size_);
590 stats_scope.IncreaseDocumentsReadCount(res.Size());
592 scope.Reset(std::string{pg_cache::detail::kParseStage});
593 CacheResults(res, data_cache, stats_scope, scope);
594 changes += res.Size();
598 bool has_parameter = query.Statement().find(
'$') != std::string::npos;
599 auto res = has_parameter
601 kClusterHostTypeFlags,
603 timeout, pg_cache::detail::kStatementTimeoutOff},
604 query, GetLastUpdated(last_update, *data_cache))
606 kClusterHostTypeFlags,
608 timeout, pg_cache::detail::kStatementTimeoutOff},
610 stats_scope.IncreaseDocumentsReadCount(res.Size());
612 scope.Reset(std::string{pg_cache::detail::kParseStage});
613 CacheResults(res, data_cache, stats_scope, scope);
614 changes += res.Size();
620 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
622 const auto elapsed_copy =
623 scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
624 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
625 cpu_relax_iterations_copy_ =
static_cast<std::size_t>(
626 static_cast<
double>(old_size) /
627 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
628 LOG_TRACE() <<
"Elapsed time for copying " << kName <<
" "
629 << elapsed_copy.count() <<
" for " << changes
630 <<
" data items is over threshold. Will relax CPU every "
631 << cpu_relax_iterations_parse_ <<
" iterations";
637 const auto elapsed_parse =
638 scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
639 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
640 cpu_relax_iterations_parse_ =
static_cast<std::size_t>(
641 static_cast<
double>(changes) /
642 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
643 LOG_TRACE() <<
"Elapsed time for parsing " << kName <<
" "
644 << elapsed_parse.count() <<
" for " << changes
645 <<
" data items is over threshold. Will relax CPU every "
646 << cpu_relax_iterations_parse_ <<
" iterations";
649 if (changes > 0 || type == cache::UpdateType::kFull) {
651 stats_scope.Finish(data_cache->size());
652 pg_cache::detail::OnWritesDone(*data_cache);
653 this->Set(std::move(data_cache));
// Forwards to pg_cache::detail::MayReturnNull for this cache's policy.
659template <
typename PostgreCachePolicy>
660bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
661 return pg_cache::detail::MayReturnNull<PolicyType>();
// Parses the rows of a result set and stores them in the cache container.
// NOTE(review): the parameter list is not visible here; the body uses `res`,
// `data_cache`, `stats_scope` and `scope`.
664template <
typename PostgreCachePolicy>
665void PostgreCache<PostgreCachePolicy>::CacheResults(
// Rows are read as RawValueType using the row tag.
668 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
669 utils::
CpuRelax relax
{cpu_relax_iterations_parse_
, &scope
};
670 for (
auto p = values.begin(); p != values.end(); ++p) {
// The using-declaration makes the default overload visible to the
// unqualified call that follows.
673 using pg_cache::detail::CacheInsertOrAssign;
675 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
676 PostgreCachePolicy::kKeyMember);
677 }
// A row that fails with std::exception is counted and logged in this handler.
catch (
const std::exception& e) {
678 stats_scope.IncreaseDocumentsParseFailures(1);
679 LOG_ERROR() <<
"Error parsing data row in cache '" << kName <<
"' to '"
680 << compiler::GetTypeName<ValueType>() <<
"': " << e.what();
// Produces the container an update works on: either a copy of the currently
// published data (made with CopyContainer) or a new empty container.
// NOTE(review): the condition choosing between the two is not visible here.
685template <
typename PostgreCachePolicy>
686typename PostgreCache<PostgreCachePolicy>::CachedData
687PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::
UpdateType type,
690 auto data =
this->Get();
692 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
696 return std::make_unique<DataType>();
// Returns the schema text that PostgreCache::GetStaticConfigSchema() passes to
// yaml_config::MergeSchemas. Only declared here.
701std::string GetPostgreCacheSchema();
705template <
typename PostgreCachePolicy>
706yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
708 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
709 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
714namespace utils::impl::projected_set {
716template <
typename Set,
typename Value,
typename KeyMember>
717void CacheInsertOrAssign(Set& set, Value&& value,
719 DoInsert(set, std::forward<Value>(value));