userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38// clang-format off
39
40/// @page pg_cache Caching Component for PostgreSQL
41///
42/// A typical components::PostgreCache usage consists of a trait definition:
43///
44/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Trivial
45///
46/// and registration of the component in components::ComponentList:
47///
48/// @snippet cache/postgres_cache_test.cpp Pg Cache Trivial Usage
49///
50/// See @ref scripts/docs/en/userver/caches.md for an introduction to caches.
51///
52///
53/// @section pg_cc_configuration Configuration
54///
55/// components::PostgreCache static configuration file should have a PostgreSQL
56/// component name specified in `pgcomponent` configuration parameter.
57///
58/// Optionally the operation timeouts for cache loading can be specified.
59///
60/// ### Avoiding memory leaks
61/// components::CachingComponentBase
62///
63/// Name | Description | Default value
64/// ---- | ----------- | -------------
65/// full-update-op-timeout | timeout for a full update | 1m
66/// incremental-update-op-timeout | timeout for an incremental update | 1s
67/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated)
68/// chunk-size | number of rows to request from PostgreSQL via portals, 0 to fetch all rows in one request without portals | 1000
69///
70/// @section pg_cc_cache_policy Cache policy
71///
72/// Cache policy is the template argument of components::PostgreCache component.
73/// Please see the following code snippet for documentation.
74///
75/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
76///
77/// The query can be a std::string. But due to non-guaranteed order of static
78/// data members initialization, std::string should be returned from a static
79/// member function, please see the following code snippet.
80///
81/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
82///
83/// Policy may have static function GetLastKnownUpdated. It should be used
84/// when new entries from database are taken via revision, identifier, or
85/// anything else, but not timestamp of the last update.
86/// If this function is supplied, new entries are taken from db with condition
87/// 'WHERE kUpdatedField >= GetLastKnownUpdated(cache_container)'.
88/// Otherwise, condition is
89/// 'WHERE kUpdatedField >= last_update - correction_'.
90/// See the following code snippet for an example of usage:
91///
92/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
93///
94/// In case one provides a custom CacheContainer within Policy, it is notified
95/// of Update completion via its public member function OnWritesDone, if any.
96/// See the following code snippet for an example of usage:
97///
98/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
99///
100/// @section pg_cc_forward_declaration Forward Declaration
101///
102/// To forward declare a cache you can forward declare a trait and
103/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
104/// forward declare the cache value type.
105///
106/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
107///
108/// ----------
109///
110/// @htmlonly <div class="bottom-nav"> @endhtmlonly
111/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
112/// @htmlonly </div> @endhtmlonly
113
114// clang-format on
115
116namespace pg_cache::detail {
117
118template <typename T>
119using ValueType = typename T::ValueType;
120template <typename T>
121inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
122
123template <typename T>
124using RawValueTypeImpl = typename T::RawValueType;
125template <typename T>
126inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
127template <typename T>
128using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
129
130template <typename PostgreCachePolicy>
131auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
132 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
133 return Convert(std::move(raw), formats::parse::To<ValueType<PostgreCachePolicy>>());
134 } else {
135 return std::move(raw);
136 }
137}
138
139// Component name in policy
140template <typename T>
141using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
142template <typename T>
143inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
144
145// Component query in policy
146template <typename T>
147using HasQueryImpl = decltype(T::kQuery);
148template <typename T>
149inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
150
151// Component GetQuery in policy
152template <typename T>
153using HasGetQueryImpl = decltype(T::GetQuery());
154template <typename T>
155inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
156
157// Component kWhere in policy
158template <typename T>
159using HasWhere = decltype(T::kWhere);
160template <typename T>
161inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
162
163// Update field
164template <typename T>
165using HasUpdatedField = decltype(T::kUpdatedField);
166template <typename T>
167inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
168
169template <typename T>
170using WantIncrementalUpdates = std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
171template <typename T>
172inline constexpr bool kWantIncrementalUpdates = meta::kIsDetected<WantIncrementalUpdates, T>;
173
174// Key member in policy
175template <typename T>
176using KeyMemberTypeImpl = std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
177template <typename T>
178inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
179template <typename T>
180using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
181
182// Data container for cache
183template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
184struct DataCacheContainer {
185 static_assert(
186 meta::kIsStdHashable<KeyMemberType<T>>,
187 "With default CacheContainer, key type must be std::hash-able"
188 );
189
190 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
191};
192
193template <typename T>
194struct DataCacheContainer<T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
195 using type = typename T::CacheContainer;
196};
197
198template <typename T>
199using DataCacheContainerType = typename DataCacheContainer<T>::type;
200
201// We have to whitelist container types, for which we perform by-element
202// copying, because it's not correct for certain custom containers.
203template <typename T>
204inline constexpr bool kIsContainerCopiedByElement =
205 meta::kIsInstantiationOf<std::unordered_map, T> || meta::kIsInstantiationOf<std::map, T>;
206
207template <typename T>
208std::unique_ptr<T>
209CopyContainer(const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations, tracing::ScopeTime& scope) {
210 if constexpr (kIsContainerCopiedByElement<T>) {
211 auto copy = std::make_unique<T>();
212 if constexpr (meta::kIsReservable<T>) {
213 copy->reserve(container.size());
214 }
215
216 utils::CpuRelax relax{cpu_relax_iterations, &scope};
217 for (const auto& kv : container) {
218 relax.Relax();
219 copy->insert(kv);
220 }
221 return copy;
222 } else {
223 return std::make_unique<T>(container);
224 }
225}
226
// Default way of storing `value` in `container`: the key is obtained by
// invoking `key_member` on the value, then the pair is inserted or the
// existing entry is overwritten.
//
// The trailing parameter pack exists only to make this overload a worse
// match than more specific ones; it must always be empty.
template <typename Container, typename Value, typename KeyMember, typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value, const KeyMember& key_member, Args&&... /*args*/) {
    static_assert(sizeof...(Args) == 0);

    // The key is taken by value on purpose: it must not alias `value`,
    // which is forwarded (and possibly moved from) in the same call.
    auto key_copy = std::invoke(key_member, value);
    container.insert_or_assign(std::move(key_copy), std::forward<Value>(value));
}
235
236template <typename T>
237using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
238
239template <typename T>
240void OnWritesDone(T& container) {
241 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
242 container.OnWritesDone();
243 }
244}
245
246template <typename T>
247using HasCustomUpdatedImpl = decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
248
249template <typename T>
250inline constexpr bool kHasCustomUpdated = meta::kIsDetected<HasCustomUpdatedImpl, T>;
251
252template <typename T>
253using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
254template <typename T>
255inline constexpr bool kHasUpdatedFieldType = meta::kIsDetected<UpdatedFieldTypeImpl, T>;
256template <typename T>
257using UpdatedFieldType = meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
258
259template <typename T>
260constexpr bool CheckUpdatedFieldType() {
261 if constexpr (kHasUpdatedFieldType<T>) {
262#if USERVER_POSTGRES_ENABLE_LEGACY_TIMESTAMP
263 static_assert(
264 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
265 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
266 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePoint> || kHasCustomUpdated<T>,
267 "Invalid UpdatedFieldType, must be either TimePointTz or "
268 "TimePointWithoutTz"
269 "or (legacy) system_clock::time_point"
270 );
271#else
272 static_assert(
273 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointTz> ||
274 std::is_same_v<typename T::UpdatedFieldType, storages::postgres::TimePointWithoutTz> ||
275 kHasCustomUpdated<T>,
276 "Invalid UpdatedFieldType, must be either TimePointTz or "
277 "TimePointWithoutTz"
278 );
279#endif
280 } else {
281 static_assert(
282 !kWantIncrementalUpdates<T>,
283 "UpdatedFieldType must be explicitly specified when using "
284 "incremental updates"
285 );
286 }
287 return true;
288}
289
290// Cluster host type policy
291template <typename T>
292using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
293
294template <typename T>
295constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
296 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
297 return T::kClusterHostType;
298 } else {
299 return storages::postgres::ClusterHostType::kSlave;
300 }
301}
302
303// May return null policy
304template <typename T>
305using HasMayReturnNull = decltype(T::kMayReturnNull);
306
307template <typename T>
308constexpr bool MayReturnNull() {
309 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
310 return T::kMayReturnNull;
311 } else {
312 return false;
313 }
314}
315
316template <typename PostgreCachePolicy>
317struct PolicyChecker {
318 // Static assertions for cache traits
319 static_assert(kHasName<PostgreCachePolicy>, "The PosgreSQL cache policy must contain a static member `kName`");
320 static_assert(kHasValueType<PostgreCachePolicy>, "The PosgreSQL cache policy must define a type alias `ValueType`");
321 static_assert(
322 kHasKeyMember<PostgreCachePolicy>,
323 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
324 "with a pointer to a data or a function member with the object's key"
325 );
326 static_assert(
327 kHasQuery<PostgreCachePolicy> || kHasGetQuery<PostgreCachePolicy>,
328 "The PosgreSQL cache policy must contain a static data member "
329 "`kQuery` with a select statement or a static member function "
330 "`GetQuery` returning the query"
331 );
332 static_assert(
333 !(kHasQuery<PostgreCachePolicy> && kHasGetQuery<PostgreCachePolicy>),
334 "The PosgreSQL cache policy must define `kQuery` or "
335 "`GetQuery`, not both"
336 );
337 static_assert(
338 kHasUpdatedField<PostgreCachePolicy>,
339 "The PosgreSQL cache policy must contain a static member "
340 "`kUpdatedField`. If you don't want to use incremental updates, "
341 "please set its value to `nullptr`"
342 );
343 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
344
345 static_assert(
346 ClusterHostType<PostgreCachePolicy>() & storages::postgres::kClusterHostRolesMask,
347 "Cluster host role must be specified for caching component, "
348 "please be more specific"
349 );
350
351 static storages::postgres::Query GetQuery() {
352 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
353 return PostgreCachePolicy::GetQuery();
354 } else {
355 return PostgreCachePolicy::kQuery;
356 }
357 }
358
359 using BaseType = CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
360};
361
// Defaults for the `full-update-op-timeout` and
// `incremental-update-op-timeout` static config options.
inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};

// Passed as the second argument of CommandControl for cache queries.
inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};

// CPU relax tuning: once a stage takes longer than the threshold, the
// number of iterations between relax points is derived from the interval.
inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};

// Names of the tracing scopes of an update.
inline constexpr std::string_view kCopyStage{"copy_data"};
inline constexpr std::string_view kFetchStage{"fetch"};
inline constexpr std::string_view kParseStage{"parse"};

// Default for the `chunk-size` static config option.
inline constexpr std::size_t kDefaultChunkSize{1000};
373} // namespace pg_cache::detail
374
375/// @ingroup userver_components
376///
377/// @brief Caching component for PostgreSQL. See @ref pg_cache.
378///
379/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
380template <typename PostgreCachePolicy>
381class PostgreCache final : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
382public:
383 // Type aliases
384 using PolicyType = PostgreCachePolicy;
385 using ValueType = pg_cache::detail::ValueType<PolicyType>;
386 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
387 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
388 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
389 using UpdatedFieldType = pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
390 using BaseType = typename PolicyCheckerType::BaseType;
391
392 // Calculated constants
393 constexpr static bool kIncrementalUpdates = pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
394 constexpr static auto kClusterHostTypeFlags = pg_cache::detail::ClusterHostType<PolicyType>();
395 constexpr static auto kName = PolicyType::kName;
396
397 PostgreCache(const ComponentConfig&, const ComponentContext&);
398 ~PostgreCache() override;
399
400 static yaml_config::Schema GetStaticConfigSchema();
401
402private:
403 using CachedData = std::unique_ptr<DataType>;
404
405 UpdatedFieldType GetLastUpdated(std::chrono::system_clock::time_point last_update, const DataType& cache) const;
406
407 void Update(
408 cache::UpdateType type,
409 const std::chrono::system_clock::time_point& last_update,
410 const std::chrono::system_clock::time_point& now,
411 cache::UpdateStatisticsScope& stats_scope
412 ) override;
413
414 bool MayReturnNull() const override;
415
416 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
417 void CacheResults(
418 storages::postgres::ResultSet res,
419 CachedData& data_cache,
420 cache::UpdateStatisticsScope& stats_scope,
421 tracing::ScopeTime& scope
422 );
423
424 static storages::postgres::Query GetAllQuery();
425 static storages::postgres::Query GetDeltaQuery();
426
427 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
428
429 std::vector<storages::postgres::ClusterPtr> clusters_;
430
431 const std::chrono::system_clock::duration correction_;
432 const std::chrono::milliseconds full_update_timeout_;
433 const std::chrono::milliseconds incremental_update_timeout_;
434 const std::size_t chunk_size_;
435 std::size_t cpu_relax_iterations_parse_{0};
436 std::size_t cpu_relax_iterations_copy_{0};
437};
438
439template <typename PostgreCachePolicy>
440inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
441
442template <typename PostgreCachePolicy>
443PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config, const ComponentContext& context)
444 : BaseType{config, context},
445 correction_{ParseCorrection(config)},
446 full_update_timeout_{
447 config["full-update-op-timeout"].As<std::chrono::milliseconds>(pg_cache::detail::kDefaultFullUpdateTimeout)},
448 incremental_update_timeout_{config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
449 pg_cache::detail::kDefaultIncrementalUpdateTimeout
450 )},
451 chunk_size_{config["chunk-size"].As<size_t>(pg_cache::detail::kDefaultChunkSize)} {
453 !chunk_size_ || storages::postgres::Portal::IsSupportedByDriver(),
454 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
455 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."
456 );
457
458 if (this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kFullAndIncremental && !kIncrementalUpdates) {
459 throw std::logic_error(
460 "Incremental update support is requested in config but no update field "
461 "name is specified in traits of '" +
462 config.Name() + "' cache"
463 );
464 }
465 if (correction_.count() < 0) {
466 throw std::logic_error(
467 "Refusing to set forward (negative) update correction requested in "
468 "config for '" +
469 config.Name() + "' cache"
470 );
471 }
472
473 const auto pg_alias = config["pgcomponent"].As<std::string>("");
474 if (pg_alias.empty()) {
475 throw storages::postgres::InvalidConfig{"No `pgcomponent` entry in configuration"};
476 }
477 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
478 const auto shard_count = pg_cluster_comp.GetShardCount();
479 clusters_.resize(shard_count);
480 for (size_t i = 0; i < shard_count; ++i) {
481 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
482 }
483
484 LOG_INFO() << "Cache " << kName << " full update query `" << GetAllQuery().Statement()
485 << "` incremental update query `" << GetDeltaQuery().Statement() << "`";
486
487 this->StartPeriodicUpdates();
488}
489
490template <typename PostgreCachePolicy>
491PostgreCache<PostgreCachePolicy>::~PostgreCache() {
492 this->StopPeriodicUpdates();
493}
494
495template <typename PostgreCachePolicy>
496storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
497 storages::postgres::Query query = PolicyCheckerType::GetQuery();
498 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
499 return {fmt::format("{} where {}", query.Statement(), PostgreCachePolicy::kWhere), query.GetName()};
500 } else {
501 return query;
502 }
503}
504
505template <typename PostgreCachePolicy>
506storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
507 if constexpr (kIncrementalUpdates) {
508 storages::postgres::Query query = PolicyCheckerType::GetQuery();
509
510 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
511 return {
512 fmt::format(
513 "{} where ({}) and {} >= $1",
514 query.Statement(),
515 PostgreCachePolicy::kWhere,
516 PolicyType::kUpdatedField
517 ),
518 query.GetName()};
519 } else {
520 return {fmt::format("{} where {} >= $1", query.Statement(), PolicyType::kUpdatedField), query.GetName()};
521 }
522 } else {
523 return GetAllQuery();
524 }
525}
526
527template <typename PostgreCachePolicy>
528std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(const ComponentConfig& config) {
529 static constexpr std::string_view kUpdateCorrection = "update-correction";
530 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
531 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
532 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
533 } else {
534 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
535 }
536}
537
538template <typename PostgreCachePolicy>
539typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType PostgreCache<PostgreCachePolicy>::GetLastUpdated(
540 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
541 const DataType& cache
542) const {
543 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
544 return PostgreCachePolicy::GetLastKnownUpdated(cache);
545 } else {
546 return UpdatedFieldType{last_update - correction_};
547 }
548}
549
550template <typename PostgreCachePolicy>
551void PostgreCache<PostgreCachePolicy>::Update(
552 cache::UpdateType type,
553 const std::chrono::system_clock::time_point& last_update,
554 const std::chrono::system_clock::time_point& /*now*/,
555 cache::UpdateStatisticsScope& stats_scope
556) {
557 namespace pg = storages::postgres;
558 if constexpr (!kIncrementalUpdates) {
560 }
561 const auto query = (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
562 const std::chrono::milliseconds timeout =
563 (type == cache::UpdateType::kFull) ? full_update_timeout_ : incremental_update_timeout_;
564
565 // COPY current cached data
566 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(std::string{pg_cache::detail::kCopyStage});
567 auto data_cache = GetDataSnapshot(type, scope);
568 [[maybe_unused]] const auto old_size = data_cache->size();
569
570 scope.Reset(std::string{pg_cache::detail::kFetchStage});
571
572 size_t changes = 0;
573 // Iterate clusters
574 for (auto& cluster : clusters_) {
575 if (chunk_size_ > 0) {
576 auto trx = cluster->Begin(
577 kClusterHostTypeFlags,
578 pg::Transaction::RO,
579 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff}
580 );
581 auto portal = trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
582 while (portal) {
583 scope.Reset(std::string{pg_cache::detail::kFetchStage});
584 auto res = portal.Fetch(chunk_size_);
585 stats_scope.IncreaseDocumentsReadCount(res.Size());
586
587 scope.Reset(std::string{pg_cache::detail::kParseStage});
588 CacheResults(res, data_cache, stats_scope, scope);
589 changes += res.Size();
590 }
591 trx.Commit();
592 } else {
593 bool has_parameter = query.Statement().find('$') != std::string::npos;
594 auto res = has_parameter ? cluster->Execute(
595 kClusterHostTypeFlags,
596 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
597 query,
598 GetLastUpdated(last_update, *data_cache)
599 )
600 : cluster->Execute(
601 kClusterHostTypeFlags,
602 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff},
603 query
604 );
605 stats_scope.IncreaseDocumentsReadCount(res.Size());
606
607 scope.Reset(std::string{pg_cache::detail::kParseStage});
608 CacheResults(res, data_cache, stats_scope, scope);
609 changes += res.Size();
610 }
611 }
612
613 scope.Reset();
614
615 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
616 if (old_size > 0) {
617 const auto elapsed_copy = scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
618 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
619 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
620 static_cast<double>(old_size) / (elapsed_copy / pg_cache::detail::kCpuRelaxInterval)
621 );
622 LOG_TRACE() << "Elapsed time for copying " << kName << " " << elapsed_copy.count() << " for " << changes
623 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
624 << " iterations";
625 }
626 }
627 }
628
629 if (changes > 0) {
630 const auto elapsed_parse = scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
631 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
632 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
633 static_cast<double>(changes) / (elapsed_parse / pg_cache::detail::kCpuRelaxInterval)
634 );
635 LOG_TRACE() << "Elapsed time for parsing " << kName << " " << elapsed_parse.count() << " for " << changes
636 << " data items is over threshold. Will relax CPU every " << cpu_relax_iterations_parse_
637 << " iterations";
638 }
639 }
640 if (changes > 0 || type == cache::UpdateType::kFull) {
641 // Set current cache
642 pg_cache::detail::OnWritesDone(*data_cache);
643 stats_scope.Finish(data_cache->size());
644 this->Set(std::move(data_cache));
645 } else {
646 stats_scope.FinishNoChanges();
647 }
648}
649
650template <typename PostgreCachePolicy>
651bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
652 return pg_cache::detail::MayReturnNull<PolicyType>();
653}
654
655template <typename PostgreCachePolicy>
656void PostgreCache<PostgreCachePolicy>::CacheResults(
657 storages::postgres::ResultSet res,
658 CachedData& data_cache,
659 cache::UpdateStatisticsScope& stats_scope,
660 tracing::ScopeTime& scope
661) {
662 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
663 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
664 for (auto p = values.begin(); p != values.end(); ++p) {
665 relax.Relax();
666 try {
667 using pg_cache::detail::CacheInsertOrAssign;
668 CacheInsertOrAssign(
669 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p), PostgreCachePolicy::kKeyMember
670 );
671 } catch (const std::exception& e) {
672 stats_scope.IncreaseDocumentsParseFailures(1);
673 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
674 << compiler::GetTypeName<ValueType>() << "': " << e.what();
675 }
676 }
677}
678
679template <typename PostgreCachePolicy>
680typename PostgreCache<PostgreCachePolicy>::CachedData
681PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope) {
683 auto data = this->Get();
684 if (data) {
685 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, scope);
686 }
687 }
688 return std::make_unique<DataType>();
689}
690
namespace impl {

// Declared here, defined elsewhere; its result is passed to
// yaml_config::MergeSchemas by PostgreCache::GetStaticConfigSchema().
std::string GetPostgreCacheSchema();

}  // namespace impl
696
697template <typename PostgreCachePolicy>
698yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
699 using ParentType = typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
700 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
701}
702
703} // namespace components
704
705namespace utils::impl::projected_set {
706
// CacheInsertOrAssign overload living in the projected_set namespace: the
// key member is ignored and the value is handed to DoInsert.
template <typename ProjectedSet, typename Item, typename KeyMember>
void CacheInsertOrAssign(ProjectedSet& set, Item&& value, const KeyMember& /*key_member*/) {
    DoInsert(set, std::forward<Item>(value));
}
711
712} // namespace utils::impl::projected_set
713
714USERVER_NAMESPACE_END