6#include <userver/cache/base_postgres_cache_fwd.hpp> 
   12#include <unordered_map> 
   14#include <fmt/format.h> 
   16#include <userver/cache/cache_statistics.hpp> 
   17#include <userver/cache/caching_component_base.hpp> 
   18#include <userver/components/component_config.hpp> 
   19#include <userver/components/component_context.hpp> 
   21#include <userver/storages/postgres/cluster.hpp> 
   22#include <userver/storages/postgres/component.hpp> 
   23#include <userver/storages/postgres/io/chrono.hpp> 
   25#include <userver/compiler/demangle.hpp> 
   26#include <userver/logging/log.hpp> 
   27#include <userver/tracing/span.hpp> 
   28#include <userver/utils/assert.hpp> 
   29#include <userver/utils/cpu_relax.hpp> 
   30#include <userver/utils/meta.hpp> 
   31#include <userver/utils/void_t.hpp> 
   32#include <userver/yaml_config/merge_schemas.hpp> 
   34USERVER_NAMESPACE_BEGIN
 
  113namespace pg_cache::detail {
 
  116using ValueType = 
typename T::ValueType;
 
  118inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
 
  121using RawValueTypeImpl = 
typename T::RawValueType;
 
  123inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
 
  125using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
 
  127template <
typename PostgreCachePolicy>
 
  128auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
 
  129  if constexpr (kHasRawValueType<PostgreCachePolicy>) {
 
  130    return Convert(std::move(raw),
 
  131                   formats::parse::To<ValueType<PostgreCachePolicy>>());
 
  133    return std::move(raw);
 
  139using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
 
  141inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
 
  145using HasQueryImpl = 
decltype(T::kQuery);
 
  147inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
 
  151using HasGetQueryImpl = 
decltype(T::GetQuery());
 
  153inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
 
  157using HasWhere = 
decltype(T::kWhere);
 
  159inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
 
  163using HasUpdatedField = 
decltype(T::kUpdatedField);
 
  165inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
 
  168using WantIncrementalUpdates =
 
  169    std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
 
  171inline constexpr bool kWantIncrementalUpdates =
 
  172    meta::kIsDetected<WantIncrementalUpdates, T>;
 
  176using KeyMemberTypeImpl =
 
  177    std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
 
  179inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
 
  181using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
 
  184template <
typename T, 
typename = USERVER_NAMESPACE::utils::void_t<>>
 
  185struct DataCacheContainer {
 
  186  static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
 
  187                "With default CacheContainer, key type must be std::hash-able");
 
  189  using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
 
  193struct DataCacheContainer<
 
  194    T, USERVER_NAMESPACE::utils::void_t<
typename T::CacheContainer>> {
 
  195  using type = 
typename T::CacheContainer;
 
  199using DataCacheContainerType = 
typename DataCacheContainer<T>::type;
 
  204inline constexpr bool kIsContainerCopiedByElement =
 
  205    meta::kIsInstantiationOf<std::unordered_map, T> ||
 
  206    meta::kIsInstantiationOf<std::map, T>;
 
  209std::unique_ptr<T> CopyContainer(
 
  210    const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
 
  211    tracing::ScopeTime& scope) {
 
  212  if constexpr (kIsContainerCopiedByElement<T>) {
 
  213    auto copy = std::make_unique<T>();
 
  214    if constexpr (meta::kIsReservable<T>) {
 
  215      copy->reserve(container.size());
 
  219    for (
const auto& kv : container) {
 
  225    return std::make_unique<T>(container);
 
  229template <
typename Container, 
typename Value, 
typename KeyMember,
 
  231void CacheInsertOrAssign(Container& container, Value&& value,
 
  232                         const KeyMember& key_member, Args&&... ) {
 
  234  static_assert(
sizeof...(Args) == 0);
 
  236  auto key = std::invoke(key_member, value);
 
  237  container.insert_or_assign(std::move(key), std::forward<Value>(value));
 
  241using HasOnWritesDoneImpl = 
decltype(std::declval<T&>().OnWritesDone());
 
  244void OnWritesDone(T& container) {
 
  245  if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
 
  246    container.OnWritesDone();
 
  251using HasCustomUpdatedImpl =
 
  252    decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
 
  255inline constexpr bool kHasCustomUpdated =
 
  256    meta::kIsDetected<HasCustomUpdatedImpl, T>;
 
  259using UpdatedFieldTypeImpl = 
typename T::UpdatedFieldType;
 
  261inline constexpr bool kHasUpdatedFieldType =
 
  262    meta::kIsDetected<UpdatedFieldTypeImpl, T>;
 
  264using UpdatedFieldType =
 
  265    meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
 
  268constexpr bool CheckUpdatedFieldType() {
 
  269  if constexpr (kHasUpdatedFieldType<T>) {
 
  271        std::is_same_v<
typename T::UpdatedFieldType,
 
  272                       storages::postgres::TimePointTz> ||
 
  273            std::is_same_v<
typename T::UpdatedFieldType,
 
  274                           storages::postgres::TimePoint> ||
 
  275            kHasCustomUpdated<T>,
 
  276        "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
 
  278    static_assert(!kWantIncrementalUpdates<T>,
 
  279                  "UpdatedFieldType must be explicitly specified when using " 
  280                  "incremental updates");
 
  287using HasClusterHostTypeImpl = 
decltype(T::kClusterHostType);
 
  290constexpr storages::
postgres::ClusterHostTypeFlags ClusterHostType() {
 
  291  if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
 
  292    return T::kClusterHostType;
 
  294    return storages::postgres::ClusterHostType::kSlave;
 
  300using HasMayReturnNull = 
decltype(T::kMayReturnNull);
 
  303constexpr bool MayReturnNull() {
 
  304  if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
 
  305    return T::kMayReturnNull;
 
  311template <
typename PostgreCachePolicy>
 
  312struct PolicyChecker {
 
  315      kHasName<PostgreCachePolicy>,
 
  316      "The PosgreSQL cache policy must contain a static member `kName`");
 
  318      kHasValueType<PostgreCachePolicy>,
 
  319      "The PosgreSQL cache policy must define a type alias `ValueType`");
 
  321      kHasKeyMember<PostgreCachePolicy>,
 
  322      "The PostgreSQL cache policy must contain a static member `kKeyMember` " 
  323      "with a pointer to a data or a function member with the object's key");
 
  324  static_assert(kHasQuery<PostgreCachePolicy> ||
 
  325                    kHasGetQuery<PostgreCachePolicy>,
 
  326                "The PosgreSQL cache policy must contain a static data member " 
  327                "`kQuery` with a select statement or a static member function " 
  328                "`GetQuery` returning the query");
 
  329  static_assert(!(kHasQuery<PostgreCachePolicy> &&
 
  330                  kHasGetQuery<PostgreCachePolicy>),
 
  331                "The PosgreSQL cache policy must define `kQuery` or " 
  332                "`GetQuery`, not both");
 
  334      kHasUpdatedField<PostgreCachePolicy>,
 
  335      "The PosgreSQL cache policy must contain a static member " 
  336      "`kUpdatedField`. If you don't want to use incremental updates, " 
  337      "please set its value to `nullptr`");
 
  338  static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
 
  340  static_assert(ClusterHostType<PostgreCachePolicy>() &
 
  341                    storages::postgres::kClusterHostRolesMask,
 
  342                "Cluster host role must be specified for caching component, " 
  343                "please be more specific");
 
  346    if constexpr (kHasGetQuery<PostgreCachePolicy>) {
 
  347      return PostgreCachePolicy::GetQuery();
 
  349      return PostgreCachePolicy::kQuery;
 
  354      CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
 
  357inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
 
  358inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
 
  359inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
 
  360inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
 
  361inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
 
  363inline constexpr std::string_view kCopyStage = 
"copy_data";
 
  364inline constexpr std::string_view kFetchStage = 
"fetch";
 
  365inline constexpr std::string_view kParseStage = 
"parse";
 
  367inline constexpr std::size_t kDefaultChunkSize = 1000;
 
  375template <
typename PostgreCachePolicy>
 
  376class PostgreCache 
final 
  377    : 
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
 
  380  using PolicyType = PostgreCachePolicy;
 
  381  using ValueType = pg_cache::detail::ValueType<PolicyType>;
 
  382  using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
 
  383  using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
 
  384  using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
 
  385  using UpdatedFieldType =
 
  386      pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
 
  387  using BaseType = 
typename PolicyCheckerType::BaseType;
 
  390  constexpr static bool kIncrementalUpdates =
 
  391      pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
 
  392  constexpr static auto kClusterHostTypeFlags =
 
  393      pg_cache::detail::ClusterHostType<PolicyType>();
 
  394  constexpr static auto kName = PolicyType::kName;
 
  396  PostgreCache(
const ComponentConfig&, 
const ComponentContext&);
 
  397  ~PostgreCache() 
override;
 
  399  static yaml_config::Schema GetStaticConfigSchema();
 
  402  using CachedData = std::unique_ptr<DataType>;
 
  404  UpdatedFieldType GetLastUpdated(
 
  405      std::chrono::system_clock::time_point last_update,
 
  406      const DataType& cache) 
const;
 
  409              const std::chrono::system_clock::time_point& last_update,
 
  410              const std::chrono::system_clock::time_point& now,
 
  411              cache::UpdateStatisticsScope& stats_scope) 
override;
 
  413  bool MayReturnNull() 
const override;
 
  415  CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
 
  417                    cache::UpdateStatisticsScope& stats_scope,
 
  423  std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
 
  425  std::vector<storages::postgres::ClusterPtr> clusters_;
 
  427  const std::chrono::system_clock::duration correction_;
 
  428  const std::chrono::milliseconds full_update_timeout_;
 
  429  const std::chrono::milliseconds incremental_update_timeout_;
 
  430  const std::size_t chunk_size_;
 
  431  std::size_t cpu_relax_iterations_parse_{0};
 
  432  std::size_t cpu_relax_iterations_copy_{0};
 
  435template <
typename PostgreCachePolicy>
 
  436inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = 
true;
 
  438template <
typename PostgreCachePolicy>
 
  439PostgreCache<PostgreCachePolicy>::PostgreCache(
const ComponentConfig& config,
 
  440                                               const ComponentContext& context)
 
  441    : BaseType{config, context},
 
  442      correction_{ParseCorrection(config)},
 
  443      full_update_timeout_{
 
  444          config[
"full-update-op-timeout"].As<std::chrono::milliseconds>(
 
  445              pg_cache::detail::kDefaultFullUpdateTimeout)},
 
  446      incremental_update_timeout_{
 
  447          config[
"incremental-update-op-timeout"].As<std::chrono::milliseconds>(
 
  448              pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
 
  449      chunk_size_{config[
"chunk-size"].As<size_t>(
 
  450          pg_cache::detail::kDefaultChunkSize)} {
 
  451  if (
this->GetAllowedUpdateTypes() ==
 
  453      !kIncrementalUpdates) {
 
  454    throw std::logic_error(
 
  455        "Incremental update support is requested in config but no update field " 
  456        "name is specified in traits of '" +
 
  457        config.Name() + 
"' cache");
 
  459  if (correction_.count() < 0) {
 
  460    throw std::logic_error(
 
  461        "Refusing to set forward (negative) update correction requested in " 
  463        config.Name() + 
"' cache");
 
  466  const auto pg_alias = config[
"pgcomponent"].As<std::string>(
"");
 
  467  if (pg_alias.empty()) {
 
  469        "No `pgcomponent` entry in configuration"};
 
  471  auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
 
  472  const auto shard_count = pg_cluster_comp.GetShardCount();
 
  473  clusters_.resize(shard_count);
 
  474  for (size_t i = 0; i < shard_count; ++i) {
 
  475    clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
 
  478  LOG_INFO() << 
"Cache " << kName << 
" full update query `" 
  479             << GetAllQuery().Statement() << 
"` incremental update query `" 
  480             << GetDeltaQuery().Statement() << 
"`";
 
  482  this->StartPeriodicUpdates();
 
  485template <
typename PostgreCachePolicy>
 
  486PostgreCache<PostgreCachePolicy>::~PostgreCache() {
 
  487  this->StopPeriodicUpdates();
 
  490template <
typename PostgreCachePolicy>
 
  491storages::
postgres::
Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
 
  492  storages::
postgres::
Query query = PolicyCheckerType::GetQuery();
 
  493  if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
 
  494    return {fmt::format(
"{} where {}", query.Statement(),
 
  495                        PostgreCachePolicy::kWhere),
 
  502template <
typename PostgreCachePolicy>
 
  503storages::
postgres::
Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
 
  504  if constexpr (kIncrementalUpdates) {
 
  505    storages::
postgres::
Query query = PolicyCheckerType::GetQuery();
 
  507    if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
 
  509          fmt::format(
"{} where ({}) and {} >= $1", query.Statement(),
 
  510                      PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
 
  513      return {fmt::format(
"{} where {} >= $1", query.Statement(),
 
  514                          PolicyType::kUpdatedField),
 
  518    return GetAllQuery();
 
  522template <
typename PostgreCachePolicy>
 
  523std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
 
  524    const ComponentConfig& config) {
 
  525  static constexpr std::string_view kUpdateCorrection = 
"update-correction";
 
  526  if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
 
  528    return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
 
  530    return config[kUpdateCorrection].As<std::chrono::milliseconds>();
 
  534template <
typename PostgreCachePolicy>
 
  535typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
 
  536PostgreCache<PostgreCachePolicy>::GetLastUpdated(
 
  537    [[maybe_unused]] std::chrono::system_clock::time_point last_update,
 
  538    const DataType& cache) 
const {
 
  539  if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
 
  540    return PostgreCachePolicy::GetLastKnownUpdated(cache);
 
  542    return UpdatedFieldType{last_update - correction_};
 
  546template <
typename PostgreCachePolicy>
 
  547void PostgreCache<PostgreCachePolicy>::Update(
 
  549    const std::chrono::system_clock::time_point& last_update,
 
  550    const std::chrono::system_clock::time_point& ,
 
  551    cache::UpdateStatisticsScope& stats_scope) {
 
  553  if constexpr (!kIncrementalUpdates) {
 
  558  const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
 
  559                                                ? full_update_timeout_
 
  560                                                : incremental_update_timeout_;
 
  563  auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
 
  564      std::string{pg_cache::detail::kCopyStage});
 
  565  auto data_cache = GetDataSnapshot(type, scope);
 
  566  [[maybe_unused]] 
const auto old_size = data_cache->size();
 
  568  scope.Reset(std::string{pg_cache::detail::kFetchStage});
 
  572  for (
auto& cluster : clusters_) {
 
  573    if (chunk_size_ > 0) {
 
  574      auto trx = cluster->Begin(
 
  575          kClusterHostTypeFlags, pg::Transaction::RO,
 
  576          pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
 
  578          trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
 
  580        scope.Reset(std::string{pg_cache::detail::kFetchStage});
 
  581        auto res = portal.Fetch(chunk_size_);
 
  582        stats_scope.IncreaseDocumentsReadCount(res.Size());
 
  584        scope.Reset(std::string{pg_cache::detail::kParseStage});
 
  585        CacheResults(res, data_cache, stats_scope, scope);
 
  586        changes += res.Size();
 
  590      bool has_parameter = query.Statement().find(
'$') != std::string::npos;
 
  591      auto res = has_parameter
 
  593                           kClusterHostTypeFlags,
 
  595                               timeout, pg_cache::detail::kStatementTimeoutOff},
 
  596                           query, GetLastUpdated(last_update, *data_cache))
 
  598                           kClusterHostTypeFlags,
 
  600                               timeout, pg_cache::detail::kStatementTimeoutOff},
 
  602      stats_scope.IncreaseDocumentsReadCount(res.Size());
 
  604      scope.Reset(std::string{pg_cache::detail::kParseStage});
 
  605      CacheResults(res, data_cache, stats_scope, scope);
 
  606      changes += res.Size();
 
  612  if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
 
  614      const auto elapsed_copy =
 
  615          scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
 
  616      if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
 
  617        cpu_relax_iterations_copy_ = 
static_cast<std::size_t>(
 
  618            static_cast<
double>(old_size) /
 
  619            (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
 
  620        LOG_TRACE() << 
"Elapsed time for copying " << kName << 
" " 
  621                    << elapsed_copy.count() << 
" for " << changes
 
  622                    << 
" data items is over threshold. Will relax CPU every " 
  623                    << cpu_relax_iterations_parse_ << 
" iterations";
 
  629    const auto elapsed_parse =
 
  630        scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
 
  631    if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
 
  632      cpu_relax_iterations_parse_ = 
static_cast<std::size_t>(
 
  633          static_cast<
double>(changes) /
 
  634          (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
 
  635      LOG_TRACE() << 
"Elapsed time for parsing " << kName << 
" " 
  636                  << elapsed_parse.count() << 
" for " << changes
 
  637                  << 
" data items is over threshold. Will relax CPU every " 
  638                  << cpu_relax_iterations_parse_ << 
" iterations";
 
  641  if (changes > 0 || type == cache::UpdateType::kFull) {
 
  643    stats_scope.Finish(data_cache->size());
 
  644    pg_cache::detail::OnWritesDone(*data_cache);
 
  645    this->Set(std::move(data_cache));
 
  651template <
typename PostgreCachePolicy>
 
  652bool PostgreCache<PostgreCachePolicy>::MayReturnNull() 
const {
 
  653  return pg_cache::detail::MayReturnNull<PolicyType>();
 
  656template <
typename PostgreCachePolicy>
 
  657void PostgreCache<PostgreCachePolicy>::CacheResults(
 
  660  auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
 
  662  for (
auto p = values.begin(); p != values.end(); ++p) {
 
  665      using pg_cache::detail::CacheInsertOrAssign;
 
  667          *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
 
  668          PostgreCachePolicy::kKeyMember);
 
  669    } 
catch (
const std::exception& e) {
 
  670      stats_scope.IncreaseDocumentsParseFailures(1);
 
  671      LOG_ERROR() << 
"Error parsing data row in cache '" << kName << 
"' to '" 
  672                  << compiler::GetTypeName<ValueType>() << 
"': " << e.what();
 
  677template <
typename PostgreCachePolicy>
 
  678typename PostgreCache<PostgreCachePolicy>::CachedData
 
  679PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::
UpdateType type,
 
  682    auto data = 
this->Get();
 
  684      return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
 
  688  return std::make_unique<DataType>();
 
  693std::string GetPostgreCacheSchema();
 
  697template <
typename PostgreCachePolicy>
 
  698yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
 
  700      typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
 
  701  return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
 
  706namespace utils::impl::projected_set {
 
  708template <
typename Set, 
typename Value, 
typename KeyMember>
 
  709void CacheInsertOrAssign(Set& set, Value&& value,
 
  711  DoInsert(set, std::forward<Value>(value));