userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38// clang-format off
39
40/// @page pg_cache Caching Component for PostgreSQL
41///
42/// A typical components::PostgreCache usage consists of trait definition:
43///
44/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Trivial
45///
46/// and registration of the component in components::ComponentList:
47///
48/// @snippet cache/postgres_cache_test.cpp Pg Cache Trivial Usage
49///
50/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
51///
52///
53/// @section pg_cc_configuration Configuration
54///
55/// components::PostgreCache static configuration file should have a PostgreSQL
56/// component name specified in `pgcomponent` configuration parameter.
57///
58/// Optionally the operation timeouts for cache loading can be specified.
59///
60/// ### Avoiding memory leaks
61/// components::CachingComponentBase
62///
63/// Name | Description | Default value
64/// ---- | ----------- | -------------
65/// full-update-op-timeout | timeout for a full update | 1m
66/// incremental-update-op-timeout | timeout for an incremental update | 1s
67/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated)
68/// chunk-size | number of rows to request from PostgreSQL via portals, 0 to fetch all rows in one request without portals | 1000
69///
70/// @section pg_cc_cache_policy Cache policy
71///
72/// Cache policy is the template argument of components::PostgreCache component.
73/// Please see the following code snippet for documentation.
74///
75/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
76///
77/// The query can be a std::string. But due to non-guaranteed order of static
78/// data members initialization, std::string should be returned from a static
79/// member function, please see the following code snippet.
80///
81/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
82///
83/// Policy may have static function GetLastKnownUpdated. It should be used
84/// when new entries from database are taken via revision, identifier, or
85/// anything else, but not timestamp of the last update.
86/// If this function is supplied, new entries are taken from db with condition
87/// 'WHERE kUpdatedField > GetLastKnownUpdated(cache_container)'.
88/// Otherwise, condition is
89/// 'WHERE kUpdatedField > last_update - correction_'.
90/// See the following code snippet for an example of usage
91///
92/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
93///
94/// In case one provides a custom CacheContainer within Policy, it is notified
95/// of Update completion via its public member function OnWritesDone, if any.
96/// See the following code snippet for an example of usage:
97///
98/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
99///
100/// @section pg_cc_forward_declaration Forward Declaration
101///
102/// To forward declare a cache you can forward declare a trait and
103/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
104/// forward declare the cache value type.
105///
106/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
107///
108/// ----------
109///
110/// @htmlonly <div class="bottom-nav"> @endhtmlonly
111/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
112/// @htmlonly </div> @endhtmlonly
113
114// clang-format on
115
116namespace pg_cache::detail {
117
118template <typename T>
119using ValueType = typename T::ValueType;
120template <typename T>
121inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
122
123template <typename T>
124using RawValueTypeImpl = typename T::RawValueType;
125template <typename T>
126inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
127template <typename T>
128using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
129
130template <typename PostgreCachePolicy>
131auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
132 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
133 return Convert(std::move(raw),
134 formats::parse::To<ValueType<PostgreCachePolicy>>());
135 } else {
136 return std::move(raw);
137 }
138}
139
140// Component name in policy
141template <typename T>
142using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
143template <typename T>
144inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
145
146// Component query in policy
147template <typename T>
148using HasQueryImpl = decltype(T::kQuery);
149template <typename T>
150inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
151
152// Component GetQuery in policy
153template <typename T>
154using HasGetQueryImpl = decltype(T::GetQuery());
155template <typename T>
156inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
157
158// Component kWhere in policy
159template <typename T>
160using HasWhere = decltype(T::kWhere);
161template <typename T>
162inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
163
164// Update field
165template <typename T>
166using HasUpdatedField = decltype(T::kUpdatedField);
167template <typename T>
168inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
169
170template <typename T>
171using WantIncrementalUpdates =
172 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
173template <typename T>
174inline constexpr bool kWantIncrementalUpdates =
175 meta::kIsDetected<WantIncrementalUpdates, T>;
176
177// Key member in policy
178template <typename T>
179using KeyMemberTypeImpl =
180 std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
181template <typename T>
182inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
183template <typename T>
184using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
185
186// Data container for cache
187template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
188struct DataCacheContainer {
189 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
190 "With default CacheContainer, key type must be std::hash-able");
191
192 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
193};
194
195template <typename T>
196struct DataCacheContainer<
197 T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
198 using type = typename T::CacheContainer;
199};
200
201template <typename T>
202using DataCacheContainerType = typename DataCacheContainer<T>::type;
203
204// We have to whitelist container types, for which we perform by-element
205// copying, because it's not correct for certain custom containers.
206template <typename T>
207inline constexpr bool kIsContainerCopiedByElement =
208 meta::kIsInstantiationOf<std::unordered_map, T> ||
209 meta::kIsInstantiationOf<std::map, T>;
210
211template <typename T>
212std::unique_ptr<T> CopyContainer(
213 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
214 tracing::ScopeTime& scope) {
215 if constexpr (kIsContainerCopiedByElement<T>) {
216 auto copy = std::make_unique<T>();
217 if constexpr (meta::kIsReservable<T>) {
218 copy->reserve(container.size());
219 }
220
221 utils::CpuRelax relax{cpu_relax_iterations, &scope};
222 for (const auto& kv : container) {
223 relax.Relax();
224 copy->insert(kv);
225 }
226 return copy;
227 } else {
228 return std::make_unique<T>(container);
229 }
230}
231
232template <typename Container, typename Value, typename KeyMember,
233 typename... Args>
234void CacheInsertOrAssign(Container& container, Value&& value,
235 const KeyMember& key_member, Args&&... /*args*/) {
236 // Args are only used to de-prioritize this default overload.
237 static_assert(sizeof...(Args) == 0);
238 // Copy 'key' to avoid aliasing issues in 'insert_or_assign'.
239 auto key = std::invoke(key_member, value);
240 container.insert_or_assign(std::move(key), std::forward<Value>(value));
241}
242
243template <typename T>
244using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
245
246template <typename T>
247void OnWritesDone(T& container) {
248 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
249 container.OnWritesDone();
250 }
251}
252
253template <typename T>
254using HasCustomUpdatedImpl =
255 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
256
257template <typename T>
258inline constexpr bool kHasCustomUpdated =
259 meta::kIsDetected<HasCustomUpdatedImpl, T>;
260
261template <typename T>
262using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
263template <typename T>
264inline constexpr bool kHasUpdatedFieldType =
265 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
266template <typename T>
267using UpdatedFieldType =
268 meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
269
270template <typename T>
271constexpr bool CheckUpdatedFieldType() {
272 if constexpr (kHasUpdatedFieldType<T>) {
273 static_assert(
274 std::is_same_v<typename T::UpdatedFieldType,
275 storages::postgres::TimePointTz> ||
276 std::is_same_v<typename T::UpdatedFieldType,
277 storages::postgres::TimePoint> ||
278 kHasCustomUpdated<T>,
279 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
280 } else {
281 static_assert(!kWantIncrementalUpdates<T>,
282 "UpdatedFieldType must be explicitly specified when using "
283 "incremental updates");
284 }
285 return true;
286}
287
288// Cluster host type policy
289template <typename T>
290using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
291
292template <typename T>
293constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
294 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
295 return T::kClusterHostType;
296 } else {
297 return storages::postgres::ClusterHostType::kSlave;
298 }
299}
300
301// May return null policy
302template <typename T>
303using HasMayReturnNull = decltype(T::kMayReturnNull);
304
305template <typename T>
306constexpr bool MayReturnNull() {
307 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
308 return T::kMayReturnNull;
309 } else {
310 return false;
311 }
312}
313
314template <typename PostgreCachePolicy>
315struct PolicyChecker {
316 // Static assertions for cache traits
317 static_assert(
318 kHasName<PostgreCachePolicy>,
319 "The PosgreSQL cache policy must contain a static member `kName`");
320 static_assert(
321 kHasValueType<PostgreCachePolicy>,
322 "The PosgreSQL cache policy must define a type alias `ValueType`");
323 static_assert(
324 kHasKeyMember<PostgreCachePolicy>,
325 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
326 "with a pointer to a data or a function member with the object's key");
327 static_assert(kHasQuery<PostgreCachePolicy> ||
328 kHasGetQuery<PostgreCachePolicy>,
329 "The PosgreSQL cache policy must contain a static data member "
330 "`kQuery` with a select statement or a static member function "
331 "`GetQuery` returning the query");
332 static_assert(!(kHasQuery<PostgreCachePolicy> &&
333 kHasGetQuery<PostgreCachePolicy>),
334 "The PosgreSQL cache policy must define `kQuery` or "
335 "`GetQuery`, not both");
336 static_assert(
337 kHasUpdatedField<PostgreCachePolicy>,
338 "The PosgreSQL cache policy must contain a static member "
339 "`kUpdatedField`. If you don't want to use incremental updates, "
340 "please set its value to `nullptr`");
341 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
342
343 static_assert(ClusterHostType<PostgreCachePolicy>() &
344 storages::postgres::kClusterHostRolesMask,
345 "Cluster host role must be specified for caching component, "
346 "please be more specific");
347
348 static storages::postgres::Query GetQuery() {
349 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
350 return PostgreCachePolicy::GetQuery();
351 } else {
352 return PostgreCachePolicy::kQuery;
353 }
354 }
355
356 using BaseType =
357 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
358};
359
360inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
361inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};
362inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};
363inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
364inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
365
366inline constexpr std::string_view kCopyStage = "copy_data";
367inline constexpr std::string_view kFetchStage = "fetch";
368inline constexpr std::string_view kParseStage = "parse";
369
370inline constexpr std::size_t kDefaultChunkSize = 1000;
371} // namespace pg_cache::detail
372
373/// @ingroup userver_components
374///
375/// @brief Caching component for PostgreSQL. See @ref pg_cache.
376///
377/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
378template <typename PostgreCachePolicy>
379class PostgreCache final
380 : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
381 public:
382 // Type aliases
383 using PolicyType = PostgreCachePolicy;
384 using ValueType = pg_cache::detail::ValueType<PolicyType>;
385 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
386 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
387 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
388 using UpdatedFieldType =
389 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
390 using BaseType = typename PolicyCheckerType::BaseType;
391
392 // Calculated constants
393 constexpr static bool kIncrementalUpdates =
394 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
395 constexpr static auto kClusterHostTypeFlags =
396 pg_cache::detail::ClusterHostType<PolicyType>();
397 constexpr static auto kName = PolicyType::kName;
398
399 PostgreCache(const ComponentConfig&, const ComponentContext&);
400 ~PostgreCache() override;
401
402 static yaml_config::Schema GetStaticConfigSchema();
403
404 private:
405 using CachedData = std::unique_ptr<DataType>;
406
407 UpdatedFieldType GetLastUpdated(
408 std::chrono::system_clock::time_point last_update,
409 const DataType& cache) const;
410
411 void Update(cache::UpdateType type,
412 const std::chrono::system_clock::time_point& last_update,
413 const std::chrono::system_clock::time_point& now,
414 cache::UpdateStatisticsScope& stats_scope) override;
415
416 bool MayReturnNull() const override;
417
418 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
419 void CacheResults(storages::postgres::ResultSet res, CachedData& data_cache,
420 cache::UpdateStatisticsScope& stats_scope,
421 tracing::ScopeTime& scope);
422
423 static storages::postgres::Query GetAllQuery();
424 static storages::postgres::Query GetDeltaQuery();
425
426 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
427
428 std::vector<storages::postgres::ClusterPtr> clusters_;
429
430 const std::chrono::system_clock::duration correction_;
431 const std::chrono::milliseconds full_update_timeout_;
432 const std::chrono::milliseconds incremental_update_timeout_;
433 const std::size_t chunk_size_;
434 std::size_t cpu_relax_iterations_parse_{0};
435 std::size_t cpu_relax_iterations_copy_{0};
436};
437
438template <typename PostgreCachePolicy>
439inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
440
441template <typename PostgreCachePolicy>
442PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config,
443 const ComponentContext& context)
444 : BaseType{config, context},
445 correction_{ParseCorrection(config)},
446 full_update_timeout_{
447 config["full-update-op-timeout"].As<std::chrono::milliseconds>(
448 pg_cache::detail::kDefaultFullUpdateTimeout)},
449 incremental_update_timeout_{
450 config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
451 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
452 chunk_size_{config["chunk-size"].As<size_t>(
453 pg_cache::detail::kDefaultChunkSize)} {
456 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
457 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON.");
458
459 if (this->GetAllowedUpdateTypes() ==
460 cache::AllowedUpdateTypes::kFullAndIncremental &&
461 !kIncrementalUpdates) {
462 throw std::logic_error(
463 "Incremental update support is requested in config but no update field "
464 "name is specified in traits of '" +
465 config.Name() + "' cache");
466 }
467 if (correction_.count() < 0) {
468 throw std::logic_error(
469 "Refusing to set forward (negative) update correction requested in "
470 "config for '" +
471 config.Name() + "' cache");
472 }
473
474 const auto pg_alias = config["pgcomponent"].As<std::string>("");
475 if (pg_alias.empty()) {
477 "No `pgcomponent` entry in configuration"};
478 }
479 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
480 const auto shard_count = pg_cluster_comp.GetShardCount();
481 clusters_.resize(shard_count);
482 for (size_t i = 0; i < shard_count; ++i) {
483 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
484 }
485
486 LOG_INFO() << "Cache " << kName << " full update query `"
487 << GetAllQuery().Statement() << "` incremental update query `"
488 << GetDeltaQuery().Statement() << "`";
489
490 this->StartPeriodicUpdates();
491}
492
493template <typename PostgreCachePolicy>
494PostgreCache<PostgreCachePolicy>::~PostgreCache() {
495 this->StopPeriodicUpdates();
496}
497
498template <typename PostgreCachePolicy>
499storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
500 storages::postgres::Query query = PolicyCheckerType::GetQuery();
501 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
502 return {fmt::format("{} where {}", query.Statement(),
503 PostgreCachePolicy::kWhere),
504 query.GetName()};
505 } else {
506 return query;
507 }
508}
509
510template <typename PostgreCachePolicy>
511storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
512 if constexpr (kIncrementalUpdates) {
513 storages::postgres::Query query = PolicyCheckerType::GetQuery();
514
515 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
516 return {
517 fmt::format("{} where ({}) and {} >= $1", query.Statement(),
518 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
519 query.GetName()};
520 } else {
521 return {fmt::format("{} where {} >= $1", query.Statement(),
522 PolicyType::kUpdatedField),
523 query.GetName()};
524 }
525 } else {
526 return GetAllQuery();
527 }
528}
529
530template <typename PostgreCachePolicy>
531std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
532 const ComponentConfig& config) {
533 static constexpr std::string_view kUpdateCorrection = "update-correction";
534 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
535 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
536 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
537 } else {
538 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
539 }
540}
541
542template <typename PostgreCachePolicy>
543typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
544PostgreCache<PostgreCachePolicy>::GetLastUpdated(
545 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
546 const DataType& cache) const {
547 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
548 return PostgreCachePolicy::GetLastKnownUpdated(cache);
549 } else {
550 return UpdatedFieldType{last_update - correction_};
551 }
552}
553
554template <typename PostgreCachePolicy>
555void PostgreCache<PostgreCachePolicy>::Update(
556 cache::UpdateType type,
557 const std::chrono::system_clock::time_point& last_update,
558 const std::chrono::system_clock::time_point& /*now*/,
559 cache::UpdateStatisticsScope& stats_scope) {
560 namespace pg = storages::postgres;
561 if constexpr (!kIncrementalUpdates) {
562 type = cache::UpdateType::kFull;
563 }
564 const auto query =
565 (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
566 const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
567 ? full_update_timeout_
568 : incremental_update_timeout_;
569
570 // COPY current cached data
571 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
572 std::string{pg_cache::detail::kCopyStage});
573 auto data_cache = GetDataSnapshot(type, scope);
574 [[maybe_unused]] const auto old_size = data_cache->size();
575
576 scope.Reset(std::string{pg_cache::detail::kFetchStage});
577
578 size_t changes = 0;
579 // Iterate clusters
580 for (auto& cluster : clusters_) {
581 if (chunk_size_ > 0) {
582 auto trx = cluster->Begin(
583 kClusterHostTypeFlags, pg::Transaction::RO,
584 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
585 auto portal =
586 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
587 while (portal) {
588 scope.Reset(std::string{pg_cache::detail::kFetchStage});
589 auto res = portal.Fetch(chunk_size_);
590 stats_scope.IncreaseDocumentsReadCount(res.Size());
591
592 scope.Reset(std::string{pg_cache::detail::kParseStage});
593 CacheResults(res, data_cache, stats_scope, scope);
594 changes += res.Size();
595 }
596 trx.Commit();
597 } else {
598 bool has_parameter = query.Statement().find('$') != std::string::npos;
599 auto res = has_parameter
600 ? cluster->Execute(
601 kClusterHostTypeFlags,
602 pg::CommandControl{
603 timeout, pg_cache::detail::kStatementTimeoutOff},
604 query, GetLastUpdated(last_update, *data_cache))
605 : cluster->Execute(
606 kClusterHostTypeFlags,
607 pg::CommandControl{
608 timeout, pg_cache::detail::kStatementTimeoutOff},
609 query);
610 stats_scope.IncreaseDocumentsReadCount(res.Size());
611
612 scope.Reset(std::string{pg_cache::detail::kParseStage});
613 CacheResults(res, data_cache, stats_scope, scope);
614 changes += res.Size();
615 }
616 }
617
618 scope.Reset();
619
620 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
621 if (old_size > 0) {
622 const auto elapsed_copy =
623 scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
624 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
625 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
626 static_cast<double>(old_size) /
627 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
628 LOG_TRACE() << "Elapsed time for copying " << kName << " "
629 << elapsed_copy.count() << " for " << changes
630 << " data items is over threshold. Will relax CPU every "
631 << cpu_relax_iterations_parse_ << " iterations";
632 }
633 }
634 }
635
636 if (changes > 0) {
637 const auto elapsed_parse =
638 scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
639 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
640 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
641 static_cast<double>(changes) /
642 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
643 LOG_TRACE() << "Elapsed time for parsing " << kName << " "
644 << elapsed_parse.count() << " for " << changes
645 << " data items is over threshold. Will relax CPU every "
646 << cpu_relax_iterations_parse_ << " iterations";
647 }
648 }
649 if (changes > 0 || type == cache::UpdateType::kFull) {
650 // Set current cache
651 stats_scope.Finish(data_cache->size());
652 pg_cache::detail::OnWritesDone(*data_cache);
653 this->Set(std::move(data_cache));
654 } else {
655 stats_scope.FinishNoChanges();
656 }
657}
658
659template <typename PostgreCachePolicy>
660bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
661 return pg_cache::detail::MayReturnNull<PolicyType>();
662}
663
664template <typename PostgreCachePolicy>
665void PostgreCache<PostgreCachePolicy>::CacheResults(
666 storages::postgres::ResultSet res, CachedData& data_cache,
667 cache::UpdateStatisticsScope& stats_scope, tracing::ScopeTime& scope) {
668 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
669 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
670 for (auto p = values.begin(); p != values.end(); ++p) {
671 relax.Relax();
672 try {
673 using pg_cache::detail::CacheInsertOrAssign;
674 CacheInsertOrAssign(
675 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
676 PostgreCachePolicy::kKeyMember);
677 } catch (const std::exception& e) {
678 stats_scope.IncreaseDocumentsParseFailures(1);
679 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
680 << compiler::GetTypeName<ValueType>() << "': " << e.what();
681 }
682 }
683}
684
685template <typename PostgreCachePolicy>
686typename PostgreCache<PostgreCachePolicy>::CachedData
687PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type,
688 tracing::ScopeTime& scope) {
689 if (type == cache::UpdateType::kIncremental) {
690 auto data = this->Get();
691 if (data) {
692 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
693 scope);
694 }
695 }
696 return std::make_unique<DataType>();
697}
698
699namespace impl {
700
701std::string GetPostgreCacheSchema();
702
703} // namespace impl
704
705template <typename PostgreCachePolicy>
706yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
707 using ParentType =
708 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
709 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
710}
711
712} // namespace components
713
714namespace utils::impl::projected_set {
715
716template <typename Set, typename Value, typename KeyMember>
717void CacheInsertOrAssign(Set& set, Value&& value,
718 const KeyMember& /*key_member*/) {
719 DoInsert(set, std::forward<Value>(value));
720}
721
722} // namespace utils::impl::projected_set
723
724USERVER_NAMESPACE_END