userver: userver/cache/base_postgres_cache.hpp Source File
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_postgres_cache.hpp
4/// @brief @copybrief components::PostgreCache
5
6#include <userver/cache/base_postgres_cache_fwd.hpp>
7
8#include <chrono>
9#include <map>
10#include <string_view>
11#include <type_traits>
12#include <unordered_map>
13
14#include <fmt/format.h>
15
16#include <userver/cache/cache_statistics.hpp>
17#include <userver/cache/caching_component_base.hpp>
18#include <userver/components/component_config.hpp>
19#include <userver/components/component_context.hpp>
20
21#include <userver/storages/postgres/cluster.hpp>
22#include <userver/storages/postgres/component.hpp>
23#include <userver/storages/postgres/io/chrono.hpp>
24
25#include <userver/compiler/demangle.hpp>
26#include <userver/logging/log.hpp>
27#include <userver/tracing/span.hpp>
28#include <userver/utils/assert.hpp>
29#include <userver/utils/cpu_relax.hpp>
30#include <userver/utils/meta.hpp>
31#include <userver/utils/void_t.hpp>
32#include <userver/yaml_config/merge_schemas.hpp>
33
34USERVER_NAMESPACE_BEGIN
35
36namespace components {
37
38// clang-format off
39
40/// @page pg_cache Caching Component for PostgreSQL
41///
42/// A typical components::PostgreCache usage consists of trait definition:
43///
44/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Trivial
45///
46/// and registration of the component in components::ComponentList:
47///
48/// @snippet cache/postgres_cache_test.cpp Pg Cache Trivial Usage
49///
50/// See @ref scripts/docs/en/userver/caches.md for introduction into caches.
51///
52///
53/// @section pg_cc_configuration Configuration
54///
55/// components::PostgreCache static configuration file should have a PostgreSQL
56/// component name specified in `pgcomponent` configuration parameter.
57///
58/// Optionally the operation timeouts for cache loading can be specified.
59///
60/// Name | Description | Default value
61/// ---- | ----------- | -------------
62/// full-update-op-timeout | timeout for a full update | 1m
63/// incremental-update-op-timeout | timeout for an incremental update | 1s
64/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated)
65/// chunk-size | number of rows to request from PostgreSQL, 0 to fetch all rows in one request | 1000
66///
67/// @section pg_cc_cache_policy Cache policy
68///
69/// Cache policy is the template argument of components::PostgreCache component.
70/// Please see the following code snippet for documentation.
71///
72/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Example
73///
74/// The query can be a std::string. But due to non-guaranteed order of static
75/// data members initialization, std::string should be returned from a static
76/// member function, please see the following code snippet.
77///
78/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy GetQuery Example
79///
80/// Policy may have static function GetLastKnownUpdated. It should be used
81/// when new entries from database are taken via revision, identifier, or
82/// anything else, but not timestamp of the last update.
83/// If this function is supplied, new entries are taken from db with condition
84/// 'WHERE kUpdatedField >= GetLastKnownUpdated(cache_container)'.
85/// Otherwise, condition is
86/// 'WHERE kUpdatedField >= last_update - correction_'.
87/// See the following code snippet for an example of usage
88///
89/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Updated Example
90///
91/// In case one provides a custom CacheContainer within Policy, it is notified
92/// of Update completion via its public member function OnWritesDone, if any.
93/// See the following code snippet for an example of usage:
94///
95/// @snippet cache/postgres_cache_test.cpp Pg Cache Policy Custom Container With Write Notification Example
96///
97/// @section pg_cc_forward_declaration Forward Declaration
98///
99/// To forward declare a cache you can forward declare a trait and
100/// include userver/cache/base_postgres_cache_fwd.hpp header. It is also useful to
101/// forward declare the cache value type.
102///
103/// @snippet cache/postgres_cache_test_fwd.hpp Pg Cache Fwd Example
104///
105/// ----------
106///
107/// @htmlonly <div class="bottom-nav"> @endhtmlonly
108/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨
109/// @htmlonly </div> @endhtmlonly
110
111// clang-format on
112
113namespace pg_cache::detail {
114
115template <typename T>
116using ValueType = typename T::ValueType;
117template <typename T>
118inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
119
120template <typename T>
121using RawValueTypeImpl = typename T::RawValueType;
122template <typename T>
123inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
124template <typename T>
125using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
126
127template <typename PostgreCachePolicy>
128auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
129 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
130 return Convert(std::move(raw),
131 formats::parse::To<ValueType<PostgreCachePolicy>>());
132 } else {
133 return std::move(raw);
134 }
135}
136
137// Component name in policy
138template <typename T>
139using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
140template <typename T>
141inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
142
143// Component query in policy
144template <typename T>
145using HasQueryImpl = decltype(T::kQuery);
146template <typename T>
147inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
148
149// Component GetQuery in policy
150template <typename T>
151using HasGetQueryImpl = decltype(T::GetQuery());
152template <typename T>
153inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
154
155// Component kWhere in policy
156template <typename T>
157using HasWhere = decltype(T::kWhere);
158template <typename T>
159inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
160
161// Update field
162template <typename T>
163using HasUpdatedField = decltype(T::kUpdatedField);
164template <typename T>
165inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
166
167template <typename T>
168using WantIncrementalUpdates =
169 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
170template <typename T>
171inline constexpr bool kWantIncrementalUpdates =
172 meta::kIsDetected<WantIncrementalUpdates, T>;
173
174// Key member in policy
175template <typename T>
176using KeyMemberTypeImpl =
177 std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
178template <typename T>
179inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
180template <typename T>
181using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
182
183// Data container for cache
184template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
185struct DataCacheContainer {
186 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
187 "With default CacheContainer, key type must be std::hash-able");
188
189 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
190};
191
192template <typename T>
193struct DataCacheContainer<
194 T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
195 using type = typename T::CacheContainer;
196};
197
198template <typename T>
199using DataCacheContainerType = typename DataCacheContainer<T>::type;
200
201// We have to whitelist container types, for which we perform by-element
202// copying, because it's not correct for certain custom containers.
203template <typename T>
204inline constexpr bool kIsContainerCopiedByElement =
205 meta::kIsInstantiationOf<std::unordered_map, T> ||
206 meta::kIsInstantiationOf<std::map, T>;
207
208template <typename T>
209std::unique_ptr<T> CopyContainer(
210 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
211 tracing::ScopeTime& scope) {
212 if constexpr (kIsContainerCopiedByElement<T>) {
213 auto copy = std::make_unique<T>();
214 if constexpr (meta::kIsReservable<T>) {
215 copy->reserve(container.size());
216 }
217
218 utils::CpuRelax relax{cpu_relax_iterations, &scope};
219 for (const auto& kv : container) {
220 relax.Relax();
221 copy->insert(kv);
222 }
223 return copy;
224 } else {
225 return std::make_unique<T>(container);
226 }
227}
228
// Default way of storing a value in the cache container: the key is obtained
// by invoking `key_member` on the value, then the pair is inserted or the
// existing entry is overwritten.
//
// The trailing parameter pack must stay empty; it exists only to
// de-prioritize this default overload against more specific ones.
template <typename Container, typename Value, typename KeyMember,
          typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value,
                         const KeyMember& key_member, Args&&... /*args*/) {
  static_assert(sizeof...(Args) == 0);

  // The key is deliberately copied out of `value` first to avoid aliasing
  // issues in 'insert_or_assign', which also receives the forwarded value.
  auto key_copy = std::invoke(key_member, value);
  container.insert_or_assign(std::move(key_copy), std::forward<Value>(value));
}
239
240template <typename T>
241using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
242
243template <typename T>
244void OnWritesDone(T& container) {
245 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
246 container.OnWritesDone();
247 }
248}
249
250template <typename T>
251using HasCustomUpdatedImpl =
252 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
253
254template <typename T>
255inline constexpr bool kHasCustomUpdated =
256 meta::kIsDetected<HasCustomUpdatedImpl, T>;
257
258template <typename T>
259using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
260template <typename T>
261inline constexpr bool kHasUpdatedFieldType =
262 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
263template <typename T>
264using UpdatedFieldType =
265 meta::DetectedOr<storages::postgres::TimePointTz, UpdatedFieldTypeImpl, T>;
266
267template <typename T>
268constexpr bool CheckUpdatedFieldType() {
269 if constexpr (kHasUpdatedFieldType<T>) {
270 static_assert(
271 std::is_same_v<typename T::UpdatedFieldType,
272 storages::postgres::TimePointTz> ||
273 std::is_same_v<typename T::UpdatedFieldType,
274 storages::postgres::TimePoint> ||
275 kHasCustomUpdated<T>,
276 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
277 } else {
278 static_assert(!kWantIncrementalUpdates<T>,
279 "UpdatedFieldType must be explicitly specified when using "
280 "incremental updates");
281 }
282 return true;
283}
284
285// Cluster host type policy
286template <typename T>
287using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
288
289template <typename T>
290constexpr storages::postgres::ClusterHostTypeFlags ClusterHostType() {
291 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
292 return T::kClusterHostType;
293 } else {
294 return storages::postgres::ClusterHostType::kSlave;
295 }
296}
297
298// May return null policy
299template <typename T>
300using HasMayReturnNull = decltype(T::kMayReturnNull);
301
302template <typename T>
303constexpr bool MayReturnNull() {
304 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
305 return T::kMayReturnNull;
306 } else {
307 return false;
308 }
309}
310
311template <typename PostgreCachePolicy>
312struct PolicyChecker {
313 // Static assertions for cache traits
314 static_assert(
315 kHasName<PostgreCachePolicy>,
316 "The PosgreSQL cache policy must contain a static member `kName`");
317 static_assert(
318 kHasValueType<PostgreCachePolicy>,
319 "The PosgreSQL cache policy must define a type alias `ValueType`");
320 static_assert(
321 kHasKeyMember<PostgreCachePolicy>,
322 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
323 "with a pointer to a data or a function member with the object's key");
324 static_assert(kHasQuery<PostgreCachePolicy> ||
325 kHasGetQuery<PostgreCachePolicy>,
326 "The PosgreSQL cache policy must contain a static data member "
327 "`kQuery` with a select statement or a static member function "
328 "`GetQuery` returning the query");
329 static_assert(!(kHasQuery<PostgreCachePolicy> &&
330 kHasGetQuery<PostgreCachePolicy>),
331 "The PosgreSQL cache policy must define `kQuery` or "
332 "`GetQuery`, not both");
333 static_assert(
334 kHasUpdatedField<PostgreCachePolicy>,
335 "The PosgreSQL cache policy must contain a static member "
336 "`kUpdatedField`. If you don't want to use incremental updates, "
337 "please set its value to `nullptr`");
338 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
339
340 static_assert(ClusterHostType<PostgreCachePolicy>() &
341 storages::postgres::kClusterHostRolesMask,
342 "Cluster host role must be specified for caching component, "
343 "please be more specific");
344
345 static storages::postgres::Query GetQuery() {
346 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
347 return PostgreCachePolicy::GetQuery();
348 } else {
349 return PostgreCachePolicy::kQuery;
350 }
351 }
352
353 using BaseType =
354 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
355};
356
// Defaults for the `full-update-op-timeout` and
// `incremental-update-op-timeout` static config options.
inline constexpr auto kDefaultFullUpdateTimeout = std::chrono::minutes{1};
inline constexpr auto kDefaultIncrementalUpdateTimeout =
    std::chrono::seconds{1};
// Passed as the second CommandControl argument for cache queries.
inline constexpr auto kStatementTimeoutOff = std::chrono::milliseconds{0};
// A stage that took longer than the threshold gets its CPU relax iteration
// count recomputed from the interval.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};

// Names of the tracing scope stages of a cache update.
inline constexpr std::string_view kCopyStage{"copy_data"};
inline constexpr std::string_view kFetchStage{"fetch"};
inline constexpr std::string_view kParseStage{"parse"};

// Default for the `chunk-size` static config option.
inline constexpr std::size_t kDefaultChunkSize{1000};
368} // namespace pg_cache::detail
369
370/// @ingroup userver_components
371///
372/// @brief Caching component for PostgreSQL. See @ref pg_cache.
373///
374/// @see @ref pg_cache, @ref scripts/docs/en/userver/caches.md
375template <typename PostgreCachePolicy>
376class PostgreCache final
377 : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
378 public:
379 // Type aliases
380 using PolicyType = PostgreCachePolicy;
381 using ValueType = pg_cache::detail::ValueType<PolicyType>;
382 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
383 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
384 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
385 using UpdatedFieldType =
386 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
387 using BaseType = typename PolicyCheckerType::BaseType;
388
389 // Calculated constants
390 constexpr static bool kIncrementalUpdates =
391 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
392 constexpr static auto kClusterHostTypeFlags =
393 pg_cache::detail::ClusterHostType<PolicyType>();
394 constexpr static auto kName = PolicyType::kName;
395
396 PostgreCache(const ComponentConfig&, const ComponentContext&);
397 ~PostgreCache() override;
398
399 static yaml_config::Schema GetStaticConfigSchema();
400
401 private:
402 using CachedData = std::unique_ptr<DataType>;
403
404 UpdatedFieldType GetLastUpdated(
405 std::chrono::system_clock::time_point last_update,
406 const DataType& cache) const;
407
408 void Update(cache::UpdateType type,
409 const std::chrono::system_clock::time_point& last_update,
410 const std::chrono::system_clock::time_point& now,
411 cache::UpdateStatisticsScope& stats_scope) override;
412
413 bool MayReturnNull() const override;
414
415 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
416 void CacheResults(storages::postgres::ResultSet res, CachedData& data_cache,
417 cache::UpdateStatisticsScope& stats_scope,
418 tracing::ScopeTime& scope);
419
420 static storages::postgres::Query GetAllQuery();
421 static storages::postgres::Query GetDeltaQuery();
422
423 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
424
425 std::vector<storages::postgres::ClusterPtr> clusters_;
426
427 const std::chrono::system_clock::duration correction_;
428 const std::chrono::milliseconds full_update_timeout_;
429 const std::chrono::milliseconds incremental_update_timeout_;
430 const std::size_t chunk_size_;
431 std::size_t cpu_relax_iterations_parse_{0};
432 std::size_t cpu_relax_iterations_copy_{0};
433};
434
435template <typename PostgreCachePolicy>
436inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
437
438template <typename PostgreCachePolicy>
439PostgreCache<PostgreCachePolicy>::PostgreCache(const ComponentConfig& config,
440 const ComponentContext& context)
441 : BaseType{config, context},
442 correction_{ParseCorrection(config)},
443 full_update_timeout_{
444 config["full-update-op-timeout"].As<std::chrono::milliseconds>(
445 pg_cache::detail::kDefaultFullUpdateTimeout)},
446 incremental_update_timeout_{
447 config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
448 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
449 chunk_size_{config["chunk-size"].As<size_t>(
450 pg_cache::detail::kDefaultChunkSize)} {
451 if (this->GetAllowedUpdateTypes() ==
452 cache::AllowedUpdateTypes::kFullAndIncremental &&
453 !kIncrementalUpdates) {
454 throw std::logic_error(
455 "Incremental update support is requested in config but no update field "
456 "name is specified in traits of '" +
457 config.Name() + "' cache");
458 }
459 if (correction_.count() < 0) {
460 throw std::logic_error(
461 "Refusing to set forward (negative) update correction requested in "
462 "config for '" +
463 config.Name() + "' cache");
464 }
465
466 const auto pg_alias = config["pgcomponent"].As<std::string>("");
467 if (pg_alias.empty()) {
468 throw storages::postgres::InvalidConfig{
469 "No `pgcomponent` entry in configuration"};
470 }
471 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
472 const auto shard_count = pg_cluster_comp.GetShardCount();
473 clusters_.resize(shard_count);
474 for (size_t i = 0; i < shard_count; ++i) {
475 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
476 }
477
478 LOG_INFO() << "Cache " << kName << " full update query `"
479 << GetAllQuery().Statement() << "` incremental update query `"
480 << GetDeltaQuery().Statement() << "`";
481
482 this->StartPeriodicUpdates();
483}
484
485template <typename PostgreCachePolicy>
486PostgreCache<PostgreCachePolicy>::~PostgreCache() {
487 this->StopPeriodicUpdates();
488}
489
490template <typename PostgreCachePolicy>
491storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
492 storages::postgres::Query query = PolicyCheckerType::GetQuery();
493 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
494 return {fmt::format("{} where {}", query.Statement(),
495 PostgreCachePolicy::kWhere),
496 query.GetName()};
497 } else {
498 return query;
499 }
500}
501
502template <typename PostgreCachePolicy>
503storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
504 if constexpr (kIncrementalUpdates) {
505 storages::postgres::Query query = PolicyCheckerType::GetQuery();
506
507 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
508 return {
509 fmt::format("{} where ({}) and {} >= $1", query.Statement(),
510 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
511 query.GetName()};
512 } else {
513 return {fmt::format("{} where {} >= $1", query.Statement(),
514 PolicyType::kUpdatedField),
515 query.GetName()};
516 }
517 } else {
518 return GetAllQuery();
519 }
520}
521
522template <typename PostgreCachePolicy>
523std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
524 const ComponentConfig& config) {
525 static constexpr std::string_view kUpdateCorrection = "update-correction";
526 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
527 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
528 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
529 } else {
530 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
531 }
532}
533
534template <typename PostgreCachePolicy>
535typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
536PostgreCache<PostgreCachePolicy>::GetLastUpdated(
537 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
538 const DataType& cache) const {
539 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
540 return PostgreCachePolicy::GetLastKnownUpdated(cache);
541 } else {
542 return UpdatedFieldType{last_update - correction_};
543 }
544}
545
546template <typename PostgreCachePolicy>
547void PostgreCache<PostgreCachePolicy>::Update(
548 cache::UpdateType type,
549 const std::chrono::system_clock::time_point& last_update,
550 const std::chrono::system_clock::time_point& /*now*/,
551 cache::UpdateStatisticsScope& stats_scope) {
552 namespace pg = storages::postgres;
553 if constexpr (!kIncrementalUpdates) {
554 type = cache::UpdateType::kFull;
555 }
556 const auto query =
557 (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
558 const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
559 ? full_update_timeout_
560 : incremental_update_timeout_;
561
562 // COPY current cached data
563 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
564 std::string{pg_cache::detail::kCopyStage});
565 auto data_cache = GetDataSnapshot(type, scope);
566 [[maybe_unused]] const auto old_size = data_cache->size();
567
568 scope.Reset(std::string{pg_cache::detail::kFetchStage});
569
570 size_t changes = 0;
571 // Iterate clusters
572 for (auto& cluster : clusters_) {
573 if (chunk_size_ > 0) {
574 auto trx = cluster->Begin(
575 kClusterHostTypeFlags, pg::Transaction::RO,
576 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
577 auto portal =
578 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
579 while (portal) {
580 scope.Reset(std::string{pg_cache::detail::kFetchStage});
581 auto res = portal.Fetch(chunk_size_);
582 stats_scope.IncreaseDocumentsReadCount(res.Size());
583
584 scope.Reset(std::string{pg_cache::detail::kParseStage});
585 CacheResults(res, data_cache, stats_scope, scope);
586 changes += res.Size();
587 }
588 trx.Commit();
589 } else {
590 bool has_parameter = query.Statement().find('$') != std::string::npos;
591 auto res = has_parameter
592 ? cluster->Execute(
593 kClusterHostTypeFlags,
594 pg::CommandControl{
595 timeout, pg_cache::detail::kStatementTimeoutOff},
596 query, GetLastUpdated(last_update, *data_cache))
597 : cluster->Execute(
598 kClusterHostTypeFlags,
599 pg::CommandControl{
600 timeout, pg_cache::detail::kStatementTimeoutOff},
601 query);
602 stats_scope.IncreaseDocumentsReadCount(res.Size());
603
604 scope.Reset(std::string{pg_cache::detail::kParseStage});
605 CacheResults(res, data_cache, stats_scope, scope);
606 changes += res.Size();
607 }
608 }
609
610 scope.Reset();
611
612 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
613 if (old_size > 0) {
614 const auto elapsed_copy =
615 scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
616 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
617 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
618 static_cast<double>(old_size) /
619 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
620 LOG_TRACE() << "Elapsed time for copying " << kName << " "
621 << elapsed_copy.count() << " for " << changes
622 << " data items is over threshold. Will relax CPU every "
623 << cpu_relax_iterations_parse_ << " iterations";
624 }
625 }
626 }
627
628 if (changes > 0) {
629 const auto elapsed_parse =
630 scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
631 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
632 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
633 static_cast<double>(changes) /
634 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
635 LOG_TRACE() << "Elapsed time for parsing " << kName << " "
636 << elapsed_parse.count() << " for " << changes
637 << " data items is over threshold. Will relax CPU every "
638 << cpu_relax_iterations_parse_ << " iterations";
639 }
640 }
641 if (changes > 0 || type == cache::UpdateType::kFull) {
642 // Set current cache
643 stats_scope.Finish(data_cache->size());
644 pg_cache::detail::OnWritesDone(*data_cache);
645 this->Set(std::move(data_cache));
646 } else {
647 stats_scope.FinishNoChanges();
648 }
649}
650
651template <typename PostgreCachePolicy>
652bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
653 return pg_cache::detail::MayReturnNull<PolicyType>();
654}
655
656template <typename PostgreCachePolicy>
657void PostgreCache<PostgreCachePolicy>::CacheResults(
658 storages::postgres::ResultSet res, CachedData& data_cache,
659 cache::UpdateStatisticsScope& stats_scope, tracing::ScopeTime& scope) {
660 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
661 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
662 for (auto p = values.begin(); p != values.end(); ++p) {
663 relax.Relax();
664 try {
665 using pg_cache::detail::CacheInsertOrAssign;
666 CacheInsertOrAssign(
667 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
668 PostgreCachePolicy::kKeyMember);
669 } catch (const std::exception& e) {
670 stats_scope.IncreaseDocumentsParseFailures(1);
671 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
672 << compiler::GetTypeName<ValueType>() << "': " << e.what();
673 }
674 }
675}
676
677template <typename PostgreCachePolicy>
678typename PostgreCache<PostgreCachePolicy>::CachedData
679PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type,
680 tracing::ScopeTime& scope) {
681 if (type == cache::UpdateType::kIncremental) {
682 auto data = this->Get();
683 if (data) {
684 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
685 scope);
686 }
687 }
688 return std::make_unique<DataType>();
689}
690
691namespace impl {
692
693std::string GetPostgreCacheSchema();
694
695} // namespace impl
696
697template <typename PostgreCachePolicy>
698yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
699 using ParentType =
700 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
701 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
702}
703
704} // namespace components
705
namespace utils::impl::projected_set {

// CacheInsertOrAssign overload for this namespace: the value is stored with
// DoInsert and the key member is ignored.
// NOTE(review): presumably picked up through ADL by the unqualified call in
// PostgreCache::CacheResults when the container type lives here - confirm.
template <typename Set, typename Value, typename KeyMember>
void CacheInsertOrAssign(Set& container, Value&& item,
                         const KeyMember& /*key_member*/) {
  DoInsert(container, std::forward<Value>(item));
}

}  // namespace utils::impl::projected_set
715
716USERVER_NAMESPACE_END