userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/utils/void_t.hpp>
33#include <userver/yaml_config/merge_schemas.hpp>
34
35USERVER_NAMESPACE_BEGIN
36
37namespace components {
38
39// clang-format off
40
41/// @page pg_cache Caching Component for PostgreSQL
42///
43/// A typical components::PostgreCache usage consists of trait definition:
44///
45/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Policy Trivial
46///
47/// and registration of the component in components::ComponentList:
48///
49/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Trivial Usage
50///
51/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
52///
53///
54/// @section pg_cc_configuration Configuration
55///
56/// components::PostgreCache static configuration file should have a PostgreSQL
57/// component name specified in `pgcomponent` configuration parameter.
58///
59/// Optionally the operation timeouts for cache loading can be specified.
60///
61/// For avoiding "memory leaks", see the respective section
62/// in @ref components::CachingComponentBase.
63///
64/// Name | Description | Default value
65/// ---- | ----------- | -------------
66/// full-update-op-timeout | timeout for a full update | 1m
67/// incremental-update-op-timeout | timeout for an incremental update | 1s
68/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated)
69/// chunk-size | number of rows to request from PostgreSQL via portals, 0 to fetch all rows in one request without portals | 1000
70/// sleep-between-chunks | duration to wait between reading chunks from PostgreSQL | 0ms
71///
72/// @section pg_cc_cache_policy Cache policy
73///
74/// Cache policy is the template argument of components::PostgreCache component.
75/// Please see the following code snippet for documentation.
76///
77/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
78///
79/// The query can be a std::string. But due to non-guaranteed order of static
80/// data members initialization, std::string should be returned from a static
81/// member function, please see the following code snippet.
82///
83/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
84///
85/// Policy may have static function GetLastKnownUpdated. It should be used
86/// when new entries from database are taken via revision, identifier, or
87/// anything else, but not timestamp of the last update.
88/// If this function is supplied, new entries are taken from db with condition
89/// 'WHERE kUpdatedField > GetLastKnownUpdated(cache_container)'.
90/// Otherwise, condition is
91/// 'WHERE kUpdatedField > last_update - correction_'.
92/// See the following code snippet for an example of usage
93///
94/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
95///
96/// Cache can also store only subset of data. For example for the database that is is defined in the following way:
97///
98/// @include samples/postgres_cache_order_by/schemas/postgresql/key_value.sql
99///
100/// it is possible to create a cache that stores only the latest `value`:
101///
102/// @snippet samples/postgres_cache_order_by/main.cpp Last pg cache
103///
104/// In case one provides a custom CacheContainer within Policy, it is notified
105/// of Update completion via its public member function OnWritesDone, if any.
106/// Custom CacheContainer must provide size method and insert_or_assign method
107/// similar to std::unordered_map's one or CacheInsertOrAssign function similar
108/// to one defined in namespace utils::impl::projected_set (i.e. used for
109/// utils::ProjectedUnorderedSet).
110/// See the following code snippet for an example of usage:
111///
112/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
113///
114/// @section pg_cc_forward_declaration Forward Declaration
115///
116/// To forward declare a cache you can forward declare a trait and
117/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
118/// forward declare the cache value type.
119///
120/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
121///
122/// ----------
123///
124/// @htmlonly <div class="bottom-nav"> @endhtmlonly
125/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
126/// @htmlonly </div> @endhtmlonly
127
128// clang-format on
129
130namespace pg_cache::detail {
131
132template <typename T>
133using ValueType = typename T::ValueType;
134template <typename T>
135inline constexpr bool kHasValueType = meta::IsDetected<ValueType, T>;
136
137template <typename T>
138using RawValueTypeImpl = typename T::RawValueType;
139template <typename T>
140inline constexpr bool kHasRawValueType = meta::IsDetected<RawValueTypeImpl, T>;
141template <typename T>
142using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
143
144template <typename PostgreCachePolicy>
145auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
146 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
147 return Convert(std::move(raw), formats::parse::To<ValueType<PostgreCachePolicy>>());
148 } else {
149 return std::move(raw);
150 }
151}
152
153// Component name in policy
154template <typename T>
155using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
156template <typename T>
157inline constexpr bool kHasName = meta::IsDetected<HasNameImpl, T>;
158
159// Component query in policy
160template <typename T>
161using HasQueryImpl = decltype(T::kQuery);
162template <typename T>
163inline constexpr bool kHasQuery = meta::IsDetected<HasQueryImpl, T>;
164
165// Component GetQuery in policy
166template <typename T>
167using HasGetQueryImpl = decltype(T::GetQuery());
168template <typename T>
169inline constexpr bool kHasGetQuery = meta::IsDetected<HasGetQueryImpl, T>;
170
171// Component kWhere in policy
172template <typename T>
173using HasWhere = decltype(T::kWhere);
174template <typename T>
175inline constexpr bool kHasWhere = meta::IsDetected<HasWhere, T>;
176
177// Component kOrderBy in policy
178template <typename T>
179using HasOrderBy = decltype(T::kOrderBy);
180template <typename T>
181inline constexpr bool kHasOrderBy = meta::IsDetected<HasOrderBy, T>;
182
183// Update field
184template <typename T>
185using HasUpdatedField = decltype(T::kUpdatedField);
186template <typename T>
187inline constexpr bool kHasUpdatedField = meta::IsDetected<HasUpdatedField, T>;
188
189template <typename T>
190using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
191template <typename T>
192inline constexpr bool kWantIncrementalUpdates = meta::IsDetected<WantIncrementalUpdates, T>;
193
194// Key member in policy
195template <typename T>
196using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
197template <typename T>
198inline constexpr bool kHasKeyMember = meta::IsDetected<KeyMemberTypeImpl, T>;
199template <typename T>
200using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
201
202// size method in custom container in policy
203template <typename T>
204using SizeMethodInvokeResultImpl = decltype(std::declval<T>().size());
205template <typename T>
206inline constexpr bool kHasSizeMethod = meta::IsDetected<SizeMethodInvokeResultImpl, T> &&
207 std::is_convertible_v<SizeMethodInvokeResultImpl<T>, std::size_t>;
208
209// insert_or_assign method in custom container in policy
210template <typename T>
211using InsertOrAssignMethodInvokeResultImpl = decltype(std::declval<typename T::CacheContainer>().insert_or_assign(
212 std::declval<KeyMemberTypeImpl<T>>(),
213 std::declval<ValueType<T>>()
214));
215template <typename T>
216inline constexpr bool kHasInsertOrAssignMethod = meta::IsDetected<InsertOrAssignMethodInvokeResultImpl, T>;
217
218// CacheInsertOrAssign function for custom container in policy
219template <typename T>
220using CacheInsertOrAssignFunctionInvokeResultImpl = decltype(CacheInsertOrAssign(
221 std::declval<typename T::CacheContainer&>(),
222 std::declval<ValueType<T>>(),
223 std::declval<KeyMemberTypeImpl<T>>()
224));
225template <typename T>
226inline constexpr bool kHasCacheInsertOrAssignFunction =
227 meta::IsDetected<CacheInsertOrAssignFunctionInvokeResultImpl, T>;
228
229// Data container for cache
230template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
231struct DataCacheContainer {
232 static_assert(
233 meta::kIsStdHashable<KeyMemberType<T>>,
234 "With default CacheContainer, key type must be std::hash-able"
235 );
236
237 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
238};
239
240template <typename T>
241struct DataCacheContainer<T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
242 static_assert(kHasSizeMethod<typename T::CacheContainer>, "Custom CacheContainer must provide `size` method");
243 static_assert(
244 kHasInsertOrAssignMethod<T> || kHasCacheInsertOrAssignFunction<T>,
245 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
246 "one or CacheInsertOrAssign function"
247 );
248
249 using type = typename T::CacheContainer;
250};
251
252template <typename T>
253using DataCacheContainerType = typename DataCacheContainer<T>::type;
254
255// We have to whitelist container types, for which we perform by-element
256// copying, because it's not correct for certain custom containers.
257template <typename T>
258inline constexpr bool kIsContainerCopiedByElement =
259 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
260
261template <typename T>
262std::unique_ptr<T>
263CopyContainer(const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations, tracing::ScopeTime& scope) {
264 if constexpr (kIsContainerCopiedByElement<T>) {
265 auto copy = std::make_unique<T>();
266 if constexpr (meta::kIsReservable<T>) {
267 copy->reserve(container.size());
268 }
269
270 utils::CpuRelax relax{cpu_relax_iterations, &scope};
271 for (const auto& kv : container) {
272 relax.Relax();
273 copy->insert(kv);
274 }
275 return copy;
276 } else {
277 return std::make_unique<T>(container);
278 }
279}
280
281template <typename Container, typename Value, typename KeyMember, typename... Args>
282void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*args*/) {
283 // Args are only used to de-prioritize this default overload.
284 static_assert(sizeof...(Args) == 0);
285 // Copy 'key' to avoid aliasing issues in 'insert_or_assign'.
286 auto key = std::invoke(key_member, value);
287 container.insert_or_assign(std::move(key), std::forward<Value>(value));
288}
289
290template <typename T>
291using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
292
293template <typename T>
294void OnWritesDone(T& container) {
295 if constexpr (meta::IsDetected<HasOnWritesDoneImpl, T>) {
296 container.OnWritesDone();
297 }
298}
299
300template <typename T>
301using HasCustomUpdatedImpl = decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
302
303template <typename T>
304inline constexpr bool kHasCustomUpdated = meta::IsDetected<HasCustomUpdatedImpl, T>;
305
306template <typename T>
307using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
308template <typename T>
309inline constexpr bool kHasUpdatedFieldType = meta::IsDetected<UpdatedFieldTypeImpl, T>;
310template <typename T>
311using UpdatedFieldType = meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
312
313template <typename T>
314constexpr bool CheckUpdatedFieldType() {
315 if constexpr (kHasUpdatedFieldType<T>) {
316#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
317 static_assert(
318 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
319 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
320 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
321 "Invalid UpdatedFieldType, must be either TimePointTz or "
322 "TimePointWithoutTz"
323 "or (legacy) system_clock::time_point"
324 );
325#else
326 static_assert(
327 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
328 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
329 kHasCustomUpdated<T>,
330 "Invalid UpdatedFieldType, must be either TimePointTz or "
331 "TimePointWithoutTz"
332 );
333#endif
334 } else {
335 static_assert(
336 !kWantIncrementalUpdates<T>,
337 "UpdatedFieldType must be explicitly specified when using "
338 "incremental updates"
339 );
340 }
341 return true;
342}
343
344// Cluster host type policy
345template <typename T>
346using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
347
348template <typename T>
349constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
350 if constexpr (meta::IsDetected<HasClusterHostTypeImpl, T>) {
351 return T::kClusterHostType;
352 } else {
354 }
355}
356
357// May return null policy
358template <typename T>
359using HasMayReturnNull = decltype(T::kMayReturnNull);
360
361template <typename T>
362constexpr bool MayReturnNull() {
363 if constexpr (meta::IsDetected<HasMayReturnNull, T>) {
364 return T::kMayReturnNull;
365 } else {
366 return false;
367 }
368}
369
370template <typename PostgreCachePolicy>
371struct PolicyChecker {
372 // Static assertions for cache traits
373 static_assert(kHasName<PostgreCachePolicy>, "The PosgreSQL cache policy must contain a static member `kName`");
374 static_assert(kHasValueType<PostgreCachePolicy>, "The PosgreSQL cache policy must define a type alias `ValueType`");
375 static_assert(
376 kHasKeyMember<PostgreCachePolicy>,
377 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
378 "with a pointer to a data or a function member with the object's key"
379 );
380 static_assert(
381 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
382 "The PosgreSQL cache policy must contain a static data member "
383 "`kQuery` with a select statement or a static member function "
384 "`GetQuery` returning the query"
385 );
386 static_assert(
387 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
388 "The PosgreSQL cache policy must define `kQuery` or "
389 "`GetQuery`, not both"
390 );
391 static_assert(
392 kHasUpdatedField<PostgreCachePolicy>,
393 "The PosgreSQL cache policy must contain a static member "
394 "`kUpdatedField`. If you don't want to use incremental updates, "
395 "please set its value to `nullptr`"
396 );
397 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
398
399 static_assert(
400 ClusterHostType<PostgreCachePolicy>() & storages::postgres::kClusterHostRolesMask,
401 "Cluster host role must be specified for caching component, "
402 "please be more specific"
403 );
404
405 static storages::postgres::Query GetQuery() {
406 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
407 return PostgreCachePolicy::GetQuery();
408 } else {
409 return PostgreCachePolicy::kQuery;
410 }
411 }
412
413 using BaseType = CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
414};
415
416inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
417inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
418inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
419inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
420inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
421
422inline constexpr std::string_view kCopyStage = "copy_data";
423inline constexpr std::string_view kFetchStage = "fetch";
424inline constexpr std::string_view kParseStage = "parse";
425
426inline constexpr std::size_t kDefaultChunkSize = 1000;
427inline constexpr std::chrono::milliseconds kDefaultSleepBetweenChunks{0};
428} // namespace pg_cache::detail
429
430/// @ingroup userver_components
431///
432/// @brief Caching component for PostgreSQL. See @ref pg_cache.
433///
434/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
435template <typename PostgreCachePolicy>
436class PostgreCache final : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
437public:
438 // Type aliases
439 using PolicyType = PostgreCachePolicy;
440 using ValueType = pg_cache::detail::ValueType<PolicyType>;
441 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
442 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
443 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
444 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
445 using BaseType = typename PolicyCheckerType::BaseType;
446
447 // Calculated constants
448 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
449 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
450 constexpr static auto kName = PolicyType::kName;
451
452 PostgreCache(const ComponentConfig&, const ComponentContext&);
453 ~PostgreCache() override;
454
455 static yaml_config::Schema GetStaticConfigSchema();
456
457private:
458 using CachedData = std::unique_ptr<DataType>;
459
460 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update, const DataType& cache) const;
461
462 void Update(
463 cache::UpdateType type,
464 const std::chrono::system_clock::time_point& last_update,
465 const std::chrono::system_clock::time_point& now,
466 cache::UpdateStatisticsScope& stats_scope
467 ) override;
468
469 bool MayReturnNull() const override;
470
471 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
472 void CacheResults(
473 storages::postgres::ResultSet res,
474 CachedData& data_cache,
475 cache::UpdateStatisticsScope& stats_scope,
476 tracing::ScopeTime& scope
477 );
478
479 static storages::postgres::Query GetAllQuery();
480 static storages::postgres::Query GetDeltaQuery();
481 static std::string GetWhereClause();
482 static std::string GetDeltaWhereClause();
483 static std::string GetOrderByClause();
484
485 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
486
487 std::vector<storages::postgres::ClusterPtr> clusters_;
488
489 const std::chrono::system_clock::duration correction_;
490 const std::chrono::milliseconds full_update_timeout_;
491 const std::chrono::milliseconds incremental_update_timeout_;
492 const std::size_t chunk_size_;
493 const std::chrono::milliseconds sleep_between_chunks_;
494 std::size_t cpu_relax_iterations_parse_{0};
495 std::size_t cpu_relax_iterations_copy_{0};
496};
497
498template <typename PostgreCachePolicy>
499inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
500
501template <typename PostgreCachePolicy>
502PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config, const ComponentContext& context)
503 : BaseType{config, context},
504 correction_{ParseCorrection(config)},
505 full_update_timeout_{
506 config["full-update-op-timeout"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultFullUpdateTimeout)},
507 incremental_update_timeout_{config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
508 pg_cache::detail::kDefaultIncrementalUpdateTimeout
509 )},
510 chunk_size_{config["chunk-size"].As<size_t>(pg_cache::detail::kDefaultChunkSize)},
511 sleep_between_chunks_{
512 config["sleep-between-chunks"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultSleepBetweenChunks)} {
514 !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(),
515 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
516 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
517 );
518
519 if (this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kFullAndIncremental && !kIncrementalUpdates) {
520 throw std::logic_error(
521 "Incremental update support is requested in config but no update field "
522 "name is specified in traits of '" +
523 config.Name() + "' cache"
524 );
525 }
526 if (correction_.count() < 0) {
527 throw std::logic_error(
528 "Refusing to set forward (negative) update correction requested in "
529 "config for '" +
530 config.Name() + "' cache"
531 );
532 }
533
534 const auto pg_alias = config["pgcomponent"].As<std::string>("");
535 if (pg_alias.empty()) {
536 throw storages::postgres::InvalidConfig{"No `pgcomponent` entry in configuration"};
537 }
538 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
539 const auto shard_count = pg_cluster_comp.GetShardCount();
540 clusters_.resize(shard_count);
541 for (size_t i = 0; i < shard_count; ++i) {
542 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
543 }
544
545 LOG_INFO() << "Cache " << kName << " full update query `" << GetAllQuery().GetStatementView()
546 << "` incremental update query `" << GetDeltaQuery().GetStatementView() << "`";
547
548 this->StartPeriodicUpdates();
549}
550
551template <typename PostgreCachePolicy>
552PostgreCache<PostgreCachePolicy>::~PostgreCache() {
553 this->StopPeriodicUpdates();
554}
555
556template <typename PostgreCachePolicy>
557std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
558 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
559 return fmt::format(FMT_COMPILE("where {}"), PostgreCachePolicy::kWhere);
560 } else {
561 return "";
562 }
563}
564
565template <typename PostgreCachePolicy>
566std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
567 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
568 return fmt::format(
569 FMT_COMPILE("where ({}) and {} >= $1"), PostgreCachePolicy::kWhere, PostgreCachePolicy::kUpdatedField
570 );
571 } else {
572 return fmt::format(FMT_COMPILE("where {} >= $1"), PostgreCachePolicy::kUpdatedField);
573 }
574}
575
576template <typename PostgreCachePolicy>
577std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
578 if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
579 return fmt::format(FMT_COMPILE("order by {}"), PostgreCachePolicy::kOrderBy);
580 } else {
581 return "";
582 }
583}
584
585template <typename PostgreCachePolicy>
586storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
587 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
588 return fmt::format("{} {} {}", query.GetStatementView(), GetWhereClause(), GetOrderByClause());
589}
590
591template <typename PostgreCachePolicy>
592storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
593 if constexpr (kIncrementalUpdates) {
594 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
595 return storages::postgres::Query{
596 fmt::format("{} {} {}", query.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
598 };
599 } else {
600 return GetAllQuery();
601 }
602}
603
604template <typename PostgreCachePolicy>
605std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(const ComponentConfig& config) {
606 static constexpr std::string_view kUpdateCorrection = "update-correction";
607 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
608 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
609 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
610 } else {
611 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
612 }
613}
614
615template <typename PostgreCachePolicy>
616typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
617 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
618 const DataType& cache
619) const {
620 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
621 return PostgreCachePolicy::GetLastKnownUpdated(cache);
622 } else {
623 return UpdatedFieldType{last_update - correction_};
624 }
625}
626
627template <typename PostgreCachePolicy>
628void PostgreCache<PostgreCachePolicy>::Update(
629 cache::UpdateType type,
630 const std::chrono::system_clock::time_point& last_update,
631 const std::chrono::system_clock::time_point& /*now*/,
632 cache::UpdateStatisticsScope& stats_scope
633) {
634 namespace pg = storages::postgres;
635 if constexpr (!kIncrementalUpdates) {
637 }
638 const auto query = (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
639 const std::chrono::milliseconds timeout =
640 (type == cache::UpdateType::kFull) ? full_update_timeout_ : incremental_update_timeout_;
641
642 // COPY current cached data
643 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(std::string{pg_cache::detail::kCopyStage});
644 auto data_cache = GetDataSnapshot(type, scope);
645 [[maybe_unused]] const auto old_size = data_cache->size();
646
647 scope.Reset(std::string{pg_cache::detail::kFetchStage});
648
649 size_t changes = 0;
650 // Iterate clusters
651 for (auto& cluster : clusters_) {
652 if (chunk_size_ > 0) {
653 auto trx = cluster->Begin(
654 kClusterHostTypeFlags,
656 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
657 );
658 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
659 while (portal) {
660 scope.Reset(std::string{pg_cache::detail::kFetchStage});
661 auto res = portal.Fetch(chunk_size_);
662 stats_scope.IncreaseDocumentsReadCount(res.Size());
663
664 scope.Reset(std::string{pg_cache::detail::kParseStage});
665 CacheResults(res, data_cache, stats_scope, scope);
666 changes += res.Size();
667 if (sleep_between_chunks_.count() > 0) {
668 engine::InterruptibleSleepFor(sleep_between_chunks_);
669 }
670 }
671 trx.Commit();
672 } else {
673 const bool has_parameter = query.GetStatementView().find('$') != std::string::npos;
674 auto res = has_parameter ? cluster->Execute(
675 kClusterHostTypeFlags,
676 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
677 query,
678 GetLastUpdated(last_update, *data_cache)
679 )
680 : cluster->Execute(
681 kClusterHostTypeFlags,
682 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
683 query
684 );
685 stats_scope.IncreaseDocumentsReadCount(res.Size());
686
687 scope.Reset(std::string{pg_cache::detail::kParseStage});
688 CacheResults(res, data_cache, stats_scope, scope);
689 changes += res.Size();
690 }
691 }
692
693 scope.Reset();
694
695 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
696 if (old_size > 0) {
697 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
698 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
699 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
700 static_cast<double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval)
701 );
702 LOG_TRACE() << "Elapsed time for copying " << kName << " " << elapsed_copy.count() << " for " << changes
703 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
704 << " iterations";
705 }
706 }
707 }
708
709 if (changes > 0) {
710 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
711 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
712 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
713 static_cast<double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval)
714 );
715 LOG_TRACE() << "Elapsed time for parsing " << kName << " " << elapsed_parse.count() << " for " << changes
716 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
717 << " iterations";
718 }
719 }
720 if (changes > 0 || type == cache::UpdateType::kFull) {
721 // Set current cache
722 pg_cache::detail::OnWritesDone(*data_cache);
723 stats_scope.Finish(data_cache->size());
724 this->Set(std::move(data_cache));
725 } else {
726 stats_scope.FinishNoChanges();
727 }
728}
729
730template <typename PostgreCachePolicy>
731bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
732 return pg_cache::detail::MayReturnNull<PolicyType>();
733}
734
735template <typename PostgreCachePolicy>
736void PostgreCache<PostgreCachePolicy>::CacheResults(
737 storages::postgres::ResultSet res,
738 CachedData& data_cache,
739 cache::UpdateStatisticsScope& stats_scope,
740 tracing::ScopeTime& scope
741) {
742 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
743 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
744 for (auto p = values.begin(); p != values.end(); ++p) {
745 relax.Relax();
746 try {
747 using pg_cache::detail::CacheInsertOrAssign;
748 CacheInsertOrAssign(
749 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p), PostgreCachePolicy::kKeyMember
750 );
751 } catch (const std::exception& e) {
753 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
754 << compiler::GetTypeName<ValueType>() << "': " << e.what();
755 }
756 }
757}
758
759template <typename PostgreCachePolicy>
760typename PostgreCache<PostgreCachePolicy>::CachedData
761PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope) {
763 auto data = this->Get();
764 if (data) {
765 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
766 }
767 }
768 return std::make_unique<DataType>();
769}
770
771namespace impl {
772
773std::string GetPostgreCacheSchema();
774
775} // namespace impl
776
777template <typename PostgreCachePolicy>
778yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
779 using ParentType = typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
780 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
781}
782
783} // namespace components
784
785namespace utils::impl::projected_set {
786
787template <typename Set, typename Value, typename KeyMember>
788void CacheInsertOrAssign(Set& set, Value&& value, const KeyMember& /*key_member*/) {
789 DoInsert(set, std::forward<Value>(value));
790}
791
792} // namespace utils::impl::projected_set
793
794USERVER_NAMESPACE_END