11#include <unordered_map>
13#include <fmt/format.h>
17#include <userver/components/component_config.hpp>
33USERVER_NAMESPACE_BEGIN
112namespace pg_cache::detail {
// Detection helpers for the value types a cache policy `T` may declare.
// `ValueType` names the nested `T::ValueType`; `kHasValueType` is the
// `meta::kIsDetected` result for it, i.e. whether the policy declares it.
115using ValueType =
typename T::ValueType;
117inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
// Same pair for the optional nested `T::RawValueType`.
120using RawValueTypeImpl =
typename T::RawValueType;
122inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
// Produces the value to store from a parsed row of type
// `RawValueType<PostgreCachePolicy>`.
// If the policy declares its own `RawValueType`, the raw value is handed to
// `Convert`; otherwise the raw value itself is moved out and returned.
// NOTE(review): the trailing arguments of the `Convert` call and the header
// of the alternative branch are not visible in this excerpt.
126template <
typename PostgreCachePolicy>
127auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
128 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
129 return Convert(std::move(raw),
132 return std::move(raw);
// Compile-time probes for the static members a cache policy `T` may provide.
// Each `kHas*` / `kWant*` flag is the `meta::kIsDetected` result for the
// alias declared right above it.
// Well-formed only when `T::kName` can initialise a non-empty
// `std::string_view` inside a constant expression.
138using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
140inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
// The policy exposes a static `kQuery`.
144using HasQueryImpl =
decltype(T::kQuery);
146inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
// The policy exposes a static `GetQuery()` callable without arguments.
150using HasGetQueryImpl =
decltype(T::GetQuery());
152inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
// The policy exposes a static `kWhere`.
156using HasWhere =
decltype(T::kWhere);
158inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
// The policy exposes a static `kUpdatedField`; this probe says nothing about
// whether its value is empty.
162using HasUpdatedField =
decltype(T::kUpdatedField);
164inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
// Incremental updates are wanted only when `T::kUpdatedField` is a non-empty
// string.
167using WantIncrementalUpdates =
168 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
170inline constexpr bool kWantIncrementalUpdates =
171 meta::kIsDetected<WantIncrementalUpdates, T>;
// Key type of a cached value: the decayed result of invoking `T::kKeyMember`
// on a `ValueType<T>` (so `kKeyMember` may be a data-member or
// member-function pointer, or any other callable `std::invoke` accepts).
175using KeyMemberTypeImpl =
176 std::decay_t<std::invoke_result_t<
decltype(T::kKeyMember), ValueType<T>>>;
// True when the expression above is well-formed for the policy.
178inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
// Chooses the container type that holds the cached data for policy `T`.
// Primary template: a `std::unordered_map` from `KeyMemberType<T>` to
// `ValueType<T>`; the static_assert rejects key types that are not
// `std::hash`-able.
183template <
typename T,
typename = USERVER_NAMESPACE::utils::
void_t<>>
184struct DataCacheContainer {
185 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
186 "With default CacheContainer, key type must be std::hash-able");
188 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
// Partial specialisation selected when the policy declares a nested
// `CacheContainer`; that type is used as-is.
192struct DataCacheContainer<
193 T, USERVER_NAMESPACE::
utils::void_t<typename T::CacheContainer>> {
194 using type =
typename T::CacheContainer;
// Shorthand for the container type selected above.
198using DataCacheContainerType =
typename DataCacheContainer<T>::type;
// True for instantiations of `std::unordered_map` or `std::map`; other
// container types yield false.
203inline constexpr bool kIsContainerCopiedByElement =
204 meta::kIsInstantiationOf<std::unordered_map, T> ||
205 meta::kIsInstantiationOf<std::map, T>;
// Returns a freshly allocated copy of `container`.
// For containers flagged by `kIsContainerCopiedByElement` the copy starts
// empty, reserves `container.size()` when the type is reservable, and is then
// filled by iterating over the source; every other container type is
// copy-constructed in one step.
// NOTE(review): the remaining parameters and the loop body are not visible in
// this excerpt; `cpu_relax_iterations` is presumably used inside the loop to
// yield the CPU periodically - confirm against the full file.
208std::unique_ptr<T> CopyContainer(
209 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
211 if constexpr (kIsContainerCopiedByElement<T>) {
212 auto copy = std::make_unique<T>();
213 if constexpr (meta::kIsReservable<T>) {
214 copy->reserve(container.size());
218 for (
const auto& kv : container) {
224 return std::make_unique<T>(container);
// Default insertion routine for cache containers.
// Derives the key by invoking `key_member` on `value`, then stores the value
// under that key with `insert_or_assign`, so an existing entry with the same
// key is overwritten.
// The trailing parameter pack must be empty (enforced by the static_assert).
// NOTE(review): the declaration of `Args` is not visible in this excerpt.
228template <
typename Container,
typename Value,
typename KeyMember,
230void CacheInsertOrAssign(Container& container, Value&& value,
231 const KeyMember& key_member, Args&&... ) {
233 static_assert(
sizeof...(Args) == 0);
// The key is taken before `value` is forwarded (and possibly moved from).
235 auto key = std::invoke(key_member, value);
236 container.insert_or_assign(std::move(key), std::forward<Value>(value));
// Well-formed when a mutable `T` has a member `OnWritesDone()` callable
// without arguments.
240using HasOnWritesDoneImpl =
decltype(std::declval<T&>().OnWritesDone());
// Calls `container.OnWritesDone()` when the container type provides it; the
// visible code does nothing else for containers without that member.
243void OnWritesDone(T& container) {
244 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
245 container.OnWritesDone();
// Well-formed when the policy has a static `GetLastKnownUpdated` callable
// with the policy's cache container.
250using HasCustomUpdatedImpl =
251 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
254inline constexpr bool kHasCustomUpdated =
255 meta::kIsDetected<HasCustomUpdatedImpl, T>;
// Detects an explicitly declared nested `T::UpdatedFieldType`.
258using UpdatedFieldTypeImpl =
typename T::UpdatedFieldType;
260inline constexpr bool kHasUpdatedFieldType =
261 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
// NOTE(review): the right-hand side of the `UpdatedFieldType` alias and the
// template header of `CheckUpdatedFieldType` are not visible in this excerpt.
// `CheckUpdatedFieldType` validates the policy at compile time:
//  - with an explicit `UpdatedFieldType`, the type must match one of the
//    accepted types (per the message: TimePointTz or TimePoint) unless the
//    policy supplies a custom `GetLastKnownUpdated`;
//  - without one, the policy must not request incremental updates.
263using UpdatedFieldType =
267constexpr bool CheckUpdatedFieldType() {
268 if constexpr (kHasUpdatedFieldType<T>) {
270 std::is_same_v<
typename T::UpdatedFieldType,
272 std::is_same_v<
typename T::UpdatedFieldType,
274 kHasCustomUpdated<T>,
275 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
277 static_assert(!kWantIncrementalUpdates<T>,
278 "UpdatedFieldType must be explicitly specified when using "
279 "incremental updates");
// Well-formed when the policy declares a static `kClusterHostType`.
286using HasClusterHostTypeImpl =
decltype(T::kClusterHostType);
// When the policy declares `kClusterHostType`, that value is returned.
// NOTE(review): the enclosing function's signature and its fallback branch
// are not visible in this excerpt; presumably this is the `ClusterHostType<T>()`
// helper used elsewhere in the file - confirm.
290 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
291 return T::kClusterHostType;
// Well-formed when the policy declares a static `kMayReturnNull`.
299using HasMayReturnNull =
decltype(T::kMayReturnNull);
// Returns the policy's `kMayReturnNull` when it is declared.
// NOTE(review): the value returned for policies without `kMayReturnNull` is
// not visible in this excerpt.
302constexpr bool MayReturnNull() {
303 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
304 return T::kMayReturnNull;
// Compile-time validation of a PostgreSQL cache policy.
// Instantiating `PolicyChecker<Policy>` triggers static assertions that the
// policy provides: a non-empty `kName`, a `ValueType` alias, a `kKeyMember`,
// exactly one of `kQuery` / `GetQuery()`, a `kUpdatedField`, a valid
// `UpdatedFieldType` (see `CheckUpdatedFieldType`) and a cluster host type
// with at least one role bit from `kClusterHostRolesMask` set.
// The diagnostics consistently spell the product name "PostgreSQL".
// NOTE(review): several `static_assert(` openers, the header of the query
// accessor and the start of the `BaseType` alias are not visible in this
// excerpt.
310template <
typename PostgreCachePolicy>
311struct PolicyChecker {
314 kHasName<PostgreCachePolicy>,
315 "The PostgreSQL cache policy must contain a static member `kName`");
317 kHasValueType<PostgreCachePolicy>,
318 "The PostgreSQL cache policy must define a type alias `ValueType`");
320 kHasKeyMember<PostgreCachePolicy>,
321 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
322 "with a pointer to a data or a function member with the object's key");
// At least one way of obtaining the query must exist...
323 static_assert(kHasQuery<PostgreCachePolicy> ||
324 kHasGetQuery<PostgreCachePolicy>,
325 "The PostgreSQL cache policy must contain a static data member "
326 "`kQuery` with a select statement or a static member function "
327 "`GetQuery` returning the query");
// ...but not both, to keep the source of the query unambiguous.
328 static_assert(!(kHasQuery<PostgreCachePolicy> &&
329 kHasGetQuery<PostgreCachePolicy>),
330 "The PostgreSQL cache policy must define `kQuery` or "
331 "`GetQuery`, not both");
333 kHasUpdatedField<PostgreCachePolicy>,
334 "The PostgreSQL cache policy must contain a static member "
335 "`kUpdatedField`. If you don't want to use incremental updates, "
336 "please set its value to `nullptr`");
337 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
// The host type flags must include at least one role bit.
339 static_assert(ClusterHostType<PostgreCachePolicy>() &
340 storages::postgres::kClusterHostRolesMask,
341 "Cluster host role must be specified for caching component, "
342 "please be more specific");
// Query accessor: prefers `GetQuery()` when the policy has it, otherwise
// returns the static `kQuery`.
345 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
346 return PostgreCachePolicy::GetQuery();
348 return PostgreCachePolicy::kQuery;
353 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
// Defaults for the `full-update-op-timeout` and
// `incremental-update-op-timeout` config options read by the constructor.
356inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
357inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
// Passed as the second `pg::CommandControl` field when running cache queries.
// NOTE(review): zero presumably means "no statement timeout" - confirm.
358inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
// A stage whose total elapsed time exceeds the threshold gets its CPU-relax
// iteration count recomputed; the interval is the divisor in that formula.
359inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
360inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
// Stage names handed to the update's time scope (`Reset` / `ElapsedTotal`).
362inline constexpr std::string_view kCopyStage =
"copy_data";
363inline constexpr std::string_view kFetchStage =
"fetch";
364inline constexpr std::string_view kParseStage =
"parse";
// Default for the `chunk-size` config option.
366inline constexpr std::size_t kDefaultChunkSize = 1000;
// Caching component parameterised by a PostgreSQL cache policy.
// It derives from `PolicyChecker<PostgreCachePolicy>::BaseType`, so the
// policy is validated at compile time when the class is instantiated.
// NOTE(review): the class-head line, access specifiers and several member
// declarations are not visible in this excerpt.
374template <
typename PostgreCachePolicy>
376 :
public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
// Type aliases derived from the policy.
379 using PolicyType = PostgreCachePolicy;
380 using ValueType = pg_cache::detail::ValueType<PolicyType>;
381 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
382 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
383 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
384 using UpdatedFieldType =
385 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
// Compile-time properties of the policy.
389 constexpr static bool kIncrementalUpdates =
390 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
391 constexpr static auto kClusterHostTypeFlags =
392 pg_cache::detail::ClusterHostType<PolicyType>();
393 constexpr static auto kName = PolicyType::kName;
// Owning pointer to a cache container; `Update` works on such a snapshot.
401 using CachedData = std::unique_ptr<DataType>;
// Value passed as the query parameter when fetching updated rows.
403 UpdatedFieldType GetLastUpdated(
404 std::chrono::system_clock::time_point last_update,
405 const DataType&
cache)
const;
408 const std::chrono::system_clock::time_point& last_update,
409 const std::chrono::system_clock::time_point& now,
412 bool MayReturnNull()
const override;
// Reads the `update-correction` config option.
422 std::chrono::milliseconds ParseCorrection(
const ComponentConfig& config);
// One cluster handle per shard, filled in by the constructor.
424 std::vector<storages::postgres::ClusterPtr> clusters_;
// Values read from the component config in the constructor.
426 const std::chrono::system_clock::duration correction_;
427 const std::chrono::milliseconds full_update_timeout_;
428 const std::chrono::milliseconds incremental_update_timeout_;
429 const std::size_t chunk_size_;
// Recomputed by `Update` when the parse / copy stage exceeds
// `kCpuRelaxThreshold`; both start at 0.
430 std::size_t cpu_relax_iterations_parse_{0};
431 std::size_t cpu_relax_iterations_copy_{0};
// Partial specialisation that sets `kHasValidate` to true for every
// `PostgreCache<...>` instantiation.
// NOTE(review): the primary `kHasValidate` template is declared elsewhere;
// what it is consulted for is not visible in this excerpt.
434template <
typename PostgreCachePolicy>
435inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> =
true;
// Constructor: reads the component configuration and resolves the PostgreSQL
// clusters the cache queries.
// Config options read here: `update-correction` (via `ParseCorrection`),
// `full-update-op-timeout`, `incremental-update-op-timeout`, `chunk-size` and
// `pgcomponent`.
// Throws `std::logic_error` when incremental updates are enabled in the config
// but the policy does not support them, or when the correction is negative.
// An empty `pgcomponent` value is also rejected.
// NOTE(review): the constructor signature, the exception type used for the
// missing `pgcomponent` case, and the lookup of `pg_cluster_comp` /
// `shard_count` are not visible in this excerpt.
437template <
typename PostgreCachePolicy>
440 : BaseType{config, context},
441 correction_{ParseCorrection(config)},
442 full_update_timeout_{
443 config[
"full-update-op-timeout"].As<
std::chrono::milliseconds>(
444 pg_cache::detail::kDefaultFullUpdateTimeout)},
445 incremental_update_timeout_{
446 config[
"incremental-update-op-timeout"].As<
std::chrono::milliseconds>(
447 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
448 chunk_size_{config[
"chunk-size"].As<size_t>(
449 pg_cache::detail::kDefaultChunkSize)} {
451 cache::AllowedUpdateTypes::kFullAndIncremental &&
452 !kIncrementalUpdates) {
453 throw std::logic_error(
454 "Incremental update support is requested in config but no update field "
455 "name is specified in traits of '" +
456 config.Name() +
"' cache");
// A negative correction would shift the update window forward in time.
458 if (correction_.count() < 0) {
459 throw std::logic_error(
460 "Refusing to set forward (negative) update correction requested in "
462 config.Name() +
"' cache");
// An absent `pgcomponent` entry falls back to "" and is rejected below.
465 const auto pg_alias = config[
"pgcomponent"].
As<std::string>(
"");
466 if (pg_alias.empty()) {
468 "No `pgcomponent` entry in configuration"};
// Collect one cluster handle per shard.
472 clusters_.resize(shard_count);
473 for (
size_t i = 0; i < shard_count; ++i) {
474 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
// Log both generated statements once at start-up.
477 LOG_INFO() <<
"Cache " << kName <<
" full update query `"
478 << GetAllQuery().Statement() <<
"` incremental update query `"
479 << GetDeltaQuery().Statement() <<
"`";
// Destructor: calls `StopPeriodicUpdates()` on itself.
// NOTE(review): presumably this keeps an update from running against a
// partially destroyed object - confirm against the base class contract.
484template <
typename PostgreCachePolicy>
485PostgreCache<PostgreCachePolicy>::~PostgreCache() {
486 this->StopPeriodicUpdates();
// Builds a query by appending the policy's `kWhere` condition, when declared,
// to the base statement: "<statement> where <kWhere>".
// NOTE(review): the function signature, the origin of `query` and the branch
// for policies without `kWhere` are not visible in this excerpt; presumably
// this is `GetAllQuery()` - confirm.
489template <
typename PostgreCachePolicy>
492 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
493 return {fmt::format(
"{} where {}", query.Statement(),
494 PostgreCachePolicy::kWhere),
// Builds the query for incremental updates.
// With incremental updates enabled, the statement is restricted to rows whose
// `kUpdatedField` is >= the `$1` parameter; a policy `kWhere` condition, when
// declared, is parenthesised and AND-ed with that restriction.
// Without incremental updates the result of `GetAllQuery()` is returned.
// NOTE(review): the function signature and the origin of `query` are not
// visible in this excerpt; presumably this is `GetDeltaQuery()` - confirm.
501template <
typename PostgreCachePolicy>
503 if constexpr (kIncrementalUpdates) {
506 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
508 fmt::format(
"{} where ({}) and {} >= $1", query.Statement(),
509 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
512 return {fmt::format(
"{} where {} >= $1", query.Statement(),
513 PolicyType::kUpdatedField),
517 return GetAllQuery();
// Reads the `update-correction` option from the component config.
// When the policy supplies a custom `GetLastKnownUpdated`, or only full
// updates are allowed, the option is read with a default of 0.
// Otherwise it is read without a default.
// NOTE(review): `As<>()` without a default presumably throws when the option
// is absent, making it mandatory in that case - confirm.
521template <
typename PostgreCachePolicy>
522std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
523 const ComponentConfig& config) {
524 static constexpr std::string_view kUpdateCorrection =
"update-correction";
525 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
526 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
527 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
529 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
// Returns the value used as the lower bound when fetching updated rows.
// Policies with a custom `GetLastKnownUpdated` derive it from the current
// cache contents; otherwise it is `last_update` moved back by `correction_`.
// `last_update` is unused in the custom branch, hence `[[maybe_unused]]`.
533template <
typename PostgreCachePolicy>
534typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
535PostgreCache<PostgreCachePolicy>::GetLastUpdated(
536 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
537 const DataType&
cache)
const {
538 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
539 return PostgreCachePolicy::GetLastKnownUpdated(
cache);
541 return UpdatedFieldType{last_update - correction_};
// Performs one cache update: takes a working snapshot of the data, fetches
// rows from every cluster, merges them into the snapshot and publishes it.
// Rows are fetched through a portal in chunks of `chunk_size_` when that is
// positive, otherwise with a single query execution per cluster.
// After fetching, the CPU-relax iteration counts for the copy and parse
// stages are recomputed when the stage took longer than `kCpuRelaxThreshold`.
// The copy-stage trace message reports the copy-stage figures
// (`old_size`, `cpu_relax_iterations_copy_`), matching the value computed
// right above it.
// NOTE(review): parts of the signature, the selection of `query` / `timeout`,
// the chunk loop header and the "no changes" handling are not visible in this
// excerpt.
545template <
typename PostgreCachePolicy>
546void PostgreCache<PostgreCachePolicy>::Update(
548 const std::chrono::system_clock::time_point& last_update,
549 const std::chrono::system_clock::time_point& ,
552 if constexpr (!kIncrementalUpdates) {
558 ? full_update_timeout_
559 : incremental_update_timeout_;
563 std::string{pg_cache::detail::kCopyStage});
// Working copy of the cache; its initial size is the number of copied items.
564 auto data_cache = GetDataSnapshot(type, scope);
565 [[maybe_unused]]
const auto old_size = data_cache->size();
567 scope.
Reset(std::string{pg_cache::detail::kFetchStage});
571 for (
auto& cluster : clusters_) {
572 if (chunk_size_ > 0) {
573 auto trx = cluster->Begin(
574 kClusterHostTypeFlags, pg::Transaction::RO,
575 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
577 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
579 scope.
Reset(std::string{pg_cache::detail::kFetchStage});
580 auto res = portal.Fetch(chunk_size_);
583 scope.
Reset(std::string{pg_cache::detail::kParseStage});
584 CacheResults(res, data_cache, stats_scope, scope);
585 changes += res.Size();
// Only statements containing a `$` placeholder receive the parameter.
589 bool has_parameter = query.Statement().find(
'$') != std::string::npos;
590 auto res = has_parameter
592 kClusterHostTypeFlags,
594 timeout, pg_cache::detail::kStatementTimeoutOff},
595 query, GetLastUpdated(last_update, *data_cache))
597 kClusterHostTypeFlags,
599 timeout, pg_cache::detail::kStatementTimeoutOff},
603 scope.
Reset(std::string{pg_cache::detail::kParseStage});
604 CacheResults(res, data_cache, stats_scope, scope);
605 changes += res.Size();
// Copy stage: items handled per `kCpuRelaxInterval`, based on `old_size`.
611 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
613 const auto elapsed_copy =
614 scope.
ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
615 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
616 cpu_relax_iterations_copy_ =
static_cast<std::size_t
>(
617 static_cast<double>(old_size) /
618 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
619 LOG_TRACE() <<
"Elapsed time for copying " << kName <<
" "
620 << elapsed_copy.count() <<
" for " << old_size
621 <<
" data items is over threshold. Will relax CPU every "
622 << cpu_relax_iterations_copy_ <<
" iterations";
// Parse stage: same computation, based on the number of fetched rows.
628 const auto elapsed_parse =
629 scope.
ElapsedTotal(std::string{pg_cache::detail::kParseStage});
630 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
631 cpu_relax_iterations_parse_ =
static_cast<std::size_t
>(
632 static_cast<double>(changes) /
633 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
634 LOG_TRACE() <<
"Elapsed time for parsing " << kName <<
" "
635 << elapsed_parse.count() <<
" for " << changes
636 <<
" data items is over threshold. Will relax CPU every "
637 << cpu_relax_iterations_parse_ <<
" iterations";
// Record the final size, notify the container and publish the new data.
642 stats_scope.
Finish(data_cache->size());
643 pg_cache::detail::OnWritesDone(*data_cache);
644 this->Set(std::move(data_cache));
// Override that forwards to the policy-level
// `pg_cache::detail::MayReturnNull<PolicyType>()`.
650template <
typename PostgreCachePolicy>
651bool PostgreCache<PostgreCachePolicy>::MayReturnNull()
const {
652 return pg_cache::detail::MayReturnNull<PolicyType>();
// Merges a result set into the working cache container.
// Rows are viewed as `RawValueType` (row-tagged), converted with
// `ExtractValue` and stored via `CacheInsertOrAssign` keyed by the policy's
// `kKeyMember`.
// A `std::exception` raised while handling a row is caught and logged with
// the cache name and the value type name.
// NOTE(review): the parameter list, the `try` opener and whatever follows the
// log statement (rethrow / statistics) are not visible in this excerpt.
655template <
typename PostgreCachePolicy>
656void PostgreCache<PostgreCachePolicy>::CacheResults(
659 auto values = res.
AsSetOf<RawValueType>(storages::postgres::kRowTag);
661 for (
auto p = values.begin(); p != values.end(); ++p) {
// Unqualified call below, so ADL can select a container-specific overload.
664 using pg_cache::detail::CacheInsertOrAssign;
666 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
667 PostgreCachePolicy::kKeyMember);
668 }
catch (
const std::exception& e) {
670 LOG_ERROR() <<
"Error parsing data row in cache '" << kName <<
"' to '"
671 << compiler::GetTypeName<ValueType>() <<
"': " << e.what();
// Produces the container an update works on: either a copy of the currently
// published data (made with `CopyContainer`, using the copy-stage relax
// count) or a new empty container.
// NOTE(review): the function name, its parameters and the condition choosing
// between the two results are not visible in this excerpt; presumably this is
// `GetDataSnapshot` - confirm.
676template <
typename PostgreCachePolicy>
677typename PostgreCache<PostgreCachePolicy>::CachedData
681 auto data = this->Get();
683 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
687 return std::make_unique<DataType>();
// Declaration only; returns the schema text that is merged into the
// component's static config schema via `yaml_config::MergeSchemas`.
692std::string GetPostgreCacheSchema();
// Returns the static config schema: the parent type's schema merged with the
// text from `impl::GetPostgreCacheSchema()`.
// NOTE(review): the function signature and the start of the `ParentType`
// alias are not visible in this excerpt; presumably this is
// `PostgreCache::GetStaticConfigSchema()` - confirm.
696template <
typename PostgreCachePolicy>
699 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
700 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
// `CacheInsertOrAssign` overload declared in `utils::impl::projected_set`;
// it forwards the value to `DoInsert` for the given set.
// NOTE(review): the last parameter and the end of the function are not
// visible in this excerpt; the key-member argument appears to be unused here -
// confirm.
705namespace utils::impl::projected_set {
707template <
typename Set,
typename Value,
typename KeyMember>
708void CacheInsertOrAssign(Set& set, Value&& value,
710 DoInsert(set, std::forward<Value>(value));