userver: userver/cache/base_postgres_cache.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38// clang-format off
39
40/// @page pg_cache Caching Component for PostgreSQL
41///
42/// A typical components::PostgreCache usage consists of trait definition:
43///
44/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Policy Trivial
45///
46/// and registration of the component in components::ComponentList:
47///
48/// @snippet postgresql/src/cache/postgres_cache_test.cpp Pg Cache Trivial Usage
49///
50/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
51///
52///
53/// @section pg_cc_configuration Configuration
54///
55/// components::PostgreCache static configuration file should have a PostgreSQL
56/// component name specified in `pgcomponent` configuration parameter.
57///
58/// Optionally the operation timeouts for cache loading can be specified.
59///
60/// For avoiding "memory leaks", see the respective section
61/// in @ref components::CachingComponentBase.
62///
63/// Name | Description | Default value
64/// ---- | ----------- | -------------
65/// full-update-op-timeout | timeout for a full update | 1m
66/// incremental-update-op-timeout | timeout for an incremental update | 1s
67/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated)
68/// chunk-size | number of rows to request from PostgreSQL via portals, 0 to fetch all rows in one request without portals | 1000
69///
70/// @section pg_cc_cache_policy Cache policy
71///
72/// Cache policy is the template argument of components::PostgreCache component.
73/// Please see the following code snippet for documentation.
74///
75/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
76///
77/// The query can be a std::string. But due to non-guaranteed order of static
78/// data members initialization, std::string should be returned from a static
79/// member function, please see the following code snippet.
80///
81/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
82///
83/// Policy may have static function GetLastKnownUpdated. It should be used
84/// when new entries from database are taken via revision, identifier, or
85/// anything else, but not timestamp of the last update.
86/// If this function is supplied, new entries are taken from db with condition
87/// 'WHERE kUpdatedField > GetLastKnownUpdated(cache_container)'.
88/// Otherwise, condition is
89/// 'WHERE kUpdatedField > last_update - correction_'.
90/// See the following code snippet for an example of usage
91///
92/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
93///
94/// Cache can also store only subset of data. For example for the database that is is defined in the following way:
95///
96/// @include samples/postgres_cache_order_by/schemas/postgresql/key_value.sql
97///
98/// it is possible to create a cache that stores only the latest `value`:
99///
100/// @snippet samples/postgres_cache_order_by/main.cpp Last pg cache
101///
102/// In case one provides a custom CacheContainer within Policy, it is notified
103/// of Update completion via its public member function OnWritesDone, if any.
104/// Custom CacheContainer must provide size method and insert_or_assign method
105/// similar to std::unordered_map's one or CacheInsertOrAssign function similar
106/// to one defined in namespace utils::impl::projected_set (i.e. used for
107/// utils::ProjectedUnorderedSet).
108/// See the following code snippet for an example of usage:
109///
110/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
111///
112/// @section pg_cc_forward_declaration Forward Declaration
113///
114/// To forward declare a cache you can forward declare a trait and
115/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
116/// forward declare the cache value type.
117///
118/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
119///
120/// ----------
121///
122/// @htmlonly <div class="bottom-nav"> @endhtmlonly
123/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
124/// @htmlonly </div> @endhtmlonly
125
126// clang-format on
127
128namespace pg_cache::detail {
129
130template <typename T>
131using ValueType = typename T::ValueType;
132template <typename T>
133inline constexpr bool kHasValueType = meta::IsDetected<ValueType, T>;
134
135template <typename T>
136using RawValueTypeImpl = typename T::RawValueType;
137template <typename T>
138inline constexpr bool kHasRawValueType = meta::IsDetected<RawValueTypeImpl, T>;
139template <typename T>
140using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
141
142template <typename PostgreCachePolicy>
143auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
144 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
145 return Convert(std::move(raw), formats::parse::To<ValueType<PostgreCachePolicy>>());
146 } else {
147 return std::move(raw);
148 }
149}
150
151// Component name in policy
152template <typename T>
153using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
154template <typename T>
155inline constexpr bool kHasName = meta::IsDetected<HasNameImpl, T>;
156
157// Component query in policy
158template <typename T>
159using HasQueryImpl = decltype(T::kQuery);
160template <typename T>
161inline constexpr bool kHasQuery = meta::IsDetected<HasQueryImpl, T>;
162
163// Component GetQuery in policy
164template <typename T>
165using HasGetQueryImpl = decltype(T::GetQuery());
166template <typename T>
167inline constexpr bool kHasGetQuery = meta::IsDetected<HasGetQueryImpl, T>;
168
169// Component kWhere in policy
170template <typename T>
171using HasWhere = decltype(T::kWhere);
172template <typename T>
173inline constexpr bool kHasWhere = meta::IsDetected<HasWhere, T>;
174
175// Component kOrderBy in policy
176template <typename T>
177using HasOrderBy = decltype(T::kOrderBy);
178template <typename T>
179inline constexpr bool kHasOrderBy = meta::IsDetected<HasOrderBy, T>;
180
181// Update field
182template <typename T>
183using HasUpdatedField = decltype(T::kUpdatedField);
184template <typename T>
185inline constexpr bool kHasUpdatedField = meta::IsDetected<HasUpdatedField, T>;
186
187template <typename T>
188using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
189template <typename T>
190inline constexpr bool kWantIncrementalUpdates = meta::IsDetected<WantIncrementalUpdates, T>;
191
192// Key member in policy
193template <typename T>
194using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
195template <typename T>
196inline constexpr bool kHasKeyMember = meta::IsDetected<KeyMemberTypeImpl, T>;
197template <typename T>
198using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
199
200// size method in custom container in policy
201template <typename T>
202using SizeMethodInvokeResultImpl = decltype(std::declval<T>().size());
203template <typename T>
204inline constexpr bool kHasSizeMethod = meta::IsDetected<SizeMethodInvokeResultImpl, T> &&
205 std::is_convertible_v<SizeMethodInvokeResultImpl<T>, std::size_t>;
206
207// insert_or_assign method in custom container in policy
208template <typename T>
209using InsertOrAssignMethodInvokeResultImpl = decltype(std::declval<typename T::CacheContainer>().insert_or_assign(
210 std::declval<KeyMemberTypeImpl<T>>(),
211 std::declval<ValueType<T>>()
212));
213template <typename T>
214inline constexpr bool kHasInsertOrAssignMethod = meta::IsDetected<InsertOrAssignMethodInvokeResultImpl, T>;
215
216// CacheInsertOrAssign function for custom container in policy
217template <typename T>
218using CacheInsertOrAssignFunctionInvokeResultImpl = decltype(CacheInsertOrAssign(
219 std::declval<typename T::CacheContainer&>(),
220 std::declval<ValueType<T>>(),
221 std::declval<KeyMemberTypeImpl<T>>()
222));
223template <typename T>
224inline constexpr bool kHasCacheInsertOrAssignFunction =
225 meta::IsDetected<CacheInsertOrAssignFunctionInvokeResultImpl, T>;
226
227// Data container for cache
228template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
229struct DataCacheContainer {
230 static_assert(
231 meta::kIsStdHashable<KeyMemberType<T>>,
232 "With default CacheContainer, key type must be std::hash-able"
233 );
234
235 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
236};
237
238template <typename T>
239struct DataCacheContainer<T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
240 static_assert(kHasSizeMethod<typename T::CacheContainer>, "Custom CacheContainer must provide `size` method");
241 static_assert(
242 kHasInsertOrAssignMethod<T> || kHasCacheInsertOrAssignFunction<T>,
243 "Custom CacheContainer must provide `insert_or_assign` method similar to std::unordered_map's "
244 "one or CacheInsertOrAssign function"
245 );
246
247 using type = typename T::CacheContainer;
248};
249
250template <typename T>
251using DataCacheContainerType = typename DataCacheContainer<T>::type;
252
253// We have to whitelist container types, for which we perform by-element
254// copying, because it's not correct for certain custom containers.
255template <typename T>
256inline constexpr bool kIsContainerCopiedByElement =
257 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
258
259template <typename T>
260std::unique_ptr<T>
261CopyContainer(const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations, tracing::ScopeTime& scope) {
262 if constexpr (kIsContainerCopiedByElement<T>) {
263 auto copy = std::make_unique<T>();
264 if constexpr (meta::kIsReservable<T>) {
265 copy->reserve(container.size());
266 }
267
268 utils::CpuRelax relax{cpu_relax_iterations, &scope};
269 for (const auto& kv : container) {
270 relax.Relax();
271 copy->insert(kv);
272 }
273 return copy;
274 } else {
275 return std::make_unique<T>(container);
276 }
277}
278
279template <typename Container, typename Value, typename KeyMember, typename... Args>
280void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*args*/) {
281 // Args are only used to de-prioritize this default overload.
282 static_assert(sizeof...(Args) == 0);
283 // Copy 'key' to avoid aliasing issues in 'insert_or_assign'.
284 auto key = std::invoke(key_member, value);
285 container.insert_or_assign(std::move(key), std::forward<Value>(value));
286}
287
288template <typename T>
289using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
290
291template <typename T>
292void OnWritesDone(T& container) {
293 if constexpr (meta::IsDetected<HasOnWritesDoneImpl, T>) {
294 container.OnWritesDone();
295 }
296}
297
298template <typename T>
299using HasCustomUpdatedImpl = decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
300
301template <typename T>
302inline constexpr bool kHasCustomUpdated = meta::IsDetected<HasCustomUpdatedImpl, T>;
303
304template <typename T>
305using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
306template <typename T>
307inline constexpr bool kHasUpdatedFieldType = meta::IsDetected<UpdatedFieldTypeImpl, T>;
308template <typename T>
309using UpdatedFieldType = meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
310
311template <typename T>
312constexpr bool CheckUpdatedFieldType() {
313 if constexpr (kHasUpdatedFieldType<T>) {
314#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
315 static_assert(
316 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
317 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
318 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
319 "Invalid UpdatedFieldType, must be either TimePointTz or "
320 "TimePointWithoutTz"
321 "or (legacy) system_clock::time_point"
322 );
323#else
324 static_assert(
325 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
326 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
327 kHasCustomUpdated<T>,
328 "Invalid UpdatedFieldType, must be either TimePointTz or "
329 "TimePointWithoutTz"
330 );
331#endif
332 } else {
333 static_assert(
334 !kWantIncrementalUpdates<T>,
335 "UpdatedFieldType must be explicitly specified when using "
336 "incremental updates"
337 );
338 }
339 return true;
340}
341
342// Cluster host type policy
343template <typename T>
344using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
345
346template <typename T>
347constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
348 if constexpr (meta::IsDetected<HasClusterHostTypeImpl, T>) {
349 return T::kClusterHostType;
350 } else {
352 }
353}
354
355// May return null policy
356template <typename T>
357using HasMayReturnNull = decltype(T::kMayReturnNull);
358
359template <typename T>
360constexpr bool MayReturnNull() {
361 if constexpr (meta::IsDetected<HasMayReturnNull, T>) {
362 return T::kMayReturnNull;
363 } else {
364 return false;
365 }
366}
367
368template <typename PostgreCachePolicy>
369struct PolicyChecker {
370 // Static assertions for cache traits
371 static_assert(kHasName<PostgreCachePolicy>, "The PosgreSQL cache policy must contain a static member `kName`");
372 static_assert(kHasValueType<PostgreCachePolicy>, "The PosgreSQL cache policy must define a type alias `ValueType`");
373 static_assert(
374 kHasKeyMember<PostgreCachePolicy>,
375 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
376 "with a pointer to a data or a function member with the object's key"
377 );
378 static_assert(
379 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
380 "The PosgreSQL cache policy must contain a static data member "
381 "`kQuery` with a select statement or a static member function "
382 "`GetQuery` returning the query"
383 );
384 static_assert(
385 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
386 "The PosgreSQL cache policy must define `kQuery` or "
387 "`GetQuery`, not both"
388 );
389 static_assert(
390 kHasUpdatedField<PostgreCachePolicy>,
391 "The PosgreSQL cache policy must contain a static member "
392 "`kUpdatedField`. If you don't want to use incremental updates, "
393 "please set its value to `nullptr`"
394 );
395 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
396
397 static_assert(
398 ClusterHostType<PostgreCachePolicy>() & storages::postgres::kClusterHostRolesMask,
399 "Cluster host role must be specified for caching component, "
400 "please be more specific"
401 );
402
403 static storages::postgres::Query GetQuery() {
404 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
405 return PostgreCachePolicy::GetQuery();
406 } else {
407 return PostgreCachePolicy::kQuery;
408 }
409 }
410
411 using BaseType = CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
412};
413
414inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
415inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
416inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
417inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
418inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
419
420inline constexpr std::string_view kCopyStage = "copy_data";
421inline constexpr std::string_view kFetchStage = "fetch";
422inline constexpr std::string_view kParseStage = "parse";
423
424inline constexpr std::size_t kDefaultChunkSize = 1000;
425} // namespace pg_cache::detail
426
427/// @ingroup userver_components
428///
429/// @brief Caching component for PostgreSQL. See @ref pg_cache.
430///
431/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
432template <typename PostgreCachePolicy>
433class PostgreCache final : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
434public:
435 // Type aliases
436 using PolicyType = PostgreCachePolicy;
437 using ValueType = pg_cache::detail::ValueType<PolicyType>;
438 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
439 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
440 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
441 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
442 using BaseType = typename PolicyCheckerType::BaseType;
443
444 // Calculated constants
445 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
446 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
447 constexpr static auto kName = PolicyType::kName;
448
449 PostgreCache(const ComponentConfig&, const ComponentContext&);
450 ~PostgreCache() override;
451
452 static yaml_config::Schema GetStaticConfigSchema();
453
454private:
455 using CachedData = std::unique_ptr<DataType>;
456
457 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update, const DataType& cache) const;
458
459 void Update(
460 cache::UpdateType type,
461 const std::chrono::system_clock::time_point& last_update,
462 const std::chrono::system_clock::time_point& now,
463 cache::UpdateStatisticsScope& stats_scope
464 ) override;
465
466 bool MayReturnNull() const override;
467
468 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
469 void CacheResults(
470 storages::postgres::ResultSet res,
471 CachedData& data_cache,
472 cache::UpdateStatisticsScope& stats_scope,
473 tracing::ScopeTime& scope
474 );
475
476 static storages::postgres::Query GetAllQuery();
477 static storages::postgres::Query GetDeltaQuery();
478 static std::string GetWhereClause();
479 static std::string GetDeltaWhereClause();
480 static std::string GetOrderByClause();
481
482 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
483
484 std::vector<storages::postgres::ClusterPtr> clusters_;
485
486 const std::chrono::system_clock::duration correction_;
487 const std::chrono::milliseconds full_update_timeout_;
488 const std::chrono::milliseconds incremental_update_timeout_;
489 const std::size_t chunk_size_;
490 std::size_t cpu_relax_iterations_parse_{0};
491 std::size_t cpu_relax_iterations_copy_{0};
492};
493
494template <typename PostgreCachePolicy>
495inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
496
497template <typename PostgreCachePolicy>
498PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config, const ComponentContext& context)
499 : BaseType{config, context},
500 correction_{ParseCorrection(config)},
501 full_update_timeout_{
502 config["full-update-op-timeout"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultFullUpdateTimeout)},
503 incremental_update_timeout_{config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
504 pg_cache::detail::kDefaultIncrementalUpdateTimeout
505 )},
506 chunk_size_{config["chunk-size"].As<size_t>(pg_cache::detail::kDefaultChunkSize)} {
508 !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(),
509 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
510 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
511 );
512
513 if (this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kFullAndIncremental && !kIncrementalUpdates) {
514 throw std::logic_error(
515 "Incremental update support is requested in config but no update field "
516 "name is specified in traits of '" +
517 config.Name() + "' cache"
518 );
519 }
520 if (correction_.count() < 0) {
521 throw std::logic_error(
522 "Refusing to set forward (negative) update correction requested in "
523 "config for '" +
524 config.Name() + "' cache"
525 );
526 }
527
528 const auto pg_alias = config["pgcomponent"].As<std::string>("");
529 if (pg_alias.empty()) {
530 throw storages::postgres::InvalidConfig{"No `pgcomponent` entry in configuration"};
531 }
532 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
533 const auto shard_count = pg_cluster_comp.GetShardCount();
534 clusters_.resize(shard_count);
535 for (size_t i = 0; i < shard_count; ++i) {
536 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
537 }
538
539 LOG_INFO() << "Cache " << kName << " full update query `" << GetAllQuery().GetStatementView()
540 << "` incremental update query `" << GetDeltaQuery().GetStatementView() << "`";
541
542 this->StartPeriodicUpdates();
543}
544
545template <typename PostgreCachePolicy>
546PostgreCache<PostgreCachePolicy>::~PostgreCache() {
547 this->StopPeriodicUpdates();
548}
549
550template <typename PostgreCachePolicy>
551std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
552 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
553 return fmt::format(FMT_COMPILE("where {}"), PostgreCachePolicy::kWhere);
554 } else {
555 return "";
556 }
557}
558
559template <typename PostgreCachePolicy>
560std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
561 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
562 return fmt::format(
563 FMT_COMPILE("where ({}) and {} >= $1"), PostgreCachePolicy::kWhere, PostgreCachePolicy::kUpdatedField
564 );
565 } else {
566 return fmt::format(FMT_COMPILE("where {} >= $1"), PostgreCachePolicy::kUpdatedField);
567 }
568}
569
570template <typename PostgreCachePolicy>
571std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
572 if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
573 return fmt::format(FMT_COMPILE("order by {}"), PostgreCachePolicy::kOrderBy);
574 } else {
575 return "";
576 }
577}
578
579template <typename PostgreCachePolicy>
580storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
581 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
582 return fmt::format("{} {} {}", query.GetStatementView(), GetWhereClause(), GetOrderByClause());
583}
584
585template <typename PostgreCachePolicy>
586storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
587 if constexpr (kIncrementalUpdates) {
588 const storages::postgres::Query query = PolicyCheckerType::GetQuery();
589 return storages::postgres::Query{
590 fmt::format("{} {} {}", query.GetStatementView(), GetDeltaWhereClause(), GetOrderByClause()),
592 };
593 } else {
594 return GetAllQuery();
595 }
596}
597
598template <typename PostgreCachePolicy>
599std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(const ComponentConfig& config) {
600 static constexpr std::string_view kUpdateCorrection = "update-correction";
601 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
602 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
603 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
604 } else {
605 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
606 }
607}
608
609template <typename PostgreCachePolicy>
610typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
611 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
612 const DataType& cache
613) const {
614 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
615 return PostgreCachePolicy::GetLastKnownUpdated(cache);
616 } else {
617 return UpdatedFieldType{last_update - correction_};
618 }
619}
620
621template <typename PostgreCachePolicy>
622void PostgreCache<PostgreCachePolicy>::Update(
623 cache::UpdateType type,
624 const std::chrono::system_clock::time_point& last_update,
625 const std::chrono::system_clock::time_point& /*now*/,
626 cache::UpdateStatisticsScope& stats_scope
627) {
628 namespace pg = storages::postgres;
629 if constexpr (!kIncrementalUpdates) {
631 }
632 const auto query = (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
633 const std::chrono::milliseconds timeout =
634 (type == cache::UpdateType::kFull) ? full_update_timeout_ : incremental_update_timeout_;
635
636 // COPY current cached data
637 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(std::string{pg_cache::detail::kCopyStage});
638 auto data_cache = GetDataSnapshot(type, scope);
639 [[maybe_unused]] const auto old_size = data_cache->size();
640
641 scope.Reset(std::string{pg_cache::detail::kFetchStage});
642
643 size_t changes = 0;
644 // Iterate clusters
645 for (auto& cluster : clusters_) {
646 if (chunk_size_ > 0) {
647 auto trx = cluster->Begin(
648 kClusterHostTypeFlags,
650 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
651 );
652 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
653 while (portal) {
654 scope.Reset(std::string{pg_cache::detail::kFetchStage});
655 auto res = portal.Fetch(chunk_size_);
656 stats_scope.IncreaseDocumentsReadCount(res.Size());
657
658 scope.Reset(std::string{pg_cache::detail::kParseStage});
659 CacheResults(res, data_cache, stats_scope, scope);
660 changes += res.Size();
661 }
662 trx.Commit();
663 } else {
664 const bool has_parameter = query.GetStatementView().find('$') != std::string::npos;
665 auto res = has_parameter ? cluster->Execute(
666 kClusterHostTypeFlags,
667 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
668 query,
669 GetLastUpdated(last_update, *data_cache)
670 )
671 : cluster->Execute(
672 kClusterHostTypeFlags,
673 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
674 query
675 );
676 stats_scope.IncreaseDocumentsReadCount(res.Size());
677
678 scope.Reset(std::string{pg_cache::detail::kParseStage});
679 CacheResults(res, data_cache, stats_scope, scope);
680 changes += res.Size();
681 }
682 }
683
684 scope.Reset();
685
686 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
687 if (old_size > 0) {
688 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
689 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
690 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
691 static_cast<double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval)
692 );
693 LOG_TRACE() << "Elapsed time for copying " << kName << " " << elapsed_copy.count() << " for " << changes
694 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
695 << " iterations";
696 }
697 }
698 }
699
700 if (changes > 0) {
701 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
702 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
703 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
704 static_cast<double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval)
705 );
706 LOG_TRACE() << "Elapsed time for parsing " << kName << " " << elapsed_parse.count() << " for " << changes
707 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
708 << " iterations";
709 }
710 }
711 if (changes > 0 || type == cache::UpdateType::kFull) {
712 // Set current cache
713 pg_cache::detail::OnWritesDone(*data_cache);
714 stats_scope.Finish(data_cache->size());
715 this->Set(std::move(data_cache));
716 } else {
717 stats_scope.FinishNoChanges();
718 }
719}
720
721template <typename PostgreCachePolicy>
722bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
723 return pg_cache::detail::MayReturnNull<PolicyType>();
724}
725
726template <typename PostgreCachePolicy>
727void PostgreCache<PostgreCachePolicy>::CacheResults(
728 storages::postgres::ResultSet res,
729 CachedData& data_cache,
730 cache::UpdateStatisticsScope& stats_scope,
731 tracing::ScopeTime& scope
732) {
733 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
734 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
735 for (auto p = values.begin(); p != values.end(); ++p) {
736 relax.Relax();
737 try {
738 using pg_cache::detail::CacheInsertOrAssign;
739 CacheInsertOrAssign(
740 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p), PostgreCachePolicy::kKeyMember
741 );
742 } catch (const std::exception& e) {
744 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
745 << compiler::GetTypeName<ValueType>() << "': " << e.what();
746 }
747 }
748}
749
750template <typename PostgreCachePolicy>
751typename PostgreCache<PostgreCachePolicy>::CachedData
752PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope) {
754 auto data = this->Get();
755 if (data) {
756 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
757 }
758 }
759 return std::make_unique<DataType>();
760}
761
762namespace impl {
763
764std::string GetPostgreCacheSchema();
765
766} // namespace impl
767
768template <typename PostgreCachePolicy>
769yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
770 using ParentType = typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
771 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
772}
773
774} // namespace components
775
776namespace utils::impl::projected_set {
777
778template <typename Set, typename Value, typename KeyMember>
779void CacheInsertOrAssign(Set& set, Value&& value, const KeyMember& /*key_member*/) {
780 DoInsert(set, std::forward<Value>(value));
781}
782
783} // namespace utils::impl::projected_set
784
785USERVER_NAMESPACE_END