userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38namespace pg_cache::detail {
39
40template <typename T>
41using ValueType = typename T::ValueType;
42template <typename T>
43concept HasValueType = requires { typename T::ValueType; };
44
45template <typename T>
46struct RawValueTypeImpl : std::type_identity<ValueType<T>> {};
47
48template <typename T>
49concept HasRawValueType = requires { typename T::RawValueType; };
50
51template <HasRawValueType T>
52struct RawValueTypeImpl<T> : std::type_identity<typename T::RawValueType> {};
53
54template <typename T>
55using RawValueType = typename RawValueTypeImpl<T>::type;
56
57template <typename PostgreCachePolicy>
58auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
59 if constexpr (HasRawValueType<PostgreCachePolicy>) {
60 return Convert(std::move(raw), formats::parse::To<ValueType<PostgreCachePolicy>>());
61 } else {
62 return std::move(raw);
63 }
64}
65
66// Component name in policy
67template <typename T>
68concept HasName = requires {
69 requires !std::string_view { T::kName }
70 .empty();
71};
72
73// Component query in policy
74template <typename T>
75concept HasQuery = requires { T::kQuery; };
76
77// Component GetQuery in policy
78template <typename T>
79concept HasGetQuery = requires { T::GetQuery(); };
80
81// Component kWhere in policy
82template <typename T>
83concept HasWhere = requires { T::kWhere; };
84
85// Component kOrderBy in policy
86template <typename T>
87concept HasOrderBy = requires { T::kOrderBy; };
88
89// Component kTag in policy
90template <typename T>
91concept HasTag = requires { T::kTag; };
92
93template <typename PostgreCachePolicy>
94auto GetTag() {
95 if constexpr (HasTag<PostgreCachePolicy>) {
96 return PostgreCachePolicy::kTag;
97 } else {
98 return storages::postgres::kRowTag;
99 }
100}
101
102// Update field
103template <typename T>
104concept HasUpdatedField = requires { T::kUpdatedField; };
105
106template <typename T>
107concept WantIncrementalUpdates = requires {
108 {
109 std::integral_constant<bool, !std::string_view{T::kUpdatedField}.empty()>{}
110 } -> std::same_as<std::true_type>;
111};
112
113// Key member in policy
114template <typename T>
115concept HasKeyMember = requires { T::kKeyMember; } && std::invocable<decltype(T::kKeyMember), ValueType<T>>;
116
117template <typename T>
118using KeyMemberType = std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
119
120// size method in custom container in policy
121template <typename T>
122concept HasSizeMethod = requires(T t) {
123 {
124 t.size()
125 } -> std::convertible_to<std::size_t>;
126};
127
128// insert_or_assign method in custom container in policy
129template <typename T>
130concept HasInsertOrAssignMethod = requires(typename T::CacheContainer& c, KeyMemberType<T> key, ValueType<T> val) {
131 c.insert_or_assign(std::move(key), std::move(val));
132};
133
134// CacheInsertOrAssign function for custom container in policy
135template <typename T>
136concept HasCacheInsertOrAssignFunction =
137 requires(typename T::CacheContainer& c, ValueType<T> val, KeyMemberType<T> key) {
138 CacheInsertOrAssign(c, std::move(val), std::move(key));
139 };
140
141template <typename T>
142concept PolicyHasCustomCacheContainer = requires { typename T::CacheContainer; };
143
144// Data container for cache
145template <typename T>
146struct DataCacheContainer {
147 static_assert(
148 meta::IsStdHashable<KeyMemberType<T>>,
149 "With default CacheContainer, key type must be std::hash-able"
150 );
151
152 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
153};
154
155template <typename T>
156requires PolicyHasCustomCacheContainer<T>
157struct DataCacheContainer<T> {
158 static_assert(HasSizeMethod<typename T::CacheContainer>, "Custom CacheContainer must provide `size` method");
159 static_assert(
160 HasInsertOrAssignMethod<T> || HasCacheInsertOrAssignFunction<T>,
161 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
162 "one or CacheInsertOrAssign function"
163 );
164
165 using type = typename T::CacheContainer;
166};
167
168template <typename T>
169using DataCacheContainerType = typename DataCacheContainer<T>::type;
170
171// We have to whitelist container types, for which we perform by-element
172// copying, because it's not correct for certain custom containers.
173template <typename T>
174inline constexpr bool kIsContainerCopiedByElement =
175 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
176
177template <typename T>
178std::unique_ptr<T> CopyContainer(
179 const T& container,
180 [[maybe_unused]] std::size_t cpu_relax_iterations,
181 tracing::ScopeTime& scope
182) {
183 if constexpr (kIsContainerCopiedByElement<T>) {
184 auto copy = std::make_unique<T>();
185 if constexpr (meta::kIsReservable<T>) {
186 copy->reserve(container.size());
187 }
188
189 utils::CpuRelax relax{cpu_relax_iterations, &scope};
190 for (const auto& kv : container) {
191 relax.Relax();
192 copy->insert(kv);
193 }
194 return copy;
195 } else {
196 return std::make_unique<T>(container);
197 }
198}
199
/// Default insertion hook: stores `value` in `container` under the key obtained
/// by invoking `key_member` on it.
///
/// The trailing parameter pack exists only to rank this overload below more
/// specific ones; it must always be empty.
template <typename Container, typename Value, typename KeyMember, typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*args*/) {
    static_assert(sizeof...(Args) == 0);
    // Take a detached copy of the key before `value` is forwarded, so that
    // `insert_or_assign` never sees a key aliasing the value being moved.
    auto detached_key = std::invoke(key_member, value);
    container.insert_or_assign(std::move(detached_key), std::forward<Value>(value));
}
208
209template <typename T>
210void OnWritesDone(T& container) {
211 if constexpr (requires(T& c) { c.OnWritesDone(); }) {
212 container.OnWritesDone();
213 }
214}
215
216template <typename T>
217concept HasCustomUpdated = requires(const DataCacheContainerType<T>& cache) { T::GetLastKnownUpdated(cache); };
218
219template <typename T>
220struct UpdatedFieldTypeImpl : std::type_identity<storages::postgres::TimePointTz> {};
221
222template <typename T>
223concept HasUpdatedFieldType = requires { typename T::UpdatedFieldType; };
224
225template <HasUpdatedFieldType T>
226struct UpdatedFieldTypeImpl<T> : std::type_identity<typename T::UpdatedFieldType> {};
227
228template <typename T>
229using UpdatedFieldType = typename UpdatedFieldTypeImpl<T>::type;
230
231template <typename T>
232constexpr bool CheckUpdatedFieldType() {
233 if constexpr (HasUpdatedFieldType<T>) {
234#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
235 static_assert(
236 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
237 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
238 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePoint> || HasCustomUpdated<T>,
239 "Invalid UpdatedFieldType, must be either TimePointTz or "
240 "TimePointWithoutTz"
241 "or (legacy) system_clock::time_point"
242 );
243#else
244 static_assert(
245 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
246 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
247 HasCustomUpdated<T>,
248 "Invalid UpdatedFieldType, must be either TimePointTz or "
249 "TimePointWithoutTz"
250 );
251#endif
252 } else {
253 static_assert(
254 !WantIncrementalUpdates<T>,
255 "UpdatedFieldType must be explicitly specified when using "
256 "incremental updates"
257 );
258 }
259 return true;
260}
261
262// Cluster host type policy
// Returns the cluster host type flags for the cache: the policy's
// `kClusterHostType` when it declares one.
// NOTE(review): the fallback `else` branch is empty in this listing and the
// embedded line numbering skips a line there (267 -> 269). The default
// `return` was presumably lost during extraction; confirm against the
// upstream header before relying on this text.
263template <typename T>
264constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
265 if constexpr (requires { T::kClusterHostType; }) {
266 return T::kClusterHostType;
267 } else {
269 }
270}
271
272// May return null policy
273template <typename T>
274constexpr bool MayReturnNull() {
275 if constexpr (requires { T::kMayReturnNull; }) {
276 return T::kMayReturnNull;
277 } else {
278 return false;
279 }
280}
281
282template <typename PostgreCachePolicy>
283struct PolicyChecker {
284 // Static assertions for cache traits
285 static_assert(HasName<PostgreCachePolicy>, "The PosgreSQL cache policy must contain a static member `kName`");
286 static_assert(HasValueType<PostgreCachePolicy>, "The PosgreSQL cache policy must define a type alias `ValueType`");
287 static_assert(
288 HasKeyMember<PostgreCachePolicy>,
289 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
290 "with a pointer to a data or a function member with the object's key"
291 );
292 static_assert(
293 HasQuery<PostgreCachePolicy> || HasGetQuery<PostgreCachePolicy>,
294 "The PosgreSQL cache policy must contain a static data member "
295 "`kQuery` with a select statement or a static member function "
296 "`GetQuery` returning the query"
297 );
298 static_assert(
299 !(HasQuery<PostgreCachePolicy> && HasGetQuery<PostgreCachePolicy>),
300 "The PosgreSQL cache policy must define `kQuery` or "
301 "`GetQuery`, not both"
302 );
303 static_assert(
304 HasUpdatedField<PostgreCachePolicy>,
305 "The PosgreSQL cache policy must contain a static member "
306 "`kUpdatedField`. If you don't want to use incremental updates, "
307 "please set its value to `nullptr`"
308 );
309 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
310
311 static_assert(
312 ClusterHostType<PostgreCachePolicy>() & storages::postgres::kClusterHostRolesMask,
313 "Cluster host role must be specified for caching component, "
314 "please be more specific"
315 );
316
317 static storages::postgres::Query GetQuery() {
318 if constexpr (HasGetQuery<PostgreCachePolicy>) {
319 return PostgreCachePolicy::GetQuery();
320 } else {
321 return PostgreCachePolicy::kQuery;
322 }
323 }
324
325 using BaseType = CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
326};
327
// Defaults for the "full-update-op-timeout" / "incremental-update-op-timeout" options
inline constexpr auto kDefaultFullUpdateTimeout = std::chrono::minutes{1};
inline constexpr auto kDefaultIncrementalUpdateTimeout = std::chrono::seconds{1};
// Passed as the second CommandControl argument for all cache queries
inline constexpr auto kStatementTimeoutOff = std::chrono::milliseconds{0};
// Stage durations above the threshold trigger a recalculation of the CPU relax
// iteration counts; the interval is the divisor used in that recalculation
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};

// Names of the timed stages of an update
inline constexpr std::string_view kCopyStage{"copy_data"};
inline constexpr std::string_view kFetchStage{"fetch"};
inline constexpr std::string_view kParseStage{"parse"};

// Defaults for the "chunk-size" / "sleep-between-chunks" options
inline constexpr std::size_t kDefaultChunkSize{1000};
inline constexpr auto kDefaultSleepBetweenChunks = std::chrono::milliseconds{0};
340} // namespace pg_cache::detail
341
342/// @ingroup userver_components
343///
344/// @brief Caching component for PostgreSQL. See @ref scripts/docs/en/userver/pg/cache.md.
345///
346/// @see @ref scripts/docs/en/userver/pg/cache.md, @ref scripts/docs/en/userver/caches.md
// The base class is taken from PolicyChecker, so instantiating the cache also
// triggers the compile-time validation of the policy.
347template <typename PostgreCachePolicy>
348class PostgreCache final : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
349public:
350 // Type aliases
351 using PolicyType = PostgreCachePolicy;
352 using ValueType = pg_cache::detail::ValueType<PolicyType>;
353 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
354 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
355 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
356 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
357 using BaseType = typename PolicyCheckerType::BaseType;
358
359 // Calculated constants
 // kIncrementalUpdates: true when the policy's kUpdatedField is a non-empty compile-time string.
360 constexpr static bool kIncrementalUpdates = pg_cache::detail::WantIncrementalUpdates<PolicyType>;
361 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
362 constexpr static auto kName = PolicyType::kName;
363
 // Reads the cache options from the static config and resolves one cluster per shard
 // of the PostgreSQL component named by the `pgcomponent` option.
364 PostgreCache(const ComponentConfig&, const ComponentContext&);
365
 // Schema of the base component merged with the PostgreSQL cache schema.
366 static yaml_config::Schema GetStaticConfigSchema();
367
368private:
369 using CachedData = std::unique_ptr<DataType>;
370
 // Value passed as the query parameter for updates: the policy's GetLastKnownUpdated(cache)
 // when available, otherwise `last_update - correction_`.
371 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update, const DataType& cache) const;
372
 // Fetches rows from every cluster, merges them into a copy of the data and publishes it.
373 void Update(
374 cache::UpdateType type,
375 const std::chrono::system_clock::time_point& last_update,
376 const std::chrono::system_clock::time_point& now,
377 cache::UpdateStatisticsScope& stats_scope
378 ) override;
379
 // Forwards the policy's kMayReturnNull setting (false when the policy does not declare it).
380 bool MayReturnNull() const override;
381
 // Container the update writes into: a copy of the current data or an empty container.
382 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
 // Parses the rows of `res` and inserts them into `data_cache`.
383 void CacheResults(
384 storages::postgres::ResultSet res,
385 CachedData& data_cache,
386 cache::UpdateStatisticsScope& stats_scope,
387 tracing::ScopeTime& scope
388 );
389
 // Query builders: the policy's base query combined with where / order by clauses.
390 static storages::postgres::Query GetAllQuery();
391 static storages::postgres::Query GetDeltaQuery();
392 static std::string GetWhereClause();
393 static std::string GetDeltaWhereClause();
394 static std::string GetOrderByClause();
395
 // Reads the `update-correction` option; it is optional only for full-only caches and
 // for policies with GetLastKnownUpdated.
396 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
397
 // One cluster per shard, filled in by the constructor.
398 std::vector<storages::postgres::ClusterPtr> clusters_;
399
 // The const members below are set once from the static config in the constructor.
400 const std::chrono::system_clock::duration correction_;
401 const std::chrono::milliseconds full_update_timeout_;
402 const std::chrono::milliseconds incremental_update_timeout_;
403 const std::size_t chunk_size_;
404 const std::chrono::milliseconds sleep_between_chunks_;
 // Recomputed by Update() when the parse / copy stage takes longer than the threshold.
405 std::size_t cpu_relax_iterations_parse_{0};
406 std::size_t cpu_relax_iterations_copy_{0};
407};
408
// Specializes `kHasValidate` to `true` for every PostgreCache instantiation.
// NOTE(review): the primary template is not visible in this file; presumably
// this opts the component into static config validation — confirm.
409template <typename PostgreCachePolicy>
410inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
411
// Constructs the cache component from its static config.
// Reads `update-correction`, `full-update-op-timeout`, `incremental-update-op-timeout`,
// `chunk-size` and `sleep-between-chunks` (falling back to the pg_cache::detail defaults),
// validates the configuration, collects one cluster per shard of the component named by
// `pgcomponent` and logs the full and incremental update queries.
// Throws std::logic_error for unsupported incremental updates or a negative correction,
// and storages::postgres::InvalidConfig when `pgcomponent` is missing or empty.
412template <typename PostgreCachePolicy>
413PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config, const ComponentContext& context)
414 : BaseType{config, context},
415 correction_{ParseCorrection(config)},
416 full_update_timeout_{
417 config["full-update-op-timeout"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultFullUpdateTimeout)
418 },
419 incremental_update_timeout_{
420 config["incremental-update-op-timeout"]
421 .As<std::chrono::milliseconds>(pg_cache::detail::kDefaultIncrementalUpdateTimeout)
422 },
423 chunk_size_{config["chunk-size"].As<size_t>(pg_cache::detail::kDefaultChunkSize)},
424 sleep_between_chunks_{
425 config["sleep-between-chunks"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultSleepBetweenChunks)
426 }
427{
 // NOTE(review): the embedded line numbering skips 428 here; the opening of the check that
 // takes the condition and message below (presumably an invariant/assert macro) appears to
 // have been lost in extraction — confirm against the upstream header.
 // The condition itself: chunked reads need portal support in the driver.
429 !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(),
430 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
431 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
432 );
433
 // Incremental updates allowed in config require a policy with a non-empty kUpdatedField.
434 if (this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kFullAndIncremental && !kIncrementalUpdates) {
435 throw std::logic_error(
436 "Incremental update support is requested in config but no update field "
437 "name is specified in traits of '" +
438 config.Name() + "' cache"
439 );
440 }
 // The correction is subtracted from the last update time, so it must not be negative.
441 if (correction_.count() < 0) {
442 throw std::logic_error(
443 "Refusing to set forward (negative) update correction requested in "
444 "config for '" +
445 config.Name() + "' cache"
446 );
447 }
448
449 const auto pg_alias = config["pgcomponent"].As<std::string>("");
450 if (pg_alias.empty()) {
451 throw storages::postgres::InvalidConfig{"No `pgcomponent` entry in configuration"};
452 }
 // Resolve one cluster handle per shard of the PostgreSQL component.
453 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
454 const auto shard_count = pg_cluster_comp.GetShardCount();
455 clusters_.resize(shard_count);
456 for (size_t i = 0; i < shard_count; ++i) {
457 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
458 }
459
460 LOG_INFO()
461 << "Cache " << kName << " full update query `" << GetAllQuery().GetStatementView()
462 << "` incremental update query `" << GetDeltaQuery().GetStatementView() << "`";
463}
464
465template <typename PostgreCachePolicy>
466std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
467 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
468 return fmt::format(FMT_COMPILE("where {}"), PostgreCachePolicy::kWhere);
469 } else {
470 return "";
471 }
472}
473
474template <typename PostgreCachePolicy>
475std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
476 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
477 return fmt::format(
478 FMT_COMPILE("where ({}) and {} >= $1"),
479 PostgreCachePolicy::kWhere,
480 PostgreCachePolicy::kUpdatedField
481 );
482 } else {
483 return fmt::format(FMT_COMPILE("where {} >= $1"), PostgreCachePolicy::kUpdatedField);
484 }
485}
486
487template <typename PostgreCachePolicy>
488std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
489 if constexpr (pg_cache::detail::HasOrderBy<PostgreCachePolicy>) {
490 return fmt::format(FMT_COMPILE("order by {}"), PostgreCachePolicy::kOrderBy);
491 } else {
492 return "";
493 }
494}
495
496template <typename PostgreCachePolicy>
497storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
498 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
499 return fmt::format("{} {} {}", query.GetStatementView(), GetWhereClause(), GetOrderByClause());
500}
501
// Builds the query for incremental updates: the policy's base statement followed by the
// delta where clause (which takes `$1`) and the optional order by clause.
// Policies without incremental updates get the full update query instead.
502template <typename PostgreCachePolicy>
503storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
504 if constexpr (kIncrementalUpdates) {
505 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
506 return storages::postgres::Query{
507 fmt::format("{} {} {}", query.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
 // NOTE(review): the embedded line numbering skips 508 here and the statement above ends
 // with a comma; a second constructor argument appears to have been lost in extraction —
 // confirm against the upstream header.
509 };
510 } else {
511 return GetAllQuery();
512 }
513}
514
515template <typename PostgreCachePolicy>
516std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(const ComponentConfig& config) {
517 static constexpr std::string_view kUpdateCorrection = "update-correction";
518 if (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy> ||
519 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull)
520 {
521 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
522 } else {
523 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
524 }
525}
526
527template <typename PostgreCachePolicy>
528typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
529 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
530 const DataType& cache
531) const {
532 if constexpr (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy>) {
533 return PostgreCachePolicy::GetLastKnownUpdated(cache);
534 } else {
535 return UpdatedFieldType{last_update - correction_};
536 }
537}
538
539template <typename PostgreCachePolicy>
540void PostgreCache<PostgreCachePolicy>::Update(
541 cache::UpdateType type,
542 const std::chrono::system_clock::time_point& last_update,
543 const std::chrono::system_clock::time_point& /*now*/,
544 cache::UpdateStatisticsScope& stats_scope
545) {
546 namespace pg = storages::postgres;
547 if constexpr (!kIncrementalUpdates) {
549 }
550 const auto query = (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
551 const std::chrono::milliseconds
552 timeout = (type == cache::UpdateType::kFull) ? full_update_timeout_ : incremental_update_timeout_;
553
554 // COPY current cached data
555 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(std::string{pg_cache::detail::kCopyStage});
556 auto data_cache = GetDataSnapshot(type, scope);
557 [[maybe_unused]] const auto old_size = data_cache->size();
558
559 scope.Reset(std::string{pg_cache::detail::kFetchStage});
560
561 size_t changes = 0;
562 // Iterate clusters
563 for (auto& cluster : clusters_) {
564 if (chunk_size_ > 0) {
565 auto trx = cluster->Begin(
566 kClusterHostTypeFlags,
568 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
569 );
570 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
571 while (portal) {
572 scope.Reset(std::string{pg_cache::detail::kFetchStage});
573 auto res = portal.Fetch(chunk_size_);
574 stats_scope.IncreaseDocumentsReadCount(res.Size());
575
576 scope.Reset(std::string{pg_cache::detail::kParseStage});
577 CacheResults(res, data_cache, stats_scope, scope);
578 changes += res.Size();
579 if (sleep_between_chunks_.count() > 0) {
580 engine::InterruptibleSleepFor(sleep_between_chunks_);
581 }
582 }
583 trx.Commit();
584 } else {
585 const bool has_parameter = query.GetStatementView().find('$') != std::string::npos;
586 auto res =
587 has_parameter
588 ? cluster->Execute(
589 kClusterHostTypeFlags,
590 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
591 query,
592 GetLastUpdated(last_update, *data_cache)
593 )
594 : cluster->Execute(
595 kClusterHostTypeFlags,
596 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
597 query
598 );
599 stats_scope.IncreaseDocumentsReadCount(res.Size());
600
601 scope.Reset(std::string{pg_cache::detail::kParseStage});
602 CacheResults(res, data_cache, stats_scope, scope);
603 changes += res.Size();
604 }
605 }
606
607 scope.Reset();
608
609 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
610 if (old_size > 0) {
611 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
612 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
613 cpu_relax_iterations_copy_ = static_cast<
614 std::size_t>(static_cast<double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
615 LOG_TRACE()
616 << "Elapsed time for copying " << kName << " " << elapsed_copy.count() << " for " << changes
617 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
618 << " iterations";
619 }
620 }
621 }
622
623 if (changes > 0) {
624 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
625 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
626 cpu_relax_iterations_parse_ = static_cast<
627 std::size_t>(static_cast<double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
628 LOG_TRACE()
629 << "Elapsed time for parsing " << kName << " " << elapsed_parse.count() << " for " << changes
630 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
631 << " iterations";
632 }
633 }
634 if (changes > 0 || type == cache::UpdateType::kFull) {
635 // Set current cache
636 pg_cache::detail::OnWritesDone(*data_cache);
637 stats_scope.Finish(data_cache->size());
638 this->Set(std::move(data_cache));
639 } else {
640 stats_scope.FinishNoChanges();
641 }
642}
643
644template <typename PostgreCachePolicy>
645bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
646 return pg_cache::detail::MayReturnNull<PolicyType>();
647}
648
// Parses the rows of `res` as RawValueType (using the policy's tag) and inserts each one
// into `*data_cache` through CacheInsertOrAssign, keyed by the policy's kKeyMember.
// A row that throws while being parsed or inserted is logged and skipped; the remaining
// rows are still processed.
649template <typename PostgreCachePolicy>
650void PostgreCache<PostgreCachePolicy>::CacheResults(
651 storages::postgres::ResultSet res,
652 CachedData& data_cache,
653 cache::UpdateStatisticsScope& stats_scope,
654 tracing::ScopeTime& scope
655) {
656 auto values = res.AsSetOf<RawValueType>(pg_cache::detail::GetTag<PostgreCachePolicy>());
657 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
658 for (auto p = values.begin(); p != values.end(); ++p) {
659 relax.Relax();
660 try {
 // Bring the default overload into scope while still allowing ADL to find
 // container-specific overloads.
661 using pg_cache::detail::CacheInsertOrAssign;
662 CacheInsertOrAssign(
663 *data_cache,
664 pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
665 PostgreCachePolicy::kKeyMember
666 );
667 } catch (const std::exception& e) {
 // NOTE(review): the embedded line numbering skips 668 here and `stats_scope` is not
 // referenced anywhere in the visible body; a statement (presumably a failure counter
 // update) appears to have been lost in extraction — confirm against the upstream header.
669 LOG_ERROR()
670 << "Error parsing data row in cache '" << kName << "' to '" << compiler::GetTypeName<ValueType>()
671 << "': " << e.what();
672 }
673 }
674}
675
// Returns the container an update writes into: a copy of the currently published data
// (made with CopyContainer, using the copy relax count) when there is some, otherwise a
// new empty container.
// NOTE(review): the embedded line numbering skips 679 and the braces below are unbalanced
// as shown; the opening of a block (presumably a condition on `type`, which is otherwise
// unused here) appears to have been lost in extraction — confirm against the upstream header.
676template <typename PostgreCachePolicy>
677typename PostgreCache<PostgreCachePolicy>::CachedData PostgreCache<
678 PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope) {
680 auto data = this->Get();
681 if (data) {
682 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
683 }
684 }
685 return std::make_unique<DataType>();
686}
687
// Declaration only; the definition is not part of this header.
// The result is passed to yaml_config::MergeSchemas by GetStaticConfigSchema(), so it is
// presumably the YAML text of the PostgreSQL cache schema — confirm against the definition.
688namespace impl {
689
690std::string GetPostgreCacheSchema();
691
692} // namespace impl
693
694template <typename PostgreCachePolicy>
695yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
696 using ParentType = typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
697 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
698}
699
700} // namespace components
701
702namespace utils::impl::projected_set {
703
/// Insertion hook for sets in this namespace: forwards the value to `DoInsert`.
/// The key member is accepted for signature compatibility and not used.
template <typename Set, typename Value, typename KeyMember>
void CacheInsertOrAssign(Set& set, Value&& value, [[maybe_unused]] const KeyMember& key_member) {
    DoInsert(set, std::forward<Value>(value));
}
708
709} // namespace utils::impl::projected_set
710
711USERVER_NAMESPACE_END