userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/utils/void_t.hpp>
33#include <userver/yaml_config/merge_schemas.hpp>
34
35USERVER_NAMESPACE_BEGIN
36
37namespace components {
38
39/// @page pg_cache Caching Component for PostgreSQL
40///
41/// A typical components::PostgreCache usage consists of trait definition:
42///
43/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Policy Trivial
44///
45/// and registration of the component in components::ComponentList:
46///
47/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Trivial Usage
48///
49/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
50///
51///
52/// @section pg_cc_configuration Configuration
53///
54/// components::PostgreCache static configuration file should have a PostgreSQL
55/// component name specified in `pgcomponent` configuration parameter.
56///
57/// Optionally the operation timeouts for cache loading can be specified.
58///
59/// For avoiding "memory leaks", see the respective section
60/// in @ref components::CachingComponentBase.
61///
62/// @include{doc} scripts/docs/en/components_schema/postgresql/src/cache/base_postgres_cache.md
63///
64/// Options inherited from @ref components::CachingComponentBase :
65/// @include{doc} scripts/docs/en/components_schema/core/src/cache/caching_component_base.md
66///
67/// Options inherited from @ref components::ComponentBase :
68/// @include{doc} scripts/docs/en/components_schema/core/src/components/impl/component_base.md
69///
70/// @section pg_cc_cache_policy Cache policy
71///
72/// Cache policy is the template argument of components::PostgreCache component.
73/// Please see the following code snippet for documentation.
74///
75/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
76///
77/// The query can be a std::string. But due to non-guaranteed order of static
78/// data members initialization, std::string should be returned from a static
79/// member function, please see the following code snippet.
80///
81/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
82///
83/// Policy may have static function GetLastKnownUpdated. It should be used
84/// when new entries from database are taken via revision, identifier, or
85/// anything else, but not timestamp of the last update.
86/// If this function is supplied, new entries are taken from db with condition
87/// 'WHERE kUpdatedField >= GetLastKnownUpdated(cache_container)'.
88/// Otherwise, condition is
89/// 'WHERE kUpdatedField >= last_update - correction_'.
90/// See the following code snippet for an example of usage
91///
92/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
93///
94/// Cache can also store only a subset of data. For example, for a database that is defined in the following way:
95///
96/// @include samples/postgres_cache_order_by/schemas/postgresql/key_value.sql
97///
98/// it is possible to create a cache that stores only the latest `value`:
99///
100/// @snippet samples/postgres_cache_order_by/main.cpp Last pg cache
101///
102/// In case one provides a custom CacheContainer within Policy, it is notified
103/// of Update completion via its public member function OnWritesDone, if any.
104/// Custom CacheContainer must provide size method and insert_or_assign method
105/// similar to std::unordered_map's one or CacheInsertOrAssign function similar
106/// to one defined in namespace utils::impl::projected_set (i.e. used for
107/// utils::ProjectedUnorderedSet).
108/// See the following code snippet for an example of usage:
109///
110/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
111///
112/// @section pg_cc_forward_declaration Forward Declaration
113///
114/// To forward declare a cache you can forward declare a trait and
115/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
116/// forward declare the cache value type.
117///
118/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
119///
120/// ----------
121///
122/// @htmlonly <div class="bottom-nav"> @endhtmlonly
123/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
124/// @htmlonly </div> @endhtmlonly
125
126namespace pg_cache::detail {
127
128template <typename T>
129using ValueType = typename T::ValueType;
130template <typename T>
131concept HasValueType = requires { typename T::ValueType; };
132
133template <typename T>
134struct RawValueTypeImpl : std::type_identity<ValueType<T>> {};
135
136template <typename T>
137concept HasRawValueType = requires { typename T::RawValueType; };
138
139template <HasRawValueType T>
140struct RawValueTypeImpl<T> : std::type_identity<typename T::RawValueType> {};
141
142template <typename T>
143using RawValueType = typename RawValueTypeImpl<T>::type;
144
145template <typename PostgreCachePolicy>
146auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
147 if constexpr (HasRawValueType<PostgreCachePolicy>) {
148 return Convert(std::move(raw), formats::parse::To<ValueType<PostgreCachePolicy>>());
149 } else {
150 return std::move(raw);
151 }
152}
153
154// Component name in policy
155template <typename T>
156concept HasName = requires {
157 requires !std::string_view { T::kName }
158 .empty();
159};
160
161// Component query in policy
162template <typename T>
163concept HasQuery = requires { T::kQuery; };
164
165// Component GetQuery in policy
166template <typename T>
167concept HasGetQuery = requires { T::GetQuery(); };
168
169// Component kWhere in policy
170template <typename T>
171concept HasWhere = requires { T::kWhere; };
172
173// Component kOrderBy in policy
174template <typename T>
175concept HasOrderBy = requires { T::kOrderBy; };
176
177// Component kTag in policy
178template <typename T>
179concept HasTag = requires { T::kTag; };
180
181template <typename PostgreCachePolicy>
182auto GetTag() {
183 if constexpr (HasTag<PostgreCachePolicy>) {
184 return PostgreCachePolicy::kTag;
185 } else {
186 return storages::postgres::kRowTag;
187 }
188}
189
190// Update field
191template <typename T>
192concept HasUpdatedField = requires { T::kUpdatedField; };
193
194template <typename T>
195concept WantIncrementalUpdates = requires {
196 {
197 std::integral_constant<bool, !std::string_view{T::kUpdatedField}.empty()>{}
198 } -> std::same_as<std::true_type>;
199};
200
201// Key member in policy
202template <typename T>
203concept HasKeyMember = requires { T::kKeyMember; } && std::invocable<decltype(T::kKeyMember), ValueType<T>>;
204
205template <typename T>
206using KeyMemberType = std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
207
208// size method in custom container in policy
209template <typename T>
210concept HasSizeMethod = requires(T t) {
211 {
212 t.size()
213 } -> std::convertible_to<std::size_t>;
214};
215
216// insert_or_assign method in custom container in policy
217template <typename T>
218concept HasInsertOrAssignMethod = requires(typename T::CacheContainer& c, KeyMemberType<T> key, ValueType<T> val) {
219 c.insert_or_assign(std::move(key), std::move(val));
220};
221
222// CacheInsertOrAssign function for custom container in policy
223template <typename T>
224concept HasCacheInsertOrAssignFunction =
225 requires(typename T::CacheContainer& c, ValueType<T> val, KeyMemberType<T> key) {
226 CacheInsertOrAssign(c, std::move(val), std::move(key));
227 };
228
229// Data container for cache
230template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
231struct DataCacheContainer {
232 static_assert(
233 meta::IsStdHashable<KeyMemberType<T>>,
234 "With default CacheContainer, key type must be std::hash-able"
235 );
236
237 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
238};
239
240template <typename T>
241struct DataCacheContainer<T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
242 static_assert(HasSizeMethod<typename T::CacheContainer>, "Custom CacheContainer must provide `size` method");
243 static_assert(
244 HasInsertOrAssignMethod<T> || HasCacheInsertOrAssignFunction<T>,
245 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
246 "one or CacheInsertOrAssign function"
247 );
248
249 using type = typename T::CacheContainer;
250};
251
252template <typename T>
253using DataCacheContainerType = typename DataCacheContainer<T>::type;
254
255// We have to whitelist container types, for which we perform by-element
256// copying, because it's not correct for certain custom containers.
257template <typename T>
258inline constexpr bool kIsContainerCopiedByElement =
259 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
260
261template <typename T>
262std::unique_ptr<T> CopyContainer(
263 const T& container,
264 [[maybe_unused]] std::size_t cpu_relax_iterations,
265 tracing::ScopeTime& scope
266) {
267 if constexpr (kIsContainerCopiedByElement<T>) {
268 auto copy = std::make_unique<T>();
269 if constexpr (meta::kIsReservable<T>) {
270 copy->reserve(container.size());
271 }
272
273 utils::CpuRelax relax{cpu_relax_iterations, &scope};
274 for (const auto& kv : container) {
275 relax.Relax();
276 copy->insert(kv);
277 }
278 return copy;
279 } else {
280 return std::make_unique<T>(container);
281 }
282}
283
/// Default way of putting a value into a cache container: the key is obtained
/// by invoking `key_member` on the value, then `insert_or_assign` is called.
///
/// The trailing parameter pack must be empty; it exists only to make this
/// overload less preferred than more specific `CacheInsertOrAssign` overloads.
template <typename Container, typename Value, typename KeyMember, typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*args*/) {
    static_assert(sizeof...(Args) == 0);

    // The key is taken by value (not by reference into `value`) so that
    // forwarding `value` below cannot alias the key argument.
    using Key = std::decay_t<std::invoke_result_t<const KeyMember&, Value&>>;
    Key key_copy = std::invoke(key_member, value);

    container.insert_or_assign(std::move(key_copy), std::forward<Value>(value));
}
292
293template <typename T>
294void OnWritesDone(T& container) {
295 if constexpr (requires(T& c) { c.OnWritesDone(); }) {
296 container.OnWritesDone();
297 }
298}
299
300template <typename T>
301concept HasCustomUpdated = requires(const DataCacheContainerType<T>& cache) { T::GetLastKnownUpdated(cache); };
302
303template <typename T>
304struct UpdatedFieldTypeImpl : std::type_identity<storages::postgres::TimePointTz> {};
305
306template <typename T>
307concept HasUpdatedFieldType = requires { typename T::UpdatedFieldType; };
308
309template <HasUpdatedFieldType T>
310struct UpdatedFieldTypeImpl<T> : std::type_identity<typename T::UpdatedFieldType> {};
311
312template <typename T>
313using UpdatedFieldType = typename UpdatedFieldTypeImpl<T>::type;
314
315template <typename T>
316constexpr bool CheckUpdatedFieldType() {
317 if constexpr (HasUpdatedFieldType<T>) {
318#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
319 static_assert(
320 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
321 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
322 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePoint> || HasCustomUpdated<T>,
323 "Invalid UpdatedFieldType, must be either TimePointTz or "
324 "TimePointWithoutTz"
325 "or (legacy) system_clock::time_point"
326 );
327#else
328 static_assert(
329 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
330 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
331 HasCustomUpdated<T>,
332 "Invalid UpdatedFieldType, must be either TimePointTz or "
333 "TimePointWithoutTz"
334 );
335#endif
336 } else {
337 static_assert(
338 !WantIncrementalUpdates<T>,
339 "UpdatedFieldType must be explicitly specified when using "
340 "incremental updates"
341 );
342 }
343 return true;
344}
345
346// Cluster host type policy
347template <typename T>
348constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
349 if constexpr (requires { T::kClusterHostType; }) {
350 return T::kClusterHostType;
351 } else {
353 }
354}
355
356// May return null policy
357template <typename T>
358constexpr bool MayReturnNull() {
359 if constexpr (requires { T::kMayReturnNull; }) {
360 return T::kMayReturnNull;
361 } else {
362 return false;
363 }
364}
365
366template <typename PostgreCachePolicy>
367struct PolicyChecker {
368 // Static assertions for cache traits
369 static_assert(HasName<PostgreCachePolicy>, "The PosgreSQL cache policy must contain a static member `kName`");
370 static_assert(HasValueType<PostgreCachePolicy>, "The PosgreSQL cache policy must define a type alias `ValueType`");
371 static_assert(
372 HasKeyMember<PostgreCachePolicy>,
373 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
374 "with a pointer to a data or a function member with the object's key"
375 );
376 static_assert(
377 HasQuery<PostgreCachePolicy> || HasGetQuery<PostgreCachePolicy>,
378 "The PosgreSQL cache policy must contain a static data member "
379 "`kQuery` with a select statement or a static member function "
380 "`GetQuery` returning the query"
381 );
382 static_assert(
383 !(HasQuery<PostgreCachePolicy> && HasGetQuery<PostgreCachePolicy>),
384 "The PosgreSQL cache policy must define `kQuery` or "
385 "`GetQuery`, not both"
386 );
387 static_assert(
388 HasUpdatedField<PostgreCachePolicy>,
389 "The PosgreSQL cache policy must contain a static member "
390 "`kUpdatedField`. If you don't want to use incremental updates, "
391 "please set its value to `nullptr`"
392 );
393 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
394
395 static_assert(
396 ClusterHostType<PostgreCachePolicy>() & storages::postgres::kClusterHostRolesMask,
397 "Cluster host role must be specified for caching component, "
398 "please be more specific"
399 );
400
401 static storages::postgres::Query GetQuery() {
402 if constexpr (HasGetQuery<PostgreCachePolicy>) {
403 return PostgreCachePolicy::GetQuery();
404 } else {
405 return PostgreCachePolicy::kQuery;
406 }
407 }
408
409 using BaseType = CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
410};
411
412inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
413inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
414inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
415inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
416inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
417
418inline constexpr std::string_view kCopyStage = "copy_data";
419inline constexpr std::string_view kFetchStage = "fetch";
420inline constexpr std::string_view kParseStage = "parse";
421
422inline constexpr std::size_t kDefaultChunkSize = 1000;
423inline constexpr std::chrono::milliseconds kDefaultSleepBetweenChunks{0};
424} // namespace pg_cache::detail
425
426/// @ingroup userver_components
427///
428/// @brief Caching component for PostgreSQL. See @ref pg_cache.
429///
430/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
431template <typename PostgreCachePolicy>
432class PostgreCache final : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
433public:
434 // Type aliases
435 using PolicyType = PostgreCachePolicy;
436 using ValueType = pg_cache::detail::ValueType<PolicyType>;
437 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
438 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
439 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
440 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
441 using BaseType = typename PolicyCheckerType::BaseType;
442
443 // Calculated constants
444 constexpr static bool kIncrementalUpdates = pg_cache::detail::WantIncrementalUpdates<PolicyType>;
445 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
446 constexpr static auto kName = PolicyType::kName;
447
448 PostgreCache(const ComponentConfig&, const ComponentContext&);
449
450 static yaml_config::Schema GetStaticConfigSchema();
451
452private:
453 using CachedData = std::unique_ptr<DataType>;
454
455 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update, const DataType& cache) const;
456
457 void Update(
458 cache::UpdateType type,
459 const std::chrono::system_clock::time_point& last_update,
460 const std::chrono::system_clock::time_point& now,
461 cache::UpdateStatisticsScope& stats_scope
462 ) override;
463
464 bool MayReturnNull() const override;
465
466 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
467 void CacheResults(
468 storages::postgres::ResultSet res,
469 CachedData& data_cache,
470 cache::UpdateStatisticsScope& stats_scope,
471 tracing::ScopeTime& scope
472 );
473
474 static storages::postgres::Query GetAllQuery();
475 static storages::postgres::Query GetDeltaQuery();
476 static std::string GetWhereClause();
477 static std::string GetDeltaWhereClause();
478 static std::string GetOrderByClause();
479
480 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
481
482 std::vector<storages::postgres::ClusterPtr> clusters_;
483
484 const std::chrono::system_clock::duration correction_;
485 const std::chrono::milliseconds full_update_timeout_;
486 const std::chrono::milliseconds incremental_update_timeout_;
487 const std::size_t chunk_size_;
488 const std::chrono::milliseconds sleep_between_chunks_;
489 std::size_t cpu_relax_iterations_parse_{0};
490 std::size_t cpu_relax_iterations_copy_{0};
491};
492
493template <typename PostgreCachePolicy>
494inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
495
496template <typename PostgreCachePolicy>
497PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config, const ComponentContext& context)
498 : BaseType{config, context},
499 correction_{ParseCorrection(config)},
500 full_update_timeout_{
501 config["full-update-op-timeout"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultFullUpdateTimeout)
502 },
503 incremental_update_timeout_{
504 config["incremental-update-op-timeout"]
505 .As<std::chrono::milliseconds>(pg_cache::detail::kDefaultIncrementalUpdateTimeout)
506 },
507 chunk_size_{config["chunk-size"].As<size_t>(pg_cache::detail::kDefaultChunkSize)},
508 sleep_between_chunks_{
509 config["sleep-between-chunks"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultSleepBetweenChunks)
510 }
511{
513 !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(),
514 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
515 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
516 );
517
518 if (this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kFullAndIncremental && !kIncrementalUpdates) {
519 throw std::logic_error(
520 "Incremental update support is requested in config but no update field "
521 "name is specified in traits of '" +
522 config.Name() + "' cache"
523 );
524 }
525 if (correction_.count() < 0) {
526 throw std::logic_error(
527 "Refusing to set forward (negative) update correction requested in "
528 "config for '" +
529 config.Name() + "' cache"
530 );
531 }
532
533 const auto pg_alias = config["pgcomponent"].As<std::string>("");
534 if (pg_alias.empty()) {
535 throw storages::postgres::InvalidConfig{"No `pgcomponent` entry in configuration"};
536 }
537 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
538 const auto shard_count = pg_cluster_comp.GetShardCount();
539 clusters_.resize(shard_count);
540 for (size_t i = 0; i < shard_count; ++i) {
541 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
542 }
543
544 LOG_INFO()
545 << "Cache " << kName << " full update query `" << GetAllQuery().GetStatementView()
546 << "` incremental update query `" << GetDeltaQuery().GetStatementView() << "`";
547}
548
549template <typename PostgreCachePolicy>
550std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
551 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
552 return fmt::format(FMT_COMPILE("where {}"), PostgreCachePolicy::kWhere);
553 } else {
554 return "";
555 }
556}
557
558template <typename PostgreCachePolicy>
559std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
560 if constexpr (pg_cache::detail::HasWhere<PostgreCachePolicy>) {
561 return fmt::format(
562 FMT_COMPILE("where ({}) and {} >= $1"),
563 PostgreCachePolicy::kWhere,
564 PostgreCachePolicy::kUpdatedField
565 );
566 } else {
567 return fmt::format(FMT_COMPILE("where {} >= $1"), PostgreCachePolicy::kUpdatedField);
568 }
569}
570
571template <typename PostgreCachePolicy>
572std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
573 if constexpr (pg_cache::detail::HasOrderBy<PostgreCachePolicy>) {
574 return fmt::format(FMT_COMPILE("order by {}"), PostgreCachePolicy::kOrderBy);
575 } else {
576 return "";
577 }
578}
579
580template <typename PostgreCachePolicy>
581storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
582 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
583 return fmt::format("{} {} {}", query.GetStatementView(), GetWhereClause(), GetOrderByClause());
584}
585
586template <typename PostgreCachePolicy>
587storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
588 if constexpr (kIncrementalUpdates) {
589 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
590 return storages::postgres::Query{
591 fmt::format("{} {} {}", query.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
593 };
594 } else {
595 return GetAllQuery();
596 }
597}
598
599template <typename PostgreCachePolicy>
600std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(const ComponentConfig& config) {
601 static constexpr std::string_view kUpdateCorrection = "update-correction";
602 if (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy> ||
603 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull)
604 {
605 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
606 } else {
607 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
608 }
609}
610
611template <typename PostgreCachePolicy>
612typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
613 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
614 const DataType& cache
615) const {
616 if constexpr (pg_cache::detail::HasCustomUpdated<PostgreCachePolicy>) {
617 return PostgreCachePolicy::GetLastKnownUpdated(cache);
618 } else {
619 return UpdatedFieldType{last_update - correction_};
620 }
621}
622
623template <typename PostgreCachePolicy>
624void PostgreCache<PostgreCachePolicy>::Update(
625 cache::UpdateType type,
626 const std::chrono::system_clock::time_point& last_update,
627 const std::chrono::system_clock::time_point& /*now*/,
628 cache::UpdateStatisticsScope& stats_scope
629) {
630 namespace pg = storages::postgres;
631 if constexpr (!kIncrementalUpdates) {
633 }
634 const auto query = (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
635 const std::chrono::milliseconds
636 timeout = (type == cache::UpdateType::kFull) ? full_update_timeout_ : incremental_update_timeout_;
637
638 // COPY current cached data
639 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(std::string{pg_cache::detail::kCopyStage});
640 auto data_cache = GetDataSnapshot(type, scope);
641 [[maybe_unused]] const auto old_size = data_cache->size();
642
643 scope.Reset(std::string{pg_cache::detail::kFetchStage});
644
645 size_t changes = 0;
646 // Iterate clusters
647 for (auto& cluster : clusters_) {
648 if (chunk_size_ > 0) {
649 auto trx = cluster->Begin(
650 kClusterHostTypeFlags,
652 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
653 );
654 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
655 while (portal) {
656 scope.Reset(std::string{pg_cache::detail::kFetchStage});
657 auto res = portal.Fetch(chunk_size_);
658 stats_scope.IncreaseDocumentsReadCount(res.Size());
659
660 scope.Reset(std::string{pg_cache::detail::kParseStage});
661 CacheResults(res, data_cache, stats_scope, scope);
662 changes += res.Size();
663 if (sleep_between_chunks_.count() > 0) {
664 engine::InterruptibleSleepFor(sleep_between_chunks_);
665 }
666 }
667 trx.Commit();
668 } else {
669 const bool has_parameter = query.GetStatementView().find('$') != std::string::npos;
670 auto res =
671 has_parameter
672 ? cluster->Execute(
673 kClusterHostTypeFlags,
674 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
675 query,
676 GetLastUpdated(last_update, *data_cache)
677 )
678 : cluster->Execute(
679 kClusterHostTypeFlags,
680 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
681 query
682 );
683 stats_scope.IncreaseDocumentsReadCount(res.Size());
684
685 scope.Reset(std::string{pg_cache::detail::kParseStage});
686 CacheResults(res, data_cache, stats_scope, scope);
687 changes += res.Size();
688 }
689 }
690
691 scope.Reset();
692
693 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
694 if (old_size > 0) {
695 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
696 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
697 cpu_relax_iterations_copy_ = static_cast<
698 std::size_t>(static_cast<double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
699 LOG_TRACE()
700 << "Elapsed time for copying " << kName << " " << elapsed_copy.count() << " for " << changes
701 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
702 << " iterations";
703 }
704 }
705 }
706
707 if (changes > 0) {
708 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
709 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
710 cpu_relax_iterations_parse_ = static_cast<
711 std::size_t>(static_cast<double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
712 LOG_TRACE()
713 << "Elapsed time for parsing " << kName << " " << elapsed_parse.count() << " for " << changes
714 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
715 << " iterations";
716 }
717 }
718 if (changes > 0 || type == cache::UpdateType::kFull) {
719 // Set current cache
720 pg_cache::detail::OnWritesDone(*data_cache);
721 stats_scope.Finish(data_cache->size());
722 this->Set(std::move(data_cache));
723 } else {
724 stats_scope.FinishNoChanges();
725 }
726}
727
728template <typename PostgreCachePolicy>
729bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
730 return pg_cache::detail::MayReturnNull<PolicyType>();
731}
732
733template <typename PostgreCachePolicy>
734void PostgreCache<PostgreCachePolicy>::CacheResults(
735 storages::postgres::ResultSet res,
736 CachedData& data_cache,
737 cache::UpdateStatisticsScope& stats_scope,
738 tracing::ScopeTime& scope
739) {
740 auto values = res.AsSetOf<RawValueType>(pg_cache::detail::GetTag<PostgreCachePolicy>());
741 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
742 for (auto p = values.begin(); p != values.end(); ++p) {
743 relax.Relax();
744 try {
745 using pg_cache::detail::CacheInsertOrAssign;
746 CacheInsertOrAssign(
747 *data_cache,
748 pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
749 PostgreCachePolicy::kKeyMember
750 );
751 } catch (const std::exception& e) {
753 LOG_ERROR()
754 << "Error parsing data row in cache '" << kName << "' to '" << compiler::GetTypeName<ValueType>()
755 << "': " << e.what();
756 }
757 }
758}
759
760template <typename PostgreCachePolicy>
761typename PostgreCache<PostgreCachePolicy>::CachedData PostgreCache<
762 PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope) {
764 auto data = this->Get();
765 if (data) {
766 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
767 }
768 }
769 return std::make_unique<DataType>();
770}
771
772namespace impl {
773
774std::string GetPostgreCacheSchema();
775
776} // namespace impl
777
778template <typename PostgreCachePolicy>
779yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
780 using ParentType = typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
781 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
782}
783
784} // namespace components
785
namespace utils::impl::projected_set {

/// `CacheInsertOrAssign` overload for sets in this namespace: the value is
/// forwarded to `DoInsert`, and the key member argument is ignored.
template <typename Set, typename Value, typename KeyMember>
void CacheInsertOrAssign(Set& set, Value&& value, const KeyMember&) {
    DoInsert(set, std::forward<Value>(value));
}

}  // namespace utils::impl::projected_set
794
795USERVER_NAMESPACE_END