userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/engine/sleep.hpp>
27#include <userver/logging/log.hpp>
28#include <userver/tracing/span.hpp>
29#include <userver/utils/assert.hpp>
30#include <userver/utils/cpu_relax.hpp>
31#include <userver/utils/meta.hpp>
32#include <userver/utils/void_t.hpp>
33#include <userver/yaml_config/merge_schemas.hpp>
34
35USERVER_NAMESPACE_BEGIN
36
37namespace components {
38
39/// @page pg_cache Caching Component for PostgreSQL
40///
41/// A typical components::PostgreCache usage consists of trait definition:
42///
43/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Policy Trivial
44///
45/// and registration of the component in components::ComponentList:
46///
47/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Trivial Usage
48///
49/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
50///
51///
52/// @section pg_cc_configuration Configuration
53///
54/// components::PostgreCache static configuration file should have a PostgreSQL
55/// component name specified in `pgcomponent` configuration parameter.
56///
57/// Optionally the operation timeouts for cache loading can be specified.
58///
59/// For avoiding "memory leaks", see the respective section
60/// in @ref components::CachingComponentBase.
61///
62/// @include{doc} scripts/docs/en/components_schema/postgresql/src/cache/base_postgres_cache.md
63///
64/// Options inherited from @ref components::CachingComponentBase :
65/// @include{doc} scripts/docs/en/components_schema/core/src/cache/caching_component_base.md
66///
67/// Options inherited from @ref components::ComponentBase :
68/// @include{doc} scripts/docs/en/components_schema/core/src/components/impl/component_base.md
69///
70/// @section pg_cc_cache_policy Cache policy
71///
72/// Cache policy is the template argument of components::PostgreCache component.
73/// Please see the following code snippet for documentation.
74///
75/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
76///
77/// The query can be a std::string. But due to non-guaranteed order of static
78/// data members initialization, std::string should be returned from a static
79/// member function, please see the following code snippet.
80///
81/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
82///
83/// Policy may have static function GetLastKnownUpdated. It should be used
84/// when new entries from database are taken via revision, identifier, or
85/// anything else, but not timestamp of the last update.
86/// If this function is supplied, new entries are taken from db with condition
87/// 'WHERE kUpdatedField >= GetLastKnownUpdated(cache_container)'.
88/// Otherwise, condition is
89/// 'WHERE kUpdatedField >= last_update - correction_'.
90/// See the following code snippet for an example of usage
91///
92/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
93///
94/// Cache can also store only a subset of data. For example, for a database that is defined in the following way:
95///
96/// @include samples/postgres_cache_order_by/schemas/postgresql/key_value.sql
97///
98/// it is possible to create a cache that stores only the latest `value`:
99///
100/// @snippet samples/postgres_cache_order_by/main.cpp Last pg cache
101///
102/// In case one provides a custom CacheContainer within Policy, it is notified
103/// of Update completion via its public member function OnWritesDone, if any.
104/// Custom CacheContainer must provide size method and insert_or_assign method
105/// similar to std::unordered_map's one or CacheInsertOrAssign function similar
106/// to one defined in namespace utils::impl::projected_set (i.e. used for
107/// utils::ProjectedUnorderedSet).
108/// See the following code snippet for an example of usage:
109///
110/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
111///
112/// @section pg_cc_forward_declaration Forward Declaration
113///
114/// To forward declare a cache you can forward declare a trait and
115/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
116/// forward declare the cache value type.
117///
118/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
119///
120/// ----------
121///
122/// @htmlonly <div class="bottom-nav"> @endhtmlonly
123/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
124/// @htmlonly </div> @endhtmlonly
125
126namespace pg_cache::detail {
127
// Detection helpers for the value types declared by a cache policy.
// `ValueType<T>` names `T::ValueType`; `kHasValueType<T>` tells whether the
// policy declares that alias at all.
128template <typename T>
129using ValueType = typename T::ValueType;
130template <typename T>
131inline constexpr bool kHasValueType = meta::IsDetected<ValueType, T>;
132
// `RawValueTypeImpl<T>` names the optional `T::RawValueType` alias and
// `kHasRawValueType<T>` tells whether the policy declares it.
133template <typename T>
134using RawValueTypeImpl = typename T::RawValueType;
135template <typename T>
136inline constexpr bool kHasRawValueType = meta::IsDetected<RawValueTypeImpl, T>;
// Type that result rows are parsed into. Going by the `DetectedOr` name this is
// `T::RawValueType` when declared and `T::ValueType` otherwise - TODO confirm
// against meta::DetectedOr.
137template <typename T>
138using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
139
140template <typename PostgreCachePolicy>
141auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
142 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
143 return Convert(std::move(raw), formats::parse::To<ValueType<PostgreCachePolicy>>());
144 } else {
145 return std::move(raw);
146 }
147}
148
// Detection traits for the optional and mandatory static members of a cache
// policy. Each `kHasXxx<T>` is true when the expression in the matching alias
// is well-formed for policy `T`.
149// Component name in policy
// Unlike the traits below, this one also requires `T::kName` to be non-empty:
// the `enable_if_t` condition rejects an empty string view.
150template <typename T>
151using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
152template <typename T>
153inline constexpr bool kHasName = meta::IsDetected<HasNameImpl, T>;
154
155// Component query in policy
156template <typename T>
157using HasQueryImpl = decltype(T::kQuery);
158template <typename T>
159inline constexpr bool kHasQuery = meta::IsDetected<HasQueryImpl, T>;
160
161// Component GetQuery in policy
// Detected when `T::GetQuery()` is callable without arguments.
162template <typename T>
163using HasGetQueryImpl = decltype(T::GetQuery());
164template <typename T>
165inline constexpr bool kHasGetQuery = meta::IsDetected<HasGetQueryImpl, T>;
166
167// Component kWhere in policy
168template <typename T>
169using HasWhere = decltype(T::kWhere);
170template <typename T>
171inline constexpr bool kHasWhere = meta::IsDetected<HasWhere, T>;
172
173// Component kOrderBy in policy
174template <typename T>
175using HasOrderBy = decltype(T::kOrderBy);
176template <typename T>
177inline constexpr bool kHasOrderBy = meta::IsDetected<HasOrderBy, T>;
178
179// Component kTag in policy
180template <typename T>
181using HasTag = decltype(T::kTag);
182template <typename T>
183inline constexpr bool kHasTag = meta::IsDetected<HasTag, T>;
184
185template <typename PostgreCachePolicy>
186auto GetTag() {
187 if constexpr (kHasTag<PostgreCachePolicy>) {
188 return PostgreCachePolicy::kTag;
189 } else {
190 return storages::postgres::kRowTag;
191 }
192}
193
194// Update field
// `kHasUpdatedField<T>` only checks that `T::kUpdatedField` exists.
195template <typename T>
196using HasUpdatedField = decltype(T::kUpdatedField);
197template <typename T>
198inline constexpr bool kHasUpdatedField = meta::IsDetected<HasUpdatedField, T>;
199
// `kWantIncrementalUpdates<T>` is stricter: the alias is well-formed only when
// `T::kUpdatedField` converts to a non-empty `std::string_view`.
200template <typename T>
201using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
202template <typename T>
203inline constexpr bool kWantIncrementalUpdates = meta::IsDetected<WantIncrementalUpdates, T>;
204
205// Key member in policy
// The key type is the decayed result of invoking `T::kKeyMember` on a
// `ValueType<T>` object, so both data-member and function-member pointers fit.
206template <typename T>
207using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
208template <typename T>
209inline constexpr bool kHasKeyMember = meta::IsDetected<KeyMemberTypeImpl, T>;
210template <typename T>
211using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
212
213// size method in custom container in policy
// True when `T` has a `size()` whose result converts to `std::size_t`.
// NOTE(review): the second operand names `SizeMethodInvokeResultImpl<T>`
// directly, so a `T` without `size()` looks like a hard error rather than
// `false` - confirm whether that is intended.
214template <typename T>
215using SizeMethodInvokeResultImpl = decltype(std::declval<T>().size());
216template <typename T>
217inline constexpr bool kHasSizeMethod =
218 meta::IsDetected<SizeMethodInvokeResultImpl, T> &&
219 std::is_convertible_v<SizeMethodInvokeResultImpl<T>, std::size_t>;
220
221// insert_or_assign method in custom container in policy
// Here `T` is the policy, not the container: the check is done on
// `T::CacheContainer` with the policy's key and value types as arguments.
222template <typename T>
223using InsertOrAssignMethodInvokeResultImpl =
224 decltype(std::declval<typename T::CacheContainer>()
225 .insert_or_assign(std::declval<KeyMemberTypeImpl<T>>(), std::declval<ValueType<T>>()));
226template <typename T>
227inline constexpr bool kHasInsertOrAssignMethod = meta::IsDetected<InsertOrAssignMethodInvokeResultImpl, T>;
228
229// CacheInsertOrAssign function for custom container in policy
// Detected through an unqualified call `CacheInsertOrAssign(container&, value, key)`
// for the policy's container.
230template <typename T>
231using CacheInsertOrAssignFunctionInvokeResultImpl = decltype(CacheInsertOrAssign(
232 std::declval<typename T::CacheContainer&>(),
233 std::declval<ValueType<T>>(),
234 std::declval<KeyMemberTypeImpl<T>>()
235));
236template <typename T>
237inline constexpr bool
238 kHasCacheInsertOrAssignFunction = meta::IsDetected<CacheInsertOrAssignFunctionInvokeResultImpl, T>;
239
240// Data container for cache
// Primary template: used when the policy has no `CacheContainer` alias. The
// cache is then a `std::unordered_map` from the key member type to the value
// type, which is why the key must be hashable.
241template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
242struct DataCacheContainer {
243 static_assert(
244 meta::kIsStdHashable<KeyMemberType<T>>,
245 "With default CacheContainer, key type must be std::hash-able"
246 );
247
248 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
249};
250
// Specialization chosen when `T::CacheContainer` names a type. The custom
// container is used as is, after checking that it offers `size` and one of the
// two supported insertion interfaces.
251template <typename T>
252struct DataCacheContainer<T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
253 static_assert(kHasSizeMethod<typename T::CacheContainer>, "Custom CacheContainer must provide `size` method");
254 static_assert(
255 kHasInsertOrAssignMethod<T> || kHasCacheInsertOrAssignFunction<T>,
256 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
257 "one or CacheInsertOrAssign function"
258 );
259
260 using type = typename T::CacheContainer;
261};
262
// Shortcut for the container type selected for policy `T`.
263template <typename T>
264using DataCacheContainerType = typename DataCacheContainer<T>::type;
265
266// We have to whitelist container types, for which we perform by-element
267// copying, because it's not correct for certain custom containers.
268template <typename T>
269inline constexpr bool kIsContainerCopiedByElement =
270 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
271
272template <typename T>
273std::unique_ptr<T> CopyContainer(
274 const T& container,
275 [[maybe_unused]] std::size_t cpu_relax_iterations,
276 tracing::ScopeTime& scope
277) {
278 if constexpr (kIsContainerCopiedByElement<T>) {
279 auto copy = std::make_unique<T>();
280 if constexpr (meta::kIsReservable<T>) {
281 copy->reserve(container.size());
282 }
283
284 utils::CpuRelax relax{cpu_relax_iterations, &scope};
285 for (const auto& kv : container) {
286 relax.Relax();
287 copy->insert(kv);
288 }
289 return copy;
290 } else {
291 return std::make_unique<T>(container);
292 }
293}
294
/// Default insertion routine: stores `value` in `container` under the key
/// obtained by invoking `key_member` on it, replacing an existing entry with
/// the same key.
///
/// The trailing parameter pack exists only to make this overload a worse match
/// than container-specific overloads; passing any extra argument is rejected at
/// compile time.
template <typename Container, typename Value, typename KeyMember, typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*args*/) {
    static_assert(sizeof...(Args) == 0);

    // The key is copied out before `value` is forwarded, so the key argument
    // never aliases an object that `insert_or_assign` may move from.
    auto extracted_key = std::invoke(key_member, value);
    container.insert_or_assign(std::move(extracted_key), std::forward<Value>(value));
}
303
304template <typename T>
305using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
306
307template <typename T>
308void OnWritesDone(T& container) {
309 if constexpr (meta::IsDetected<HasOnWritesDoneImpl, T>) {
310 container.OnWritesDone();
311 }
312}
313
// `kHasCustomUpdated<T>` is true when the policy has a static
// `GetLastKnownUpdated` callable with the cache container.
314template <typename T>
315using HasCustomUpdatedImpl = decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
316
317template <typename T>
318inline constexpr bool kHasCustomUpdated = meta::IsDetected<HasCustomUpdatedImpl, T>;
319
// `kHasUpdatedFieldType<T>` is true when the policy declares `UpdatedFieldType`.
320template <typename T>
321using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
322template <typename T>
323inline constexpr bool kHasUpdatedFieldType = meta::IsDetected<UpdatedFieldTypeImpl, T>;
// Type of the value bound to the incremental-update query parameter. Going by
// the `DetectedOr` name, `storages::postgres::TimePointTz` is the fallback when
// the policy declares no `UpdatedFieldType` - TODO confirm.
324template <typename T>
325using UpdatedFieldType = meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
326
327template <typename T>
328constexpr bool CheckUpdatedFieldType() {
329 if constexpr (kHasUpdatedFieldType<T>) {
330#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
331 static_assert(
332 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
333 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
334 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
335 "Invalid UpdatedFieldType, must be either TimePointTz or "
336 "TimePointWithoutTz"
337 "or (legacy) system_clock::time_point"
338 );
339#else
340 static_assert(
341 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
342 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
343 kHasCustomUpdated<T>,
344 "Invalid UpdatedFieldType, must be either TimePointTz or "
345 "TimePointWithoutTz"
346 );
347#endif
348 } else {
349 static_assert(
350 !kWantIncrementalUpdates<T>,
351 "UpdatedFieldType must be explicitly specified when using "
352 "incremental updates"
353 );
354 }
355 return true;
356}
357
358// Cluster host type policy
// Well-formed when the policy declares `kClusterHostType`.
359template <typename T>
360using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
361
// Returns the host type flags used for cache queries: the policy's
// `kClusterHostType` when it is declared, a default otherwise.
362template <typename T>
363constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
364 if constexpr (meta::IsDetected<HasClusterHostTypeImpl, T>) {
365 return T::kClusterHostType;
366 } else {
// NOTE(review): the embedded listing numbering jumps from 366 to 368, so the
// fallback `return` statement is missing from this view; the default host type
// cannot be confirmed from here.
368 }
369}
370
371// May return null policy
372template <typename T>
373using HasMayReturnNull = decltype(T::kMayReturnNull);
374
375template <typename T>
376constexpr bool MayReturnNull() {
377 if constexpr (meta::IsDetected<HasMayReturnNull, T>) {
378 return T::kMayReturnNull;
379 } else {
380 return false;
381 }
382}
383
384template <typename PostgreCachePolicy>
385struct PolicyChecker {
386 // Static assertions for cache traits
387 static_assert(kHasName<PostgreCachePolicy>, "The PosgreSQL cache policy must contain a static member `kName`");
388 static_assert(kHasValueType<PostgreCachePolicy>, "The PosgreSQL cache policy must define a type alias `ValueType`");
389 static_assert(
390 kHasKeyMember<PostgreCachePolicy>,
391 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
392 "with a pointer to a data or a function member with the object's key"
393 );
394 static_assert(
395 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
396 "The PosgreSQL cache policy must contain a static data member "
397 "`kQuery` with a select statement or a static member function "
398 "`GetQuery` returning the query"
399 );
400 static_assert(
401 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
402 "The PosgreSQL cache policy must define `kQuery` or "
403 "`GetQuery`, not both"
404 );
405 static_assert(
406 kHasUpdatedField<PostgreCachePolicy>,
407 "The PosgreSQL cache policy must contain a static member "
408 "`kUpdatedField`. If you don't want to use incremental updates, "
409 "please set its value to `nullptr`"
410 );
411 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
412
413 static_assert(
414 ClusterHostType<PostgreCachePolicy>() & storages::postgres::kClusterHostRolesMask,
415 "Cluster host role must be specified for caching component, "
416 "please be more specific"
417 );
418
419 static storages::postgres::Query GetQuery() {
420 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
421 return PostgreCachePolicy::GetQuery();
422 } else {
423 return PostgreCachePolicy::kQuery;
424 }
425 }
426
427 using BaseType = CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
428};
429
// Defaults used by the PostgreCache constructor when the matching option
// (`full-update-op-timeout`, `incremental-update-op-timeout`) is absent.
430inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
431inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
// Passed as the second `pg::CommandControl` argument for every cache query;
// presumably a zero value switches the statement timeout off - confirm.
432inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
// A stage must take longer than the threshold before Update() recomputes the
// matching CPU relax iteration count; the interval is the divisor used there.
433inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
434inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
435
// Names of the scope-time stages reported by Update().
436inline constexpr std::string_view kCopyStage = "copy_data";
437inline constexpr std::string_view kFetchStage = "fetch";
438inline constexpr std::string_view kParseStage = "parse";
439
// Defaults for the `chunk-size` and `sleep-between-chunks` options.
440inline constexpr std::size_t kDefaultChunkSize = 1000;
441inline constexpr std::chrono::milliseconds kDefaultSleepBetweenChunks{0};
442} // namespace pg_cache::detail
443
444/// @ingroup userver_components
445///
446/// @brief Caching component for PostgreSQL. See @ref pg_cache.
447///
448/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
449template <typename PostgreCachePolicy>
450class PostgreCache final : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
451public:
452 // Type aliases
453 using PolicyType = PostgreCachePolicy;
454 using ValueType = pg_cache::detail::ValueType<PolicyType>;
455 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
456 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
457 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
458 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
459 using BaseType = typename PolicyCheckerType::BaseType;
460
461 // Calculated constants
 // True when the policy names a non-empty `kUpdatedField`.
462 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
463 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
464 constexpr static auto kName = PolicyType::kName;
465
466 PostgreCache(const ComponentConfig&, const ComponentContext&);
467
468 static yaml_config::Schema GetStaticConfigSchema();
469
470private:
 // Owning pointer to a private copy of the cache contents that an update works on.
471 using CachedData = std::unique_ptr<DataType>;
472
 // Value bound to the `$1` parameter of the delta query.
473 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update, const DataType& cache) const;
474
475 void Update(
476 cache::UpdateType type,
477 const std::chrono::system_clock::time_point& last_update,
478 const std::chrono::system_clock::time_point& now,
479 cache::UpdateStatisticsScope& stats_scope
480 ) override;
481
482 bool MayReturnNull() const override;
483
 // Produces the container an update starts from.
484 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
 // Parses the rows of `res` and inserts them into `data_cache`.
485 void CacheResults(
486 storages::postgres::ResultSet res,
487 CachedData& data_cache,
488 cache::UpdateStatisticsScope& stats_scope,
489 tracing::ScopeTime& scope
490 );
491
 // Builders for the full/incremental statements and their clauses.
492 static storages::postgres::Query GetAllQuery();
493 static storages::postgres::Query GetDeltaQuery();
494 static std::string GetWhereClause();
495 static std::string GetDeltaWhereClause();
496 static std::string GetOrderByClause();
497
 // Reads the `update-correction` option.
498 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
499
 // One cluster per shard of the PostgreSQL component, filled by the constructor.
500 std::vector<storages::postgres::ClusterPtr> clusters_;
501
 // Subtracted from `last_update` when the delta query parameter is computed.
502 const std::chrono::system_clock::duration correction_;
503 const std::chrono::milliseconds full_update_timeout_;
504 const std::chrono::milliseconds incremental_update_timeout_;
 // Rows fetched per portal round trip; 0 selects a single Execute per cluster.
505 const std::size_t chunk_size_;
506 const std::chrono::milliseconds sleep_between_chunks_;
 // Iteration counts handed to utils::CpuRelax; recomputed by Update().
507 std::size_t cpu_relax_iterations_parse_{0};
508 std::size_t cpu_relax_iterations_copy_{0};
509};
510
// Sets `kHasValidate` to true for every PostgreCache instantiation.
// NOTE(review): the primary `kHasValidate` template is declared outside this
// view; presumably it opts the component into static config validation -
// confirm.
511template <typename PostgreCachePolicy>
512inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
513
// Reads the cache options from the static config, validates them against the
// policy, resolves one cluster per shard of the configured PostgreSQL
// component and logs the resulting queries.
//
// Throws std::logic_error when incremental updates are allowed by config but
// the policy has no update field, or when the update correction is negative;
// throws storages::postgres::InvalidConfig when `pgcomponent` is empty/absent.
514template <typename PostgreCachePolicy>
515PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config, const ComponentContext& context)
516 : BaseType{config, context},
517 correction_{ParseCorrection(config)},
518 full_update_timeout_{
519 config["full-update-op-timeout"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultFullUpdateTimeout)
520 },
521 incremental_update_timeout_{
522 config["incremental-update-op-timeout"]
523 .As<std::chrono::milliseconds>(pg_cache::detail::kDefaultIncrementalUpdateTimeout)
524 },
525 chunk_size_{config["chunk-size"].As<size_t>(pg_cache::detail::kDefaultChunkSize)},
526 sleep_between_chunks_{
527 config["sleep-between-chunks"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultSleepBetweenChunks)
528 }
529{
// NOTE(review): the embedded listing numbering jumps from 529 to 531; the
// opening of the check that takes the condition and message below is missing
// from this view. The visible arguments require either `chunk-size` == 0 or
// portal support in the driver.
531 !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(),
532 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
533 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
534 );
535
536 if (this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kFullAndIncremental && !kIncrementalUpdates) {
537 throw std::logic_error(
538 "Incremental update support is requested in config but no update field "
539 "name is specified in traits of '" +
540 config.Name() + "' cache"
541 );
542 }
543 if (correction_.count() < 0) {
544 throw std::logic_error(
545 "Refusing to set forward (negative) update correction requested in "
546 "config for '" +
547 config.Name() + "' cache"
548 );
549 }
550
551 const auto pg_alias = config["pgcomponent"].As<std::string>("");
552 if (pg_alias.empty()) {
553 throw storages::postgres::InvalidConfig{"No `pgcomponent` entry in configuration"};
554 }
 // Remember the cluster of every shard so that Update() can iterate over them.
555 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
556 const auto shard_count = pg_cluster_comp.GetShardCount();
557 clusters_.resize(shard_count);
558 for (size_t i = 0; i < shard_count; ++i) {
559 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
560 }
561
562 LOG_INFO()
563 << "Cache " << kName << " full update query `" << GetAllQuery().GetStatementView()
564 << "` incremental update query `" << GetDeltaQuery().GetStatementView() << "`";
565}
566
567template <typename PostgreCachePolicy>
568std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
569 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
570 return fmt::format(FMT_COMPILE("where {}"), PostgreCachePolicy::kWhere);
571 } else {
572 return "";
573 }
574}
575
576template <typename PostgreCachePolicy>
577std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
578 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
579 return fmt::format(
580 FMT_COMPILE("where ({}) and {} >= $1"),
581 PostgreCachePolicy::kWhere,
582 PostgreCachePolicy::kUpdatedField
583 );
584 } else {
585 return fmt::format(FMT_COMPILE("where {} >= $1"), PostgreCachePolicy::kUpdatedField);
586 }
587}
588
589template <typename PostgreCachePolicy>
590std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
591 if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
592 return fmt::format(FMT_COMPILE("order by {}"), PostgreCachePolicy::kOrderBy);
593 } else {
594 return "";
595 }
596}
597
598template <typename PostgreCachePolicy>
599storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
600 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
601 return fmt::format("{} {} {}", query.GetStatementView(), GetWhereClause(), GetOrderByClause());
602}
603
// Builds the incremental-update query. With incremental updates enabled it is
// the policy's base statement followed by the delta `where` clause (which
// references `$1`) and the optional `order by` clause; otherwise it is the same
// as the full-update query.
604template <typename PostgreCachePolicy>
605storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
606 if constexpr (kIncrementalUpdates) {
607 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
608 return storages::postgres::Query{
609 fmt::format("{} {} {}", query.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
// NOTE(review): the embedded listing numbering jumps from 609 to 611, so a
// further constructor argument after the trailing comma above is missing from
// this view.
611 };
612 } else {
613 return GetAllQuery();
614 }
615}
616
617template <typename PostgreCachePolicy>
618std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(const ComponentConfig& config) {
619 static constexpr std::string_view kUpdateCorrection = "update-correction";
620 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
621 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull)
622 {
623 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
624 } else {
625 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
626 }
627}
628
629template <typename PostgreCachePolicy>
630typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
631 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
632 const DataType& cache
633) const {
634 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
635 return PostgreCachePolicy::GetLastKnownUpdated(cache);
636 } else {
637 return UpdatedFieldType{last_update - correction_};
638 }
639}
640
// Performs one cache update.
//
// Steps: take a working copy of the current data, run the full or delta query
// against every cluster (in chunks through a portal when `chunk_size_` > 0, as
// a single Execute otherwise), insert the fetched rows into the copy, retune
// the CPU relax counters from the measured stage times and finally publish the
// copy - or report "no changes" when an incremental update fetched nothing.
641template <typename PostgreCachePolicy>
642void PostgreCache<PostgreCachePolicy>::Update(
643 cache::UpdateType type,
644 const std::chrono::system_clock::time_point& last_update,
645 const std::chrono::system_clock::time_point& /*now*/,
646 cache::UpdateStatisticsScope& stats_scope
647) {
648 namespace pg = storages::postgres;
649 if constexpr (!kIncrementalUpdates) {
// NOTE(review): the embedded listing numbering jumps from 649 to 651, so the
// body of this branch is missing from this view.
651 }
652 const auto query = (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
653 const std::chrono::milliseconds
654 timeout = (type == cache::UpdateType::kFull) ? full_update_timeout_ : incremental_update_timeout_;
655
656 // COPY current cached data
657 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(std::string{pg_cache::detail::kCopyStage});
658 auto data_cache = GetDataSnapshot(type, scope);
 // Size of the working copy before any row is merged; only read when the
 // container is copied element by element.
659 [[maybe_unused]] const auto old_size = data_cache->size();
660
661 scope.Reset(std::string{pg_cache::detail::kFetchStage});
662
 // Total number of rows fetched from all clusters.
663 size_t changes = 0;
664 // Iterate clusters
665 for (auto& cluster : clusters_) {
666 if (chunk_size_ > 0) {
667 auto trx = cluster->Begin(
668 kClusterHostTypeFlags,
// NOTE(review): the embedded listing numbering jumps from 668 to 670, so one
// argument of this call is missing from this view.
670 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
671 );
672 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
673 while (portal) {
674 scope.Reset(std::string{pg_cache::detail::kFetchStage});
675 auto res = portal.Fetch(chunk_size_);
676 stats_scope.IncreaseDocumentsReadCount(res.Size());
677
678 scope.Reset(std::string{pg_cache::detail::kParseStage});
679 CacheResults(res, data_cache, stats_scope, scope);
680 changes += res.Size();
681 if (sleep_between_chunks_.count() > 0) {
682 engine::InterruptibleSleepFor(sleep_between_chunks_);
683 }
684 }
685 trx.Commit();
686 } else {
 // The last-updated argument is passed only when the statement text
 // contains a `$` placeholder.
687 const bool has_parameter = query.GetStatementView().find('$') != std::string::npos;
688 auto res =
689 has_parameter
690 ? cluster->Execute(
691 kClusterHostTypeFlags,
692 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
693 query,
694 GetLastUpdated(last_update, *data_cache)
695 )
696 : cluster->Execute(
697 kClusterHostTypeFlags,
698 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
699 query
700 );
701 stats_scope.IncreaseDocumentsReadCount(res.Size());
702
703 scope.Reset(std::string{pg_cache::detail::kParseStage});
704 CacheResults(res, data_cache, stats_scope, scope);
705 changes += res.Size();
706 }
707 }
708
709 scope.Reset();
710
 // Retune the copy-stage relax counter: items copied per kCpuRelaxInterval of
 // measured copy time, once the copy stage exceeds kCpuRelaxThreshold.
711 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
712 if (old_size > 0) {
713 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
714 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
715 cpu_relax_iterations_copy_ = static_cast<
716 std::size_t>(static_cast<double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
 // NOTE(review): this message prints `changes` and `cpu_relax_iterations_parse_`,
 // while the branch is about copying `old_size` items and has just set
 // `cpu_relax_iterations_copy_` - looks like a copy-paste slip; confirm and fix.
717 LOG_TRACE()
718 << "Elapsed time for copying " << kName << " " << elapsed_copy.count() << " for " << changes
719 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
720 << " iterations";
721 }
722 }
723 }
724
 // Same retuning for the parse stage, based on the number of fetched rows.
725 if (changes > 0) {
726 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
727 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
728 cpu_relax_iterations_parse_ = static_cast<
729 std::size_t>(static_cast<double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
730 LOG_TRACE()
731 << "Elapsed time for parsing " << kName << " " << elapsed_parse.count() << " for " << changes
732 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
733 << " iterations";
734 }
735 }
 // A full update always publishes the new container, even when it is empty.
736 if (changes > 0 || type == cache::UpdateType::kFull) {
737 // Set current cache
738 pg_cache::detail::OnWritesDone(*data_cache);
739 stats_scope.Finish(data_cache->size());
740 this->Set(std::move(data_cache));
741 } else {
742 stats_scope.FinishNoChanges();
743 }
744}
745
// Override that forwards to the policy-level setting: the policy's
// `kMayReturnNull` when declared, `false` otherwise.
746template <typename PostgreCachePolicy>
747bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
748 return pg_cache::detail::MayReturnNull<PolicyType>();
749}
750
// Converts every row of `res` into a cache value and inserts it into
// `*data_cache`. A row whose conversion or insertion throws std::exception is
// logged and skipped, it does not abort the update.
751template <typename PostgreCachePolicy>
752void PostgreCache<PostgreCachePolicy>::CacheResults(
753 storages::postgres::ResultSet res,
754 CachedData& data_cache,
755 cache::UpdateStatisticsScope& stats_scope,
756 tracing::ScopeTime& scope
757) {
758 auto values = res.AsSetOf<RawValueType>(pg_cache::detail::GetTag<PostgreCachePolicy>());
759 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
760 for (auto p = values.begin(); p != values.end(); ++p) {
761 relax.Relax();
762 try {
 // The using-declaration makes the default overload visible while the
 // unqualified call still lets argument-dependent lookup pick a
 // container-specific CacheInsertOrAssign.
763 using pg_cache::detail::CacheInsertOrAssign;
764 CacheInsertOrAssign(
765 *data_cache,
766 pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
767 PostgreCachePolicy::kKeyMember
768 );
769 } catch (const std::exception& e) {
// NOTE(review): the embedded listing numbering jumps from 769 to 771, so one
// statement of this handler is missing from this view; `stats_scope` is not
// used anywhere in the visible body.
771 LOG_ERROR()
772 << "Error parsing data row in cache '" << kName << "' to '" << compiler::GetTypeName<ValueType>()
773 << "': " << e.what();
774 }
775 }
776}
777
// Returns the container an update starts from: a copy of the currently
// published data when it is available, an empty container otherwise.
778template <typename PostgreCachePolicy>
779typename PostgreCache<PostgreCachePolicy>::CachedData PostgreCache<
780 PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope) {
// NOTE(review): the embedded listing numbering jumps from 780 to 782 and the
// braces below are unbalanced, so the statement that opens the enclosing block
// (presumably a condition on `type`) is missing from this view.
782 auto data = this->Get();
783 if (data) {
784 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
785 }
786 }
787 return std::make_unique<DataType>();
788}
789
790namespace impl {
791
// Declared here, defined outside this header. Its result is the schema text
// that GetStaticConfigSchema() merges with the parent component's schema.
792std::string GetPostgreCacheSchema();
793
794} // namespace impl
795
796template <typename PostgreCachePolicy>
797yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
798 using ParentType = typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
799 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
800}
801
802} // namespace components
803
804namespace utils::impl::projected_set {
805
// CacheInsertOrAssign overload for containers handled by this namespace's
// `DoInsert`. The key member is ignored and the value is forwarded to
// `DoInsert` as is - presumably because a projected set derives the key from
// the value itself; confirm against utils::ProjectedUnorderedSet.
806template <typename Set, typename Value, typename KeyMember>
807void CacheInsertOrAssign(Set& set, Value&& value, const KeyMember& /*key_member*/) {
808 DoInsert(set, std::forward<Value>(value));
809}
810
811} // namespace utils::impl::projected_set
812
813USERVER_NAMESPACE_END