userver: userver/cache/base_postgres_cache.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38// clang-format off
39
40/// @page pg_cache Caching Component for PostgreSQL
41///
42/// A typical components::PostgreCache usage consists of trait definition:
43///
44/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Trivial
45///
46/// and registration of the component in components::ComponentList:
47///
48/// @snippet cache/postgres_cache_test.cpp Pg Cache Trivial Usage
49///
50/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
51///
52///
53/// @section pg_cc_configuration Configuration
54///
55/// components::PostgreCache static configuration file should have a PostgreSQL
56/// component name specified in `pgcomponent` configuration parameter.
57///
58/// Optionally the operation timeouts for cache loading can be specified.
59///
60/// ### Avoiding memory leaks
61/// components::CachingComponentBase
62///
63/// Name | Description | Default value
64/// ---- | ----------- | -------------
65/// full-update-op-timeout | timeout for a full update | 1m
66/// incremental-update-op-timeout | timeout for an incremental update | 1s
67/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated or with only full updates allowed)
68/// chunk-size | number of rows to request from PostgreSQL via portals, 0 to fetch all rows in one request without portals | 1000
69///
70/// @section pg_cc_cache_policy Cache policy
71///
72/// Cache policy is the template argument of components::PostgreCache component.
73/// Please see the following code snippet for documentation.
74///
75/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
76///
77/// The query can be a std::string. But due to non-guaranteed order of static
78/// data members initialization, std::string should be returned from a static
79/// member function, please see the following code snippet.
80///
81/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
82///
83/// Policy may have static function GetLastKnownUpdated. It should be used
84/// when new entries from database are taken via revision, identifier, or
85/// anything else, but not timestamp of the last update.
86/// If this function is supplied, new entries are taken from db with condition
87/// 'WHERE kUpdatedField >= GetLastKnownUpdated(cache_container)'.
88/// Otherwise, condition is
89/// 'WHERE kUpdatedField >= last_update - correction_'.
90/// See the following code snippet for an example of usage
91///
92/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
93///
94/// In case one provides a custom CacheContainer within Policy, it is notified
95/// of Update completion via its public member function OnWritesDone, if any.
96/// See the following code snippet for an example of usage:
97///
98/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
99///
100/// @section pg_cc_forward_declaration Forward Declaration
101///
102/// To forward declare a cache you can forward declare a trait and
103/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
104/// forward declare the cache value type.
105///
106/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
107///
108/// ----------
109///
110/// @htmlonly <div class="bottom-nav"> @endhtmlonly
111/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
112/// @htmlonly </div> @endhtmlonly
113
114// clang-format on
115
116namespace pg_cache::detail {
117
118template <typename T>
119using ValueType = typename T::ValueType;
120template <typename T>
121inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
122
123template <typename T>
124using RawValueTypeImpl = typename T::RawValueType;
125template <typename T>
126inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
127template <typename T>
128using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
129
130template <typename PostgreCachePolicy>
131auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
132 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
133 return Convert(std::move(raw),
134 formats::parse::To<ValueType<PostgreCachePolicy>>());
135 } else {
136 return std::move(raw);
137 }
138}
139
140// Component name in policy
141template <typename T>
142using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
143template <typename T>
144inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
145
146// Component query in policy
147template <typename T>
148using HasQueryImpl = decltype(T::kQuery);
149template <typename T>
150inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
151
152// Component GetQuery in policy
153template <typename T>
154using HasGetQueryImpl = decltype(T::GetQuery());
155template <typename T>
156inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
157
158// Component kWhere in policy
159template <typename T>
160using HasWhere = decltype(T::kWhere);
161template <typename T>
162inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
163
164// Update field
165template <typename T>
166using HasUpdatedField = decltype(T::kUpdatedField);
167template <typename T>
168inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
169
170template <typename T>
171using WantIncrementalUpdates =
172 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
173template <typename T>
174inline constexpr bool kWantIncrementalUpdates =
175 meta::kIsDetected<WantIncrementalUpdates, T>;
176
177// Key member in policy
178template <typename T>
179using KeyMemberTypeImpl =
180 std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
181template <typename T>
182inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
183template <typename T>
184using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
185
186// Data container for cache
187template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
188struct DataCacheContainer {
189 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
190 "With default CacheContainer, key type must be std::hash-able");
191
192 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
193};
194
195template <typename T>
196struct DataCacheContainer<
197 T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
198 using type = typename T::CacheContainer;
199};
200
201template <typename T>
202using DataCacheContainerType = typename DataCacheContainer<T>::type;
203
204// We have to whitelist container types, for which we perform by-element
205// copying, because it's not correct for certain custom containers.
206template <typename T>
207inline constexpr bool kIsContainerCopiedByElement =
208 meta::kIsInstantiationOf<std::unordered_map, T> ||
209 meta::kIsInstantiationOf<std::map, T>;
210
211template <typename T>
212std::unique_ptr<T> CopyContainer(
213 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
214 tracing::ScopeTime& scope) {
215 if constexpr (kIsContainerCopiedByElement<T>) {
216 auto copy = std::make_unique<T>();
217 if constexpr (meta::kIsReservable<T>) {
218 copy->reserve(container.size());
219 }
220
221 utils::CpuRelax relax{cpu_relax_iterations, &scope};
222 for (const auto& kv : container) {
223 relax.Relax();
224 copy->insert(kv);
225 }
226 return copy;
227 } else {
228 return std::make_unique<T>(container);
229 }
230}
231
// Default way of putting a value into the cache container: the key is obtained
// by invoking `key_member` on the value, then the pair is stored with
// `insert_or_assign`.
//
// The trailing parameter pack is never populated; it exists only to make this
// overload a worse match than more specific ones.
template <typename Container, typename Value, typename KeyMember,
          typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value,
                         const KeyMember& key_member, Args&&... /*args*/) {
  static_assert(sizeof...(Args) == 0);

  // The key must be a separate object: a reference into `value` would alias
  // the object that `insert_or_assign` is about to consume.
  auto key_copy = std::invoke(key_member, value);
  container.insert_or_assign(std::move(key_copy), std::forward<Value>(value));
}
242
243template <typename T>
244using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
245
246template <typename T>
247void OnWritesDone(T& container) {
248 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
249 container.OnWritesDone();
250 }
251}
252
253template <typename T>
254using HasCustomUpdatedImpl =
255 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
256
257template <typename T>
258inline constexpr bool kHasCustomUpdated =
259 meta::kIsDetected<HasCustomUpdatedImpl, T>;
260
261template <typename T>
262using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
263template <typename T>
264inline constexpr bool kHasUpdatedFieldType =
265 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
266template <typename T>
267using UpdatedFieldType =
268 meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
269
270template <typename T>
271constexpr bool CheckUpdatedFieldType() {
272 if constexpr (kHasUpdatedFieldType<T>) {
273 static_assert(
274 std::is_same_v<typename T::UpdatedFieldType,
275 storages::postgres::TimePointTz> ||
276 std::is_same_v<typename T::UpdatedFieldType,
277 storages::postgres::TimePoint> ||
278 kHasCustomUpdated<T>,
279 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
280 } else {
281 static_assert(!kWantIncrementalUpdates<T>,
282 "UpdatedFieldType must be explicitly specified when using "
283 "incremental updates");
284 }
285 return true;
286}
287
288// Cluster host type policy
289template <typename T>
290using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
291
292template <typename T>
293constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
294 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
295 return T::kClusterHostType;
296 } else {
297 return storages::postgres::ClusterHostType::kSlave;
298 }
299}
300
301// May return null policy
302template <typename T>
303using HasMayReturnNull = decltype(T::kMayReturnNull);
304
305template <typename T>
306constexpr bool MayReturnNull() {
307 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
308 return T::kMayReturnNull;
309 } else {
310 return false;
311 }
312}
313
314template <typename PostgreCachePolicy>
315struct PolicyChecker {
316 // Static assertions for cache traits
317 static_assert(
318 kHasName<PostgreCachePolicy>,
319 "The PosgreSQL cache policy must contain a static member `kName`");
320 static_assert(
321 kHasValueType<PostgreCachePolicy>,
322 "The PosgreSQL cache policy must define a type alias `ValueType`");
323 static_assert(
324 kHasKeyMember<PostgreCachePolicy>,
325 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
326 "with a pointer to a data or a function member with the object's key");
327 static_assert(kHasQuery<PostgreCachePolicy> ||
328 kHasGetQuery<PostgreCachePolicy>,
329 "The PosgreSQL cache policy must contain a static data member "
330 "`kQuery` with a select statement or a static member function "
331 "`GetQuery` returning the query");
332 static_assert(!(kHasQuery<PostgreCachePolicy> &&
333 kHasGetQuery<PostgreCachePolicy>),
334 "The PosgreSQL cache policy must define `kQuery` or "
335 "`GetQuery`, not both");
336 static_assert(
337 kHasUpdatedField<PostgreCachePolicy>,
338 "The PosgreSQL cache policy must contain a static member "
339 "`kUpdatedField`. If you don't want to use incremental updates, "
340 "please set its value to `nullptr`");
341 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
342
343 static_assert(ClusterHostType<PostgreCachePolicy>() &
344 storages::postgres::kClusterHostRolesMask,
345 "Cluster host role must be specified for caching component, "
346 "please be more specific");
347
348 static storages::postgres::Query GetQuery() {
349 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
350 return PostgreCachePolicy::GetQuery();
351 } else {
352 return PostgreCachePolicy::kQuery;
353 }
354 }
355
356 using BaseType =
357 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
358};
359
// Defaults for the `full-update-op-timeout` and
// `incremental-update-op-timeout` options.
inline constexpr auto kDefaultFullUpdateTimeout = std::chrono::minutes{1};
inline constexpr auto kDefaultIncrementalUpdateTimeout =
    std::chrono::seconds{1};
// Statement timeout passed to PostgreSQL command control by the cache.
inline constexpr auto kStatementTimeoutOff = std::chrono::milliseconds{0};
// A stage that took longer than the threshold gets CPU relaxing; the interval
// is the divisor used to compute how many items to process between relaxes.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};

// Names of the timed stages of a cache update.
inline constexpr auto kCopyStage = std::string_view{"copy_data"};
inline constexpr auto kFetchStage = std::string_view{"fetch"};
inline constexpr auto kParseStage = std::string_view{"parse"};

// Default for the `chunk-size` option.
inline constexpr auto kDefaultChunkSize = std::size_t{1000};
371} // namespace pg_cache::detail
372
373/// @ingroup userver_components
374///
375/// @brief Caching component for PostgreSQL. See @ref pg_cache.
376///
377/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
378template <typename PostgreCachePolicy>
379class PostgreCache final
380 : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
381 public:
382 // Type aliases
383 using PolicyType = PostgreCachePolicy;
384 using ValueType = pg_cache::detail::ValueType<PolicyType>;
385 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
386 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
387 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
388 using UpdatedFieldType =
389 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
390 using BaseType = typename PolicyCheckerType::BaseType;
391
392 // Calculated constants
393 constexpr static bool kIncrementalUpdates =
394 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
395 constexpr static auto kClusterHostTypeFlags =
396 pg_cache::detail::ClusterHostType<PolicyType>();
397 constexpr static auto kName = PolicyType::kName;
398
399 PostgreCache(const ComponentConfig&, const ComponentContext&);
400 ~PostgreCache() override;
401
402 static yaml_config::Schema GetStaticConfigSchema();
403
404 private:
405 using CachedData = std::unique_ptr<DataType>;
406
407 UpdatedFieldType GetLastUpdated(
408 std::chrono::system_clock::time_point last_update,
409 const DataType& cache) const;
410
411 void Update(cache::UpdateType type,
412 const std::chrono::system_clock::time_point& last_update,
413 const std::chrono::system_clock::time_point& now,
414 cache::UpdateStatisticsScope& stats_scope) override;
415
416 bool MayReturnNull() const override;
417
418 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
419 void CacheResults(storages::postgres::ResultSet res, CachedData& data_cache,
420 cache::UpdateStatisticsScope& stats_scope,
421 tracing::ScopeTime& scope);
422
423 static storages::postgres::Query GetAllQuery();
424 static storages::postgres::Query GetDeltaQuery();
425
426 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
427
428 std::vector<storages::postgres::ClusterPtr> clusters_;
429
430 const std::chrono::system_clock::duration correction_;
431 const std::chrono::milliseconds full_update_timeout_;
432 const std::chrono::milliseconds incremental_update_timeout_;
433 const std::size_t chunk_size_;
434 std::size_t cpu_relax_iterations_parse_{0};
435 std::size_t cpu_relax_iterations_copy_{0};
436};
437
438template <typename PostgreCachePolicy>
439inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
440
441template <typename PostgreCachePolicy>
442PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config,
443 const ComponentContext& context)
444 : BaseType{config, context},
445 correction_{ParseCorrection(config)},
446 full_update_timeout_{
447 config["full-update-op-timeout"].As<std::chrono::milliseconds>(
448 pg_cache::detail::kDefaultFullUpdateTimeout)},
449 incremental_update_timeout_{
450 config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
451 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
452 chunk_size_{config["chunk-size"].As<size_t>(
453 pg_cache::detail::kDefaultChunkSize)} {
456 "Either set 'chunk-size' to 0, or enable PostgreSQL portals by building "
457 "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON.");
458
459 if (this->GetAllowedUpdateTypes() ==
460 cache::AllowedUpdateTypes::kFullAndIncremental &&
461 !kIncrementalUpdates) {
462 throw std::logic_error(
463 "Incremental update support is requested in config but no update field "
464 "name is specified in traits of '" +
465 config.Name() + "' cache");
466 }
467 if (correction_.count() < 0) {
468 throw std::logic_error(
469 "Refusing to set forward (negative) update correction requested in "
470 "config for '" +
471 config.Name() + "' cache");
472 }
473
474 const auto pg_alias = config["pgcomponent"].As<std::string>("");
475 if (pg_alias.empty()) {
477 "No `pgcomponent` entry in configuration"};
478 }
479 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
480 const auto shard_count = pg_cluster_comp.GetShardCount();
481 clusters_.resize(shard_count);
482 for (size_t i = 0; i < shard_count; ++i) {
483 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
484 }
485
486 LOG_INFO() << "Cache " << kName << " full update query `"
487 << GetAllQuery().Statement() << "` incremental update query `"
488 << GetDeltaQuery().Statement() << "`";
489
490 this->StartPeriodicUpdates();
491}
492
493template <typename PostgreCachePolicy>
494PostgreCache<PostgreCachePolicy>::~PostgreCache() {
495 this->StopPeriodicUpdates();
496}
497
498template <typename PostgreCachePolicy>
499storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
500 storages::postgres::Query query = PolicyCheckerType::GetQuery();
501 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
502 return {fmt::format("{} where {}", query.Statement(),
503 PostgreCachePolicy::kWhere),
504 query.GetName()};
505 } else {
506 return query;
507 }
508}
509
510template <typename PostgreCachePolicy>
511storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
512 if constexpr (kIncrementalUpdates) {
513 storages::postgres::Query query = PolicyCheckerType::GetQuery();
514
515 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
516 return {
517 fmt::format("{} where ({}) and {} >= $1", query.Statement(),
518 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
519 query.GetName()};
520 } else {
521 return {fmt::format("{} where {} >= $1", query.Statement(),
522 PolicyType::kUpdatedField),
523 query.GetName()};
524 }
525 } else {
526 return GetAllQuery();
527 }
528}
529
530template <typename PostgreCachePolicy>
531std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
532 const ComponentConfig& config) {
533 static constexpr std::string_view kUpdateCorrection = "update-correction";
534 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
535 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
536 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
537 } else {
538 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
539 }
540}
541
542template <typename PostgreCachePolicy>
543typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
544PostgreCache<PostgreCachePolicy>::GetLastUpdated(
545 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
546 const DataType& cache) const {
547 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
548 return PostgreCachePolicy::GetLastKnownUpdated(cache);
549 } else {
550 return UpdatedFieldType{last_update - correction_};
551 }
552}
553
554template <typename PostgreCachePolicy>
555void PostgreCache<PostgreCachePolicy>::Update(
556 cache::UpdateType type,
557 const std::chrono::system_clock::time_point& last_update,
558 const std::chrono::system_clock::time_point& /*now*/,
559 cache::UpdateStatisticsScope& stats_scope) {
560 namespace pg = storages::postgres;
561 if constexpr (!kIncrementalUpdates) {
562 type = cache::UpdateType::kFull;
563 }
564 const auto query =
565 (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
566 const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
567 ? full_update_timeout_
568 : incremental_update_timeout_;
569
570 // COPY current cached data
571 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
572 std::string{pg_cache::detail::kCopyStage});
573 auto data_cache = GetDataSnapshot(type, scope);
574 [[maybe_unused]] const auto old_size = data_cache->size();
575
576 scope.Reset(std::string{pg_cache::detail::kFetchStage});
577
578 size_t changes = 0;
579 // Iterate clusters
580 for (auto& cluster : clusters_) {
581 if (chunk_size_ > 0) {
582 auto trx = cluster->Begin(
583 kClusterHostTypeFlags, pg::Transaction::RO,
584 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
585 auto portal =
586 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
587 while (portal) {
588 scope.Reset(std::string{pg_cache::detail::kFetchStage});
589 auto res = portal.Fetch(chunk_size_);
590 stats_scope.IncreaseDocumentsReadCount(res.Size());
591
592 scope.Reset(std::string{pg_cache::detail::kParseStage});
593 CacheResults(res, data_cache, stats_scope, scope);
594 changes += res.Size();
595 }
596 trx.Commit();
597 } else {
598 bool has_parameter = query.Statement().find('$') != std::string::npos;
599 auto res = has_parameter
600 ? cluster->Execute(
601 kClusterHostTypeFlags,
602 pg::CommandControl{
603 timeout, pg_cache::detail::kStatementTimeoutOff},
604 query, GetLastUpdated(last_update, *data_cache))
605 : cluster->Execute(
606 kClusterHostTypeFlags,
607 pg::CommandControl{
608 timeout, pg_cache::detail::kStatementTimeoutOff},
609 query);
610 stats_scope.IncreaseDocumentsReadCount(res.Size());
611
612 scope.Reset(std::string{pg_cache::detail::kParseStage});
613 CacheResults(res, data_cache, stats_scope, scope);
614 changes += res.Size();
615 }
616 }
617
618 scope.Reset();
619
620 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
621 if (old_size > 0) {
622 const auto elapsed_copy =
623 scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
624 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
625 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
626 static_cast<double>(old_size) /
627 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
628 LOG_TRACE() << "Elapsed time for copying " << kName << " "
629 << elapsed_copy.count() << " for " << changes
630 << " data items is over threshold. Will relax CPU every "
631 << cpu_relax_iterations_parse_ << " iterations";
632 }
633 }
634 }
635
636 if (changes > 0) {
637 const auto elapsed_parse =
638 scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
639 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
640 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
641 static_cast<double>(changes) /
642 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
643 LOG_TRACE() << "Elapsed time for parsing " << kName << " "
644 << elapsed_parse.count() << " for " << changes
645 << " data items is over threshold. Will relax CPU every "
646 << cpu_relax_iterations_parse_ << " iterations";
647 }
648 }
649 if (changes > 0 || type == cache::UpdateType::kFull) {
650 // Set current cache
651 stats_scope.Finish(data_cache->size());
652 pg_cache::detail::OnWritesDone(*data_cache);
653 this->Set(std::move(data_cache));
654 } else {
655 stats_scope.FinishNoChanges();
656 }
657}
658
659template <typename PostgreCachePolicy>
660bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
661 return pg_cache::detail::MayReturnNull<PolicyType>();
662}
663
664template <typename PostgreCachePolicy>
665void PostgreCache<PostgreCachePolicy>::CacheResults(
666 storages::postgres::ResultSet res, CachedData& data_cache,
667 cache::UpdateStatisticsScope& stats_scope, tracing::ScopeTime& scope) {
668 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
669 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
670 for (auto p = values.begin(); p != values.end(); ++p) {
671 relax.Relax();
672 try {
673 using pg_cache::detail::CacheInsertOrAssign;
674 CacheInsertOrAssign(
675 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
676 PostgreCachePolicy::kKeyMember);
677 } catch (const std::exception& e) {
678 stats_scope.IncreaseDocumentsParseFailures(1);
679 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
680 << compiler::GetTypeName<ValueType>() << "': " << e.what();
681 }
682 }
683}
684
685template <typename PostgreCachePolicy>
686typename PostgreCache<PostgreCachePolicy>::CachedData
687PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type,
688 tracing::ScopeTime& scope) {
689 if (type == cache::UpdateType::kIncremental) {
690 auto data = this->Get();
691 if (data) {
692 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
693 scope);
694 }
695 }
696 return std::make_unique<DataType>();
697}
698
699namespace impl {
700
701std::string GetPostgreCacheSchema();
702
703} // namespace impl
704
705template <typename PostgreCachePolicy>
706yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
707 using ParentType =
708 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
709 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
710}
711
712} // namespace components
713
namespace utils::impl::projected_set {

// CacheInsertOrAssign overload declared in the projected_set namespace: the
// value is handed to `DoInsert` and the key member is not used.
template <typename Set, typename Value, typename KeyMember>
void CacheInsertOrAssign(Set& set, Value&& value,
                         const KeyMember& /*unused*/) {
  DoInsert(set, std::forward<Value>(value));
}

}  // namespace utils::impl::projected_set
723
724USERVER_NAMESPACE_END