6#include <userver/cache/base_postgres_cache_fwd.hpp> 
   12#include <unordered_map> 
   14#include <fmt/format.h> 
   16#include <userver/cache/cache_statistics.hpp> 
   17#include <userver/cache/caching_component_base.hpp> 
   18#include <userver/components/component_config.hpp> 
   19#include <userver/components/component_context.hpp> 
   21#include <userver/storages/postgres/cluster.hpp> 
   22#include <userver/storages/postgres/component.hpp> 
   23#include <userver/storages/postgres/io/chrono.hpp> 
   25#include <userver/compiler/demangle.hpp> 
   26#include <userver/logging/log.hpp> 
   27#include <userver/tracing/span.hpp> 
   28#include <userver/utils/assert.hpp> 
   29#include <userver/utils/cpu_relax.hpp> 
   30#include <userver/utils/meta.hpp> 
   31#include <userver/utils/void_t.hpp> 
   32#include <userver/yaml_config/merge_schemas.hpp> 
   34USERVER_NAMESPACE_BEGIN
 
  116namespace pg_cache::detail {
 
  119using ValueType = 
typename T::ValueType;
 
  121inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
 
  124using RawValueTypeImpl = 
typename T::RawValueType;
 
  126inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
 
  128using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
 
  130template <
typename PostgreCachePolicy>
 
  131auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
 
  132  if constexpr (kHasRawValueType<PostgreCachePolicy>) {
 
  133    return Convert(std::move(raw),
 
  134                   formats::parse::To<ValueType<PostgreCachePolicy>>());
 
  136    return std::move(raw);
 
  142using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
 
  144inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
 
  148using HasQueryImpl = 
decltype(T::kQuery);
 
  150inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
 
  154using HasGetQueryImpl = 
decltype(T::GetQuery());
 
  156inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
 
  160using HasWhere = 
decltype(T::kWhere);
 
  162inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
 
  166using HasUpdatedField = 
decltype(T::kUpdatedField);
 
  168inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
 
  171using WantIncrementalUpdates =
 
  172    std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
 
  174inline constexpr bool kWantIncrementalUpdates =
 
  175    meta::kIsDetected<WantIncrementalUpdates, T>;
 
  179using KeyMemberTypeImpl =
 
  180    std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
 
  182inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
 
  184using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
 
  187template <
typename T, 
typename = USERVER_NAMESPACE::utils::void_t<>>
 
  188struct DataCacheContainer {
 
  189  static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
 
  190                "With default CacheContainer, key type must be std::hash-able");
 
  192  using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
 
  196struct DataCacheContainer<
 
  197    T, USERVER_NAMESPACE::utils::void_t<
typename T::CacheContainer>> {
 
  198  using type = 
typename T::CacheContainer;
 
  202using DataCacheContainerType = 
typename DataCacheContainer<T>::type;
 
  207inline constexpr bool kIsContainerCopiedByElement =
 
  208    meta::kIsInstantiationOf<std::unordered_map, T> ||
 
  209    meta::kIsInstantiationOf<std::map, T>;
 
  212std::unique_ptr<T> CopyContainer(
 
  213    const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
 
  214    tracing::ScopeTime& scope) {
 
  215  if constexpr (kIsContainerCopiedByElement<T>) {
 
  216    auto copy = std::make_unique<T>();
 
  217    if constexpr (meta::kIsReservable<T>) {
 
  218      copy->reserve(container.size());
 
  221    utils::
CpuRelax relax
{cpu_relax_iterations
, &scope
};
 
  222    for (
const auto& kv : container) {
 
  228    return std::make_unique<T>(container);
 
  232template <
typename Container, 
typename Value, 
typename KeyMember,
 
  234void CacheInsertOrAssign(Container& container, Value&& value,
 
  235                         const KeyMember& key_member, Args&&... ) {
 
  237  static_assert(
sizeof...(Args) == 0);
 
  239  auto key = std::invoke(key_member, value);
 
  240  container.insert_or_assign(std::move(key), std::forward<Value>(value));
 
  244using HasOnWritesDoneImpl = 
decltype(std::declval<T&>().OnWritesDone());
 
  247void OnWritesDone(T& container) {
 
  248  if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
 
  249    container.OnWritesDone();
 
  254using HasCustomUpdatedImpl =
 
  255    decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
 
  258inline constexpr bool kHasCustomUpdated =
 
  259    meta::kIsDetected<HasCustomUpdatedImpl, T>;
 
  262using UpdatedFieldTypeImpl = 
typename T::UpdatedFieldType;
 
  264inline constexpr bool kHasUpdatedFieldType =
 
  265    meta::kIsDetected<UpdatedFieldTypeImpl, T>;
 
  267using UpdatedFieldType =
 
  268    meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
 
  271constexpr bool CheckUpdatedFieldType() {
 
  272  if constexpr (kHasUpdatedFieldType<T>) {
 
  274        std::is_same_v<
typename T::UpdatedFieldType,
 
  275                       storages::postgres::TimePointTz> ||
 
  276            std::is_same_v<
typename T::UpdatedFieldType,
 
  277                           storages::postgres::TimePoint> ||
 
  278            kHasCustomUpdated<T>,
 
  279        "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
 
  281    static_assert(!kWantIncrementalUpdates<T>,
 
  282                  "UpdatedFieldType must be explicitly specified when using " 
  283                  "incremental updates");
 
  290using HasClusterHostTypeImpl = 
decltype(T::kClusterHostType);
 
  294  if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
 
  295    return T::kClusterHostType;
 
  297    return storages::postgres::ClusterHostType::kSlave;
 
  303using HasMayReturnNull = 
decltype(T::kMayReturnNull);
 
  306constexpr bool MayReturnNull() {
 
  307  if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
 
  308    return T::kMayReturnNull;
 
  314template <
typename PostgreCachePolicy>
 
  315struct PolicyChecker {
 
  318      kHasName<PostgreCachePolicy>,
 
  319      "The PosgreSQL cache policy must contain a static member `kName`");
 
  321      kHasValueType<PostgreCachePolicy>,
 
  322      "The PosgreSQL cache policy must define a type alias `ValueType`");
 
  324      kHasKeyMember<PostgreCachePolicy>,
 
  325      "The PostgreSQL cache policy must contain a static member `kKeyMember` " 
  326      "with a pointer to a data or a function member with the object's key");
 
  327  static_assert(kHasQuery<PostgreCachePolicy> ||
 
  328                    kHasGetQuery<PostgreCachePolicy>,
 
  329                "The PosgreSQL cache policy must contain a static data member " 
  330                "`kQuery` with a select statement or a static member function " 
  331                "`GetQuery` returning the query");
 
  332  static_assert(!(kHasQuery<PostgreCachePolicy> &&
 
  333                  kHasGetQuery<PostgreCachePolicy>),
 
  334                "The PosgreSQL cache policy must define `kQuery` or " 
  335                "`GetQuery`, not both");
 
  337      kHasUpdatedField<PostgreCachePolicy>,
 
  338      "The PosgreSQL cache policy must contain a static member " 
  339      "`kUpdatedField`. If you don't want to use incremental updates, " 
  340      "please set its value to `nullptr`");
 
  341  static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
 
  343  static_assert(ClusterHostType<PostgreCachePolicy>() &
 
  344                    storages::postgres::kClusterHostRolesMask,
 
  345                "Cluster host role must be specified for caching component, " 
  346                "please be more specific");
 
  349    if constexpr (kHasGetQuery<PostgreCachePolicy>) {
 
  350      return PostgreCachePolicy::GetQuery();
 
  352      return PostgreCachePolicy::kQuery;
 
  357      CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
 
  360inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
 
  361inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
 
  362inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
 
  363inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
 
  364inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
 
  366inline constexpr std::string_view kCopyStage = 
"copy_data";
 
  367inline constexpr std::string_view kFetchStage = 
"fetch";
 
  368inline constexpr std::string_view kParseStage = 
"parse";
 
  370inline constexpr std::size_t kDefaultChunkSize = 1000;
 
  378template <
typename PostgreCachePolicy>
 
  379class PostgreCache 
final 
  380    : 
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
 
  383  using PolicyType = PostgreCachePolicy;
 
  384  using ValueType = pg_cache::detail::ValueType<PolicyType>;
 
  385  using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
 
  386  using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
 
  387  using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
 
  388  using UpdatedFieldType =
 
  389      pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
 
  390  using BaseType = 
typename PolicyCheckerType::BaseType;
 
  393  constexpr static bool kIncrementalUpdates =
 
  394      pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
 
  395  constexpr static auto kClusterHostTypeFlags =
 
  396      pg_cache::detail::ClusterHostType<PolicyType>();
 
  397  constexpr static auto kName = PolicyType::kName;
 
  399  PostgreCache(
const ComponentConfig&, 
const ComponentContext&);
 
  400  ~PostgreCache() 
override;
 
  402  static yaml_config::Schema GetStaticConfigSchema();
 
  405  using CachedData = std::unique_ptr<DataType>;
 
  407  UpdatedFieldType GetLastUpdated(
 
  408      std::chrono::system_clock::time_point last_update,
 
  409      const DataType& cache) 
const;
 
  412              const std::chrono::system_clock::time_point& last_update,
 
  413              const std::chrono::system_clock::time_point& now,
 
  414              cache::UpdateStatisticsScope& stats_scope) 
override;
 
  416  bool MayReturnNull() 
const override;
 
  418  CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
 
  420                    cache::UpdateStatisticsScope& stats_scope,
 
  426  std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
 
  428  std::vector<storages::postgres::ClusterPtr> clusters_;
 
  430  const std::chrono::system_clock::duration correction_;
 
  431  const std::chrono::milliseconds full_update_timeout_;
 
  432  const std::chrono::milliseconds incremental_update_timeout_;
 
  433  const std::size_t chunk_size_;
 
  434  std::size_t cpu_relax_iterations_parse_{0};
 
  435  std::size_t cpu_relax_iterations_copy_{0};
 
  438template <
typename PostgreCachePolicy>
 
  439inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = 
true;
 
  441template <
typename PostgreCachePolicy>
 
  442PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
 
  443                                               const ComponentContext& context)
 
  444    : BaseType{config, context},
 
  445      correction_{ParseCorrection(config)},
 
  446      full_update_timeout_{
 
  447          config[
"full-update-op-timeout"].As<std::chrono::milliseconds>(
 
  448              pg_cache::detail::kDefaultFullUpdateTimeout)},
 
  449      incremental_update_timeout_{
 
  450          config[
"incremental-update-op-timeout"].As<std::chrono::milliseconds>(
 
  451              pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
 
  452      chunk_size_{config[
"chunk-size"].As<size_t>(
 
  453          pg_cache::detail::kDefaultChunkSize)} {
 
  456      "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building " 
  457      "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON.");
 
  459  if (
this->GetAllowedUpdateTypes() ==
 
  461      !kIncrementalUpdates) {
 
  462    throw std::logic_error(
 
  463        "Incremental update support is requested in config but no update field " 
  464        "name is specified in traits of '" +
 
  465        config.Name() + 
"' cache");
 
  467  if (correction_.count() < 0) {
 
  468    throw std::logic_error(
 
  469        "Refusing to set forward (negative) update correction requested in " 
  471        config.Name() + 
"' cache");
 
  474  const auto pg_alias = config[
"pgcomponent"].As<std::string>(
"");
 
  475  if (pg_alias.empty()) {
 
  477        "No `pgcomponent` entry in configuration"};
 
  479  auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
 
  480  const auto shard_count = pg_cluster_comp.GetShardCount();
 
  481  clusters_.resize(shard_count);
 
  482  for (size_t i = 0; i < shard_count; ++i) {
 
  483    clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
 
  486  LOG_INFO() << 
"Cache " << kName << 
" full update query `" 
  487             << GetAllQuery().Statement() << 
"` incremental update query `" 
  488             << GetDeltaQuery().Statement() << 
"`";
 
  490  this->StartPeriodicUpdates();
 
  493template <
typename PostgreCachePolicy>
 
  494PostgreCache<PostgreCachePolicy>::~PostgreCache() {
 
  495  this->StopPeriodicUpdates();
 
  498template <
typename PostgreCachePolicy>
 
  501  if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
 
  502    return {fmt::format(
"{} where {}", query.Statement(),
 
  503                        PostgreCachePolicy::kWhere),
 
  510template <
typename PostgreCachePolicy>
 
  511storages::
postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
 
  512  if constexpr (kIncrementalUpdates) {
 
  515    if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
 
  517          fmt::format(
"{} where ({}) and {} >= $1", query.Statement(),
 
  518                      PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
 
  521      return {fmt::format(
"{} where {} >= $1", query.Statement(),
 
  522                          PolicyType::kUpdatedField),
 
  526    return GetAllQuery();
 
  530template <
typename PostgreCachePolicy>
 
  531std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
 
  532    const ComponentConfig& config) {
 
  533  static constexpr std::string_view kUpdateCorrection = 
"update-correction";
 
  534  if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
 
  536    return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
 
  538    return config[kUpdateCorrection].As<std::chrono::milliseconds>();
 
  542template <
typename PostgreCachePolicy>
 
  543typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
 
  544PostgreCache<PostgreCachePolicy>::GetLastUpdated(
 
  545    [[maybe_unused]] std::chrono::system_clock::time_point last_update,
 
  546    const DataType& cache) 
const {
 
  547  if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
 
  548    return PostgreCachePolicy::GetLastKnownUpdated(cache);
 
  550    return UpdatedFieldType{last_update - correction_};
 
  554template <
typename PostgreCachePolicy>
 
  555void PostgreCache<PostgreCachePolicy>::Update(
 
  557    const std::chrono::system_clock::time_point& last_update,
 
  558    const std::chrono::system_clock::time_point& ,
 
  559    cache::UpdateStatisticsScope& stats_scope) {
 
  561  if constexpr (!kIncrementalUpdates) {
 
  566  const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
 
  567                                                ? full_update_timeout_
 
  568                                                : incremental_update_timeout_;
 
  571  auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
 
  572      std::string{pg_cache::detail::kCopyStage});
 
  573  auto data_cache = GetDataSnapshot(type, scope);
 
  574  [[maybe_unused]] 
const auto old_size = data_cache->size();
 
  576  scope.Reset(std::string{pg_cache::detail::kFetchStage});
 
  580  for (
auto& cluster : clusters_) {
 
  581    if (chunk_size_ > 0) {
 
  582      auto trx = cluster->Begin(
 
  583          kClusterHostTypeFlags, pg::Transaction::RO,
 
  584          pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
 
  586          trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
 
  588        scope.Reset(std::string{pg_cache::detail::kFetchStage});
 
  589        auto res = portal.Fetch(chunk_size_);
 
  590        stats_scope.IncreaseDocumentsReadCount(res.Size());
 
  592        scope.Reset(std::string{pg_cache::detail::kParseStage});
 
  593        CacheResults(res, data_cache, stats_scope, scope);
 
  594        changes += res.Size();
 
  598      bool has_parameter = query.Statement().find(
'$') != std::string::npos;
 
  599      auto res = has_parameter
 
  601                           kClusterHostTypeFlags,
 
  603                               timeout, pg_cache::detail::kStatementTimeoutOff},
 
  604                           query, GetLastUpdated(last_update, *data_cache))
 
  606                           kClusterHostTypeFlags,
 
  608                               timeout, pg_cache::detail::kStatementTimeoutOff},
 
  610      stats_scope.IncreaseDocumentsReadCount(res.Size());
 
  612      scope.Reset(std::string{pg_cache::detail::kParseStage});
 
  613      CacheResults(res, data_cache, stats_scope, scope);
 
  614      changes += res.Size();
 
  620  if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
 
  622      const auto elapsed_copy =
 
  623          scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
 
  624      if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
 
  625        cpu_relax_iterations_copy_ = 
static_cast<std::size_t>(
 
  626            static_cast<
double>(old_size) /
 
  627            (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
 
  628        LOG_TRACE() << 
"Elapsed time for copying " << kName << 
" " 
  629                    << elapsed_copy.count() << 
" for " << changes
 
  630                    << 
" data items is over threshold. Will relax CPU every " 
  631                    << cpu_relax_iterations_parse_ << 
" iterations";
 
  637    const auto elapsed_parse =
 
  638        scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
 
  639    if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
 
  640      cpu_relax_iterations_parse_ = 
static_cast<std::size_t>(
 
  641          static_cast<
double>(changes) /
 
  642          (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
 
  643      LOG_TRACE() << 
"Elapsed time for parsing " << kName << 
" " 
  644                  << elapsed_parse.count() << 
" for " << changes
 
  645                  << 
" data items is over threshold. Will relax CPU every " 
  646                  << cpu_relax_iterations_parse_ << 
" iterations";
 
  649  if (changes > 0 || type == cache::UpdateType::kFull) {
 
  651    stats_scope.Finish(data_cache->size());
 
  652    pg_cache::detail::OnWritesDone(*data_cache);
 
  653    this->Set(std::move(data_cache));
 
  659template <
typename PostgreCachePolicy>
 
  660bool PostgreCache<PostgreCachePolicy>::MayReturnNull() 
const {
 
  661  return pg_cache::detail::MayReturnNull<PolicyType>();
 
  664template <
typename PostgreCachePolicy>
 
  665void PostgreCache<PostgreCachePolicy>::CacheResults(
 
  668  auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
 
  669  utils::
CpuRelax relax
{cpu_relax_iterations_parse_
, &scope
};
 
  670  for (
auto p = values.begin(); p != values.end(); ++p) {
 
  673      using pg_cache::detail::CacheInsertOrAssign;
 
  675          *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
 
  676          PostgreCachePolicy::kKeyMember);
 
  677    } 
catch (
const std::exception& e) {
 
  678      stats_scope.IncreaseDocumentsParseFailures(1);
 
  679      LOG_ERROR() << 
"Error parsing data row in cache '" << kName << 
"' to '" 
  680                  << compiler::GetTypeName<ValueType>() << 
"': " << e.what();
 
  685template <
typename PostgreCachePolicy>
 
  686typename PostgreCache<PostgreCachePolicy>::CachedData
 
  687PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::
UpdateType type,
 
  690    auto data = 
this->Get();
 
  692      return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
 
  696  return std::make_unique<DataType>();
 
  701std::string GetPostgreCacheSchema();
 
  705template <
typename PostgreCachePolicy>
 
  706yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
 
  708      typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
 
  709  return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
 
  714namespace utils::impl::projected_set {
 
  716template <
typename Set, 
typename Value, 
typename KeyMember>
 
  717void CacheInsertOrAssign(Set& set, Value&& value,
 
  719  DoInsert(set, std::forward<Value>(value));