userver: userver/cache/base_postgres_cache.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38// clang-format off
39
40/// @page pg_cache Caching Component for PostgreSQL
41///
42/// A typical components::PostgreCache usage consists of trait definition:
43///
44/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Trivial
45///
46/// and registration of the component in components::ComponentList:
47///
48/// @snippet cache/postgres_cache_test.cpp Pg Cache Trivial Usage
49///
50/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
51///
52///
53/// @section pg_cc_configuration Configuration
54///
55/// components::PostgreCache static configuration file should have a PostgreSQL
56/// component name specified in `pgcomponent` configuration parameter.
57///
58/// Optionally the operation timeouts for cache loading can be specified.
59///
60/// Name | Description | Default value
61/// ---- | ----------- | -------------
62/// full-update-op-timeout | timeout for a full update | 1m
63/// incremental-update-op-timeout | timeout for an incremental update | 1s
64/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated or with only full updates allowed)
65/// chunk-size | number of rows to request from PostgreSQL, 0 to fetch all rows in one request | 1000
66///
67/// @section pg_cc_cache_policy Cache policy
68///
69/// Cache policy is the template argument of components::PostgreCache component.
70/// Please see the following code snippet for documentation.
71///
72/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
73///
74/// The query can be a std::string. But due to non-guaranteed order of static
75/// data members initialization, std::string should be returned from a static
76/// member function, please see the following code snippet.
77///
78/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
79///
80/// Policy may have static function GetLastKnownUpdated. It should be used
81/// when new entries from database are taken via revision, identifier, or
82/// anything else, but not timestamp of the last update.
83/// If this function is supplied, new entries are taken from db with condition
84/// 'WHERE kUpdatedField >= GetLastKnownUpdated(cache_container)'.
85/// Otherwise, condition is
86/// 'WHERE kUpdatedField >= last_update - correction_'.
87/// See the following code snippet for an example of usage
88///
89/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
90///
91/// In case one provides a custom CacheContainer within Policy, it is notified
92/// of Update completion via its public member function OnWritesDone, if any.
93/// See the following code snippet for an example of usage:
94///
95/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
96///
97/// @section pg_cc_forward_declaration Forward Declaration
98///
99/// To forward declare a cache you can forward declare a trait and
100/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
101/// forward declare the cache value type.
102///
103/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
104///
105/// ----------
106///
107/// @htmlonly <div class="bottom-nav"> @endhtmlonly
108/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
109/// @htmlonly </div> @endhtmlonly
110
111// clang-format on
112
113namespace pg_cache::detail {
114
115template <typename T>
116using ValueType = typename T::ValueType;
117template <typename T>
118inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
119
120template <typename T>
121using RawValueTypeImpl = typename T::RawValueType;
122template <typename T>
123inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
124template <typename T>
125using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
126
127template <typename PostgreCachePolicy>
128auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
129 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
130 return Convert(std::move(raw),
131 formats::parse::To<ValueType<PostgreCachePolicy>>());
132 } else {
133 return std::move(raw);
134 }
135}
136
137// Component name in policy
138template <typename T>
139using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
140template <typename T>
141inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
142
143// Component query in policy
144template <typename T>
145using HasQueryImpl = decltype(T::kQuery);
146template <typename T>
147inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
148
149// Component GetQuery in policy
150template <typename T>
151using HasGetQueryImpl = decltype(T::GetQuery());
152template <typename T>
153inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
154
155// Component kWhere in policy
156template <typename T>
157using HasWhere = decltype(T::kWhere);
158template <typename T>
159inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
160
161// Update field
162template <typename T>
163using HasUpdatedField = decltype(T::kUpdatedField);
164template <typename T>
165inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
166
167template <typename T>
168using WantIncrementalUpdates =
169 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
170template <typename T>
171inline constexpr bool kWantIncrementalUpdates =
172 meta::kIsDetected<WantIncrementalUpdates, T>;
173
174// Key member in policy
175template <typename T>
176using KeyMemberTypeImpl =
177 std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
178template <typename T>
179inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
180template <typename T>
181using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
182
183// Data container for cache
184template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
185struct DataCacheContainer {
186 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
187 "With default CacheContainer, key type must be std::hash-able");
188
189 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
190};
191
192template <typename T>
193struct DataCacheContainer<
194 T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
195 using type = typename T::CacheContainer;
196};
197
198template <typename T>
199using DataCacheContainerType = typename DataCacheContainer<T>::type;
200
201// We have to whitelist container types, for which we perform by-element
202// copying, because it's not correct for certain custom containers.
203template <typename T>
204inline constexpr bool kIsContainerCopiedByElement =
205 meta::kIsInstantiationOf<std::unordered_map, T> ||
206 meta::kIsInstantiationOf<std::map, T>;
207
208template <typename T>
209std::unique_ptr<T> CopyContainer(
210 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
211 tracing::ScopeTime& scope) {
212 if constexpr (kIsContainerCopiedByElement<T>) {
213 auto copy = std::make_unique<T>();
214 if constexpr (meta::kIsReservable<T>) {
215 copy->reserve(container.size());
216 }
217
218 utils::CpuRelax relax{cpu_relax_iterations, &scope};
219 for (const auto& kv : container) {
220 relax.Relax();
221 copy->insert(kv);
222 }
223 return copy;
224 } else {
225 return std::make_unique<T>(container);
226 }
227}
228
// Default way of storing a value in the cache container: the key is obtained
// by invoking `key_member` on the value and the pair is passed to
// `insert_or_assign`.
//
// The trailing parameter pack exists only to make this overload a worse match
// than container-specific overloads; it must always be empty.
template <typename Container, typename Value, typename KeyMember,
          typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value,
                         const KeyMember& key_member, Args&&... /*args*/) {
  static_assert(sizeof...(Args) == 0);
  // The key is taken by value before `value` is forwarded, so it never
  // aliases the (possibly moved-from) value inside 'insert_or_assign'.
  auto extracted_key = std::invoke(key_member, value);
  container.insert_or_assign(std::move(extracted_key),
                             std::forward<Value>(value));
}
239
240template <typename T>
241using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
242
243template <typename T>
244void OnWritesDone(T& container) {
245 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
246 container.OnWritesDone();
247 }
248}
249
250template <typename T>
251using HasCustomUpdatedImpl =
252 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
253
254template <typename T>
255inline constexpr bool kHasCustomUpdated =
256 meta::kIsDetected<HasCustomUpdatedImpl, T>;
257
258template <typename T>
259using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
260template <typename T>
261inline constexpr bool kHasUpdatedFieldType =
262 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
263template <typename T>
264using UpdatedFieldType =
265 meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
266
267template <typename T>
268constexpr bool CheckUpdatedFieldType() {
269 if constexpr (kHasUpdatedFieldType<T>) {
270 static_assert(
271 std::is_same_v<typename T::UpdatedFieldType,
272 storages::postgres::TimePointTz> ||
273 std::is_same_v<typename T::UpdatedFieldType,
274 storages::postgres::TimePoint> ||
275 kHasCustomUpdated<T>,
276 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
277 } else {
278 static_assert(!kWantIncrementalUpdates<T>,
279 "UpdatedFieldType must be explicitly specified when using "
280 "incremental updates");
281 }
282 return true;
283}
284
285// Cluster host type policy
286template <typename T>
287using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
288
289template <typename T>
290constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
291 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
292 return T::kClusterHostType;
293 } else {
294 return storages::postgres::ClusterHostType::kSlave;
295 }
296}
297
298// May return null policy
299template <typename T>
300using HasMayReturnNull = decltype(T::kMayReturnNull);
301
302template <typename T>
303constexpr bool MayReturnNull() {
304 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
305 return T::kMayReturnNull;
306 } else {
307 return false;
308 }
309}
310
311template <typename PostgreCachePolicy>
312struct PolicyChecker {
313 // Static assertions for cache traits
314 static_assert(
315 kHasName<PostgreCachePolicy>,
316 "The PosgreSQL cache policy must contain a static member `kName`");
317 static_assert(
318 kHasValueType<PostgreCachePolicy>,
319 "The PosgreSQL cache policy must define a type alias `ValueType`");
320 static_assert(
321 kHasKeyMember<PostgreCachePolicy>,
322 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
323 "with a pointer to a data or a function member with the object's key");
324 static_assert(kHasQuery<PostgreCachePolicy> ||
325 kHasGetQuery<PostgreCachePolicy>,
326 "The PosgreSQL cache policy must contain a static data member "
327 "`kQuery` with a select statement or a static member function "
328 "`GetQuery` returning the query");
329 static_assert(!(kHasQuery<PostgreCachePolicy> &&
330 kHasGetQuery<PostgreCachePolicy>),
331 "The PosgreSQL cache policy must define `kQuery` or "
332 "`GetQuery`, not both");
333 static_assert(
334 kHasUpdatedField<PostgreCachePolicy>,
335 "The PosgreSQL cache policy must contain a static member "
336 "`kUpdatedField`. If you don't want to use incremental updates, "
337 "please set its value to `nullptr`");
338 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
339
340 static_assert(ClusterHostType<PostgreCachePolicy>() &
341 storages::postgres::kClusterHostRolesMask,
342 "Cluster host role must be specified for caching component, "
343 "please be more specific");
344
345 static storages::postgres::Query GetQuery() {
346 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
347 return PostgreCachePolicy::GetQuery();
348 } else {
349 return PostgreCachePolicy::kQuery;
350 }
351 }
352
353 using BaseType =
354 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
355};
356
// Defaults for the `full-update-op-timeout` and
// `incremental-update-op-timeout` static config options.
inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1};
inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1};

// Passed as the second argument of storages::postgres::CommandControl.
inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0};

// A stage that took longer than the threshold gets its CPU relax iteration
// count recomputed from the interval.
inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};

// Names of the tracing::ScopeTime stages of a cache update.
inline constexpr std::string_view kCopyStage{"copy_data"};
inline constexpr std::string_view kFetchStage{"fetch"};
inline constexpr std::string_view kParseStage{"parse"};

// Default for the `chunk-size` static config option.
inline constexpr std::size_t kDefaultChunkSize{1000};
368} // namespace pg_cache::detail
369
370/// @ingroup userver_components
371///
372/// @brief Caching component for PostgreSQL. See @ref pg_cache.
373///
374/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
375template <typename PostgreCachePolicy>
376class PostgreCache final
377 : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
378 public:
379 // Type aliases
380 using PolicyType = PostgreCachePolicy;
381 using ValueType = pg_cache::detail::ValueType<PolicyType>;
382 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
383 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
384 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
385 using UpdatedFieldType =
386 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
387 using BaseType = typename PolicyCheckerType::BaseType;
388
389 // Calculated constants
390 constexpr static bool kIncrementalUpdates =
391 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
392 constexpr static auto kClusterHostTypeFlags =
393 pg_cache::detail::ClusterHostType<PolicyType>();
394 constexpr static auto kName = PolicyType::kName;
395
396 PostgreCache(const ComponentConfig&, const ComponentContext&);
397 ~PostgreCache() override;
398
399 static yaml_config::Schema GetStaticConfigSchema();
400
401 private:
402 using CachedData = std::unique_ptr<DataType>;
403
404 UpdatedFieldType GetLastUpdated(
405 std::chrono::system_clock::time_point last_update,
406 const DataType& cache) const;
407
408 void Update(cache::UpdateType type,
409 const std::chrono::system_clock::time_point& last_update,
410 const std::chrono::system_clock::time_point& now,
411 cache::UpdateStatisticsScope& stats_scope) override;
412
413 bool MayReturnNull() const override;
414
415 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
416 void CacheResults(storages::postgres::ResultSet res, CachedData& data_cache,
417 cache::UpdateStatisticsScope& stats_scope,
418 tracing::ScopeTime& scope);
419
420 static storages::postgres::Query GetAllQuery();
421 static storages::postgres::Query GetDeltaQuery();
422
423 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
424
425 std::vector<storages::postgres::ClusterPtr> clusters_;
426
427 const std::chrono::system_clock::duration correction_;
428 const std::chrono::milliseconds full_update_timeout_;
429 const std::chrono::milliseconds incremental_update_timeout_;
430 const std::size_t chunk_size_;
431 std::size_t cpu_relax_iterations_parse_{0};
432 std::size_t cpu_relax_iterations_copy_{0};
433};
434
435template <typename PostgreCachePolicy>
436inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
437
438template <typename PostgreCachePolicy>
439PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config,
440 const ComponentContext& context)
441 : BaseType{config, context},
442 correction_{ParseCorrection(config)},
443 full_update_timeout_{
444 config["full-update-op-timeout"].As<std::chrono::milliseconds>(
445 pg_cache::detail::kDefaultFullUpdateTimeout)},
446 incremental_update_timeout_{
447 config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
448 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
449 chunk_size_{config["chunk-size"].As<size_t>(
450 pg_cache::detail::kDefaultChunkSize)} {
451 if (this->GetAllowedUpdateTypes() ==
452 cache::AllowedUpdateTypes::kFullAndIncremental &&
453 !kIncrementalUpdates) {
454 throw std::logic_error(
455 "Incremental update support is requested in config but no update field "
456 "name is specified in traits of '" +
457 config.Name() + "' cache");
458 }
459 if (correction_.count() < 0) {
460 throw std::logic_error(
461 "Refusing to set forward (negative) update correction requested in "
462 "config for '" +
463 config.Name() + "' cache");
464 }
465
466 const auto pg_alias = config["pgcomponent"].As<std::string>("");
467 if (pg_alias.empty()) {
468 throw storages::postgres::InvalidConfig{
469 "No `pgcomponent` entry in configuration"};
470 }
471 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
472 const auto shard_count = pg_cluster_comp.GetShardCount();
473 clusters_.resize(shard_count);
474 for (size_t i = 0; i < shard_count; ++i) {
475 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
476 }
477
478 LOG_INFO() << "Cache " << kName << " full update query `"
479 << GetAllQuery().Statement() << "` incremental update query `"
480 << GetDeltaQuery().Statement() << "`";
481
482 this->StartPeriodicUpdates();
483}
484
485template <typename PostgreCachePolicy>
486PostgreCache<PostgreCachePolicy>::~PostgreCache() {
487 this->StopPeriodicUpdates();
488}
489
490template <typename PostgreCachePolicy>
491storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
492 storages::postgres::Query query = PolicyCheckerType::GetQuery();
493 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
494 return {fmt::format("{} where {}", query.Statement(),
495 PostgreCachePolicy::kWhere),
496 query.GetName()};
497 } else {
498 return query;
499 }
500}
501
502template <typename PostgreCachePolicy>
503storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
504 if constexpr (kIncrementalUpdates) {
505 storages::postgres::Query query = PolicyCheckerType::GetQuery();
506
507 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
508 return {
509 fmt::format("{} where ({}) and {} >= $1", query.Statement(),
510 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
511 query.GetName()};
512 } else {
513 return {fmt::format("{} where {} >= $1", query.Statement(),
514 PolicyType::kUpdatedField),
515 query.GetName()};
516 }
517 } else {
518 return GetAllQuery();
519 }
520}
521
522template <typename PostgreCachePolicy>
523std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
524 const ComponentConfig& config) {
525 static constexpr std::string_view kUpdateCorrection = "update-correction";
526 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
527 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
528 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
529 } else {
530 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
531 }
532}
533
534template <typename PostgreCachePolicy>
535typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
536PostgreCache<PostgreCachePolicy>::GetLastUpdated(
537 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
538 const DataType& cache) const {
539 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
540 return PostgreCachePolicy::GetLastKnownUpdated(cache);
541 } else {
542 return UpdatedFieldType{last_update - correction_};
543 }
544}
545
546template <typename PostgreCachePolicy>
547void PostgreCache<PostgreCachePolicy>::Update(
548 cache::UpdateType type,
549 const std::chrono::system_clock::time_point& last_update,
550 const std::chrono::system_clock::time_point& /*now*/,
551 cache::UpdateStatisticsScope& stats_scope) {
552 namespace pg = storages::postgres;
553 if constexpr (!kIncrementalUpdates) {
554 type = cache::UpdateType::kFull;
555 }
556 const auto query =
557 (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
558 const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
559 ? full_update_timeout_
560 : incremental_update_timeout_;
561
562 // COPY current cached data
563 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
564 std::string{pg_cache::detail::kCopyStage});
565 auto data_cache = GetDataSnapshot(type, scope);
566 [[maybe_unused]] const auto old_size = data_cache->size();
567
568 scope.Reset(std::string{pg_cache::detail::kFetchStage});
569
570 size_t changes = 0;
571 // Iterate clusters
572 for (auto& cluster : clusters_) {
573 if (chunk_size_ > 0) {
574 auto trx = cluster->Begin(
575 kClusterHostTypeFlags, pg::Transaction::RO,
576 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
577 auto portal =
578 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
579 while (portal) {
580 scope.Reset(std::string{pg_cache::detail::kFetchStage});
581 auto res = portal.Fetch(chunk_size_);
582 stats_scope.IncreaseDocumentsReadCount(res.Size());
583
584 scope.Reset(std::string{pg_cache::detail::kParseStage});
585 CacheResults(res, data_cache, stats_scope, scope);
586 changes += res.Size();
587 }
588 trx.Commit();
589 } else {
590 bool has_parameter = query.Statement().find('$') != std::string::npos;
591 auto res = has_parameter
592 ? cluster->Execute(
593 kClusterHostTypeFlags,
594 pg::CommandControl{
595 timeout, pg_cache::detail::kStatementTimeoutOff},
596 query, GetLastUpdated(last_update, *data_cache))
597 : cluster->Execute(
598 kClusterHostTypeFlags,
599 pg::CommandControl{
600 timeout, pg_cache::detail::kStatementTimeoutOff},
601 query);
602 stats_scope.IncreaseDocumentsReadCount(res.Size());
603
604 scope.Reset(std::string{pg_cache::detail::kParseStage});
605 CacheResults(res, data_cache, stats_scope, scope);
606 changes += res.Size();
607 }
608 }
609
610 scope.Reset();
611
612 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
613 if (old_size > 0) {
614 const auto elapsed_copy =
615 scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
616 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
617 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
618 static_cast<double>(old_size) /
619 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
620 LOG_TRACE() << "Elapsed time for copying " << kName << " "
621 << elapsed_copy.count() << " for " << changes
622 << " data items is over threshold. Will relax CPU every "
623 << cpu_relax_iterations_parse_ << " iterations";
624 }
625 }
626 }
627
628 if (changes > 0) {
629 const auto elapsed_parse =
630 scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
631 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
632 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
633 static_cast<double>(changes) /
634 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
635 LOG_TRACE() << "Elapsed time for parsing " << kName << " "
636 << elapsed_parse.count() << " for " << changes
637 << " data items is over threshold. Will relax CPU every "
638 << cpu_relax_iterations_parse_ << " iterations";
639 }
640 }
641 if (changes > 0 || type == cache::UpdateType::kFull) {
642 // Set current cache
643 stats_scope.Finish(data_cache->size());
644 pg_cache::detail::OnWritesDone(*data_cache);
645 this->Set(std::move(data_cache));
646 } else {
647 stats_scope.FinishNoChanges();
648 }
649}
650
651template <typename PostgreCachePolicy>
652bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
653 return pg_cache::detail::MayReturnNull<PolicyType>();
654}
655
656template <typename PostgreCachePolicy>
657void PostgreCache<PostgreCachePolicy>::CacheResults(
658 storages::postgres::ResultSet res, CachedData& data_cache,
659 cache::UpdateStatisticsScope& stats_scope, tracing::ScopeTime& scope) {
660 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
661 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
662 for (auto p = values.begin(); p != values.end(); ++p) {
663 relax.Relax();
664 try {
665 using pg_cache::detail::CacheInsertOrAssign;
666 CacheInsertOrAssign(
667 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
668 PostgreCachePolicy::kKeyMember);
669 } catch (const std::exception& e) {
670 stats_scope.IncreaseDocumentsParseFailures(1);
671 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
672 << compiler::GetTypeName<ValueType>() << "': " << e.what();
673 }
674 }
675}
676
677template <typename PostgreCachePolicy>
678typename PostgreCache<PostgreCachePolicy>::CachedData
679PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type,
680 tracing::ScopeTime& scope) {
681 if (type == cache::UpdateType::kIncremental) {
682 auto data = this->Get();
683 if (data) {
684 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
685 scope);
686 }
687 }
688 return std::make_unique<DataType>();
689}
690
namespace impl {

// Declared here, defined elsewhere. Its result is merged with the parent
// component schema in PostgreCache::GetStaticConfigSchema().
std::string GetPostgreCacheSchema();

}  // namespace impl
696
697template <typename PostgreCachePolicy>
698yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
699 using ParentType =
700 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
701 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
702}
703
704} // namespace components
705
706namespace utils::impl::projected_set {
707
// CacheInsertOrAssign overload declared in utils::impl::projected_set: the
// value is forwarded to `DoInsert` and the key member argument is ignored.
template <class Set, class Value, class KeyMember>
void CacheInsertOrAssign(Set& set, Value&& value,
                         const KeyMember& /*key_member*/) {
  DoInsert(set, std::forward<Value>(value));
}
713
714} // namespace utils::impl::projected_set
715
716USERVER_NAMESPACE_END