Github   Telegram
Loading...
Searching...
No Matches
base_postgres_cache.hpp
Go to the documentation of this file.
1#pragma once
2
5
7
8#include <chrono>
9#include <string_view>
10#include <type_traits>
11#include <unordered_map>
12
13#include <fmt/format.h>
14
17#include <userver/components/component_config.hpp>
19
23
32
33USERVER_NAMESPACE_BEGIN
34
35namespace components {
36
37// clang-format off
38
109
110// clang-format on
111
112namespace pg_cache::detail {
113
114template <typename T>
115using ValueType = typename T::ValueType;
116template <typename T>
117inline constexpr bool kHasValueType = meta::kIsDetected<ValueType, T>;
118
119template <typename T>
120using RawValueTypeImpl = typename T::RawValueType;
121template <typename T>
122inline constexpr bool kHasRawValueType = meta::kIsDetected<RawValueTypeImpl, T>;
123template <typename T>
124using RawValueType = meta::DetectedOr<ValueType<T>, RawValueTypeImpl, T>;
125
126template <typename PostgreCachePolicy>
127auto ExtractValue(RawValueType<PostgreCachePolicy>&& raw) {
128 if constexpr (kHasRawValueType<PostgreCachePolicy>) {
129 return Convert(std::move(raw),
130 formats::parse::To<ValueType<PostgreCachePolicy>>());
131 } else {
132 return std::move(raw);
133 }
134}
135
136// Component name in policy
137template <typename T>
138using HasNameImpl = std::enable_if_t<!std::string_view{T::kName}.empty()>;
139template <typename T>
140inline constexpr bool kHasName = meta::kIsDetected<HasNameImpl, T>;
141
142// Component query in policy
143template <typename T>
144using HasQueryImpl = decltype(T::kQuery);
145template <typename T>
146inline constexpr bool kHasQuery = meta::kIsDetected<HasQueryImpl, T>;
147
148// Component GetQuery in policy
149template <typename T>
150using HasGetQueryImpl = decltype(T::GetQuery());
151template <typename T>
152inline constexpr bool kHasGetQuery = meta::kIsDetected<HasGetQueryImpl, T>;
153
154// Component kWhere in policy
155template <typename T>
156using HasWhere = decltype(T::kWhere);
157template <typename T>
158inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;
159
160// Update field
161template <typename T>
162using HasUpdatedField = decltype(T::kUpdatedField);
163template <typename T>
164inline constexpr bool kHasUpdatedField = meta::kIsDetected<HasUpdatedField, T>;
165
166template <typename T>
167using WantIncrementalUpdates =
168 std::enable_if_t<!std::string_view{T::kUpdatedField}.empty()>;
169template <typename T>
170inline constexpr bool kWantIncrementalUpdates =
171 meta::kIsDetected<WantIncrementalUpdates, T>;
172
173// Key member in policy
174template <typename T>
175using KeyMemberTypeImpl =
176 std::decay_t<std::invoke_result_t<decltype(T::kKeyMember), ValueType<T>>>;
177template <typename T>
178inline constexpr bool kHasKeyMember = meta::kIsDetected<KeyMemberTypeImpl, T>;
179template <typename T>
180using KeyMemberType = meta::DetectedType<KeyMemberTypeImpl, T>;
181
182// Data container for cache
183template <typename T, typename = USERVER_NAMESPACE::utils::void_t<>>
184struct DataCacheContainer {
185 static_assert(meta::kIsStdHashable<KeyMemberType<T>>,
186 "With default CacheContainer, key type must be std::hash-able");
187
188 using type = std::unordered_map<KeyMemberType<T>, ValueType<T>>;
189};
190
191template <typename T>
192struct DataCacheContainer<
193 T, USERVER_NAMESPACE::utils::void_t<typename T::CacheContainer>> {
194 using type = typename T::CacheContainer;
195};
196
197template <typename T>
198using DataCacheContainerType = typename DataCacheContainer<T>::type;
199
200// We have to whitelist container types, for which we perform by-element
201// copying, because it's not correct for certain custom containers.
202template <typename T>
203inline constexpr bool kIsContainerCopiedByElement =
204 meta::kIsInstantiationOf<std::unordered_map, T> ||
205 meta::kIsInstantiationOf<std::map, T>;
206
207template <typename T>
208std::unique_ptr<T> CopyContainer(
209 const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations,
210 tracing::ScopeTime& scope) {
211 if constexpr (kIsContainerCopiedByElement<T>) {
212 auto copy = std::make_unique<T>();
213 if constexpr (meta::kIsReservable<T>) {
214 copy->reserve(container.size());
215 }
216
217 utils::CpuRelax relax{cpu_relax_iterations, &scope};
218 for (const auto& kv : container) {
219 relax.Relax();
220 copy->insert(kv);
221 }
222 return copy;
223 } else {
224 return std::make_unique<T>(container);
225 }
226}
227
// Default insertion: stores `value` in `container` under the key produced by
// invoking `key_member` on it, replacing any previous entry with that key.
// The trailing `Args` pack must stay empty. It only exists to make this
// overload rank below a non-variadic overload with the same leading
// parameters, so container-specific overloads win.
template <typename Container, typename Value, typename KeyMember,
          typename... Args>
void CacheInsertOrAssign(Container& container, Value&& value,
                         const KeyMember& key_member, Args&&... /*args*/) {
  static_assert(sizeof...(Args) == 0);
  // Take an owned copy of the key before `value` is forwarded (possibly moved)
  // into the container, so the key never aliases the moved-from value.
  auto key_copy = std::invoke(key_member, value);
  container.insert_or_assign(std::move(key_copy), std::forward<Value>(value));
}
238
239template <typename T>
240using HasOnWritesDoneImpl = decltype(std::declval<T&>().OnWritesDone());
241
242template <typename T>
243void OnWritesDone(T& container) {
244 if constexpr (meta::kIsDetected<HasOnWritesDoneImpl, T>) {
245 container.OnWritesDone();
246 }
247}
248
249template <typename T>
250using HasCustomUpdatedImpl =
251 decltype(T::GetLastKnownUpdated(std::declval<DataCacheContainerType<T>>()));
252
253template <typename T>
254inline constexpr bool kHasCustomUpdated =
255 meta::kIsDetected<HasCustomUpdatedImpl, T>;
256
// Optional policy alias `UpdatedFieldType`: the type of the lower bound that
// PostgreCache::GetLastUpdated returns. That value is passed with the update
// query and compared against `kUpdatedField` in the delta query.
257template <typename T>
258using UpdatedFieldTypeImpl = typename T::UpdatedFieldType;
// True when the policy declares `UpdatedFieldType`.
259template <typename T>
260inline constexpr bool kHasUpdatedFieldType =
261 meta::kIsDetected<UpdatedFieldTypeImpl, T>;
// Resolved updated-field type used by PostgreCache.
// NOTE(review): the right-hand side of this alias (original line 264) is
// missing from this copy of the file; restore it from upstream.
262template <typename T>
263using UpdatedFieldType =
265
// Compile-time validation of the policy's `UpdatedFieldType`. Failures are
// reported through static_assert. The function always returns `true`, so it
// can be used as `static_assert(CheckUpdatedFieldType<Policy>())`.
// - If `UpdatedFieldType` is declared, it must be TimePointTz or TimePoint (per
//   the assertion message). `kHasCustomUpdated<T>` also takes part in this
//   condition.
// - Otherwise incremental updates must be disabled: `UpdatedFieldType` has to
//   be specified explicitly to use them.
// NOTE(review): original lines 271 and 273 (the compared-against types and
// the operators joining the conditions) are missing from this copy.
266template <typename T>
267constexpr bool CheckUpdatedFieldType() {
268 if constexpr (kHasUpdatedFieldType<T>) {
269 static_assert(
270 std::is_same_v<typename T::UpdatedFieldType,
272 std::is_same_v<typename T::UpdatedFieldType,
274 kHasCustomUpdated<T>,
275 "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint");
276 } else {
277 static_assert(!kWantIncrementalUpdates<T>,
278 "UpdatedFieldType must be explicitly specified when using "
279 "incremental updates");
280 }
281 return true;
282}
283
284// Cluster host type policy
// Detects an optional `kClusterHostType` member in the policy.
285template <typename T>
286using HasClusterHostTypeImpl = decltype(T::kClusterHostType);
287
// ClusterHostType<T>(): returns `T::kClusterHostType` when the policy declares
// it, otherwise a default. PolicyChecker requires the result to share a bit
// with `storages::postgres::kClusterHostRolesMask`.
// NOTE(review): the function signature (original line 289) and the default
// returned from the else-branch (original line 293) are missing from this copy.
288template <typename T>
290 if constexpr (meta::kIsDetected<HasClusterHostTypeImpl, T>) {
291 return T::kClusterHostType;
292 } else {
294 }
295}
296
297// May return null policy
298template <typename T>
299using HasMayReturnNull = decltype(T::kMayReturnNull);
300
301template <typename T>
302constexpr bool MayReturnNull() {
303 if constexpr (meta::kIsDetected<HasMayReturnNull, T>) {
304 return T::kMayReturnNull;
305 } else {
306 return false;
307 }
308}
309
310template <typename PostgreCachePolicy>
311struct PolicyChecker {
312 // Static assertions for cache traits
313 static_assert(
314 kHasName<PostgreCachePolicy>,
315 "The PosgreSQL cache policy must contain a static member `kName`");
316 static_assert(
317 kHasValueType<PostgreCachePolicy>,
318 "The PosgreSQL cache policy must define a type alias `ValueType`");
319 static_assert(
320 kHasKeyMember<PostgreCachePolicy>,
321 "The PostgreSQL cache policy must contain a static member `kKeyMember` "
322 "with a pointer to a data or a function member with the object's key");
323 static_assert(kHasQuery<PostgreCachePolicy> ||
324 kHasGetQuery<PostgreCachePolicy>,
325 "The PosgreSQL cache policy must contain a static data member "
326 "`kQuery` with a select statement or a static member function "
327 "`GetQuery` returning the query");
328 static_assert(!(kHasQuery<PostgreCachePolicy> &&
329 kHasGetQuery<PostgreCachePolicy>),
330 "The PosgreSQL cache policy must define `kQuery` or "
331 "`GetQuery`, not both");
332 static_assert(
333 kHasUpdatedField<PostgreCachePolicy>,
334 "The PosgreSQL cache policy must contain a static member "
335 "`kUpdatedField`. If you don't want to use incremental updates, "
336 "please set its value to `nullptr`");
337 static_assert(CheckUpdatedFieldType<PostgreCachePolicy>());
338
339 static_assert(ClusterHostType<PostgreCachePolicy>() &
340 storages::postgres::kClusterHostRolesMask,
341 "Cluster host role must be specified for caching component, "
342 "please be more specific");
343
344 static storages::postgres::Query GetQuery() {
345 if constexpr (kHasGetQuery<PostgreCachePolicy>) {
346 return PostgreCachePolicy::GetQuery();
347 } else {
348 return PostgreCachePolicy::kQuery;
349 }
350 }
351
352 using BaseType =
353 CachingComponentBase<DataCacheContainerType<PostgreCachePolicy>>;
354};
355
// Default operation timeouts for full / incremental update queries
// (`full-update-op-timeout`, `incremental-update-op-timeout`).
inline constexpr auto kDefaultFullUpdateTimeout = std::chrono::minutes{1};
inline constexpr auto kDefaultIncrementalUpdateTimeout =
    std::chrono::seconds{1};
// Zero statement timeout passed in the CommandControl of update queries
// (presumably "no statement timeout").
inline constexpr auto kStatementTimeoutOff = std::chrono::milliseconds{0};
// CPU-relax tuning: when a copy or parse stage takes longer than the threshold,
// the relax period is recomputed as "items per kCpuRelaxInterval".
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};

// Names of the tracing scope stages used during an update.
inline constexpr std::string_view kCopyStage{"copy_data"};
inline constexpr std::string_view kFetchStage{"fetch"};
inline constexpr std::string_view kParseStage{"parse"};

// Default `chunk-size`: rows requested per portal fetch.
inline constexpr std::size_t kDefaultChunkSize{1000};
367} // namespace pg_cache::detail
368
// Caching component that loads `PostgreCachePolicy::ValueType` rows from
// PostgreSQL into a `DataType` container and refreshes them periodically. It
// performs full updates, plus incremental ones when the policy's
// `kUpdatedField` enables them.
// NOTE(review): the class's leading documentation (original lines 369-373) is
// missing from this copy.
374template <typename PostgreCachePolicy>
375class PostgreCache final
376 : public pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType {
377 public:
378 // Type aliases
379 using PolicyType = PostgreCachePolicy;
380 using ValueType = pg_cache::detail::ValueType<PolicyType>;
381 using RawValueType = pg_cache::detail::RawValueType<PolicyType>;
382 using DataType = pg_cache::detail::DataCacheContainerType<PolicyType>;
383 using PolicyCheckerType = pg_cache::detail::PolicyChecker<PostgreCachePolicy>;
384 using UpdatedFieldType =
385 pg_cache::detail::UpdatedFieldType<PostgreCachePolicy>;
386 using BaseType = typename PolicyCheckerType::BaseType;
387
388 // Calculated constants
389 constexpr static bool kIncrementalUpdates =
390 pg_cache::detail::kWantIncrementalUpdates<PolicyType>;
391 constexpr static auto kClusterHostTypeFlags =
392 pg_cache::detail::ClusterHostType<PolicyType>();
393 constexpr static auto kName = PolicyType::kName;
394
// NOTE(review): original line 395 (the constructor declaration; the definition
// takes `config` and `context`) is missing from this copy.
396 ~PostgreCache() override;
397
398 static yaml_config::Schema GetStaticConfigSchema();
399
400 private:
401 using CachedData = std::unique_ptr<DataType>;
402
 // Lower bound passed with the update query (`$1` of the delta query).
403 UpdatedFieldType GetLastUpdated(
404 std::chrono::system_clock::time_point last_update,
405 const DataType& cache) const;
406
 // Runs one full or incremental update over all shard clusters.
407 void Update(cache::UpdateType type,
408 const std::chrono::system_clock::time_point& last_update,
409 const std::chrono::system_clock::time_point& now,
410 cache::UpdateStatisticsScope& stats_scope) override;
411
412 bool MayReturnNull() const override;
413
 // Container to write the update into (copy of current data or empty).
414 CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope);
 // Parses a result set and inserts its rows into `data_cache`.
415 void CacheResults(storages::postgres::ResultSet res, CachedData& data_cache,
416 cache::UpdateStatisticsScope& stats_scope,
417 tracing::ScopeTime& scope);
418
419 static storages::postgres::Query GetAllQuery();
420 static storages::postgres::Query GetDeltaQuery();
421
 // Reads `update-correction` from the static config.
422 std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);
423
 // One cluster per shard of the configured `pgcomponent`.
424 std::vector<storages::postgres::ClusterPtr> clusters_;
425
 // Subtracted from the last update time to form the incremental lower bound.
426 const std::chrono::system_clock::duration correction_;
427 const std::chrono::milliseconds full_update_timeout_;
428 const std::chrono::milliseconds incremental_update_timeout_;
 // Rows per portal fetch; 0 means one Execute per cluster.
429 const std::size_t chunk_size_;
 // CpuRelax periods for parsing / copying, recomputed after each update.
430 std::size_t cpu_relax_iterations_parse_{0};
431 std::size_t cpu_relax_iterations_copy_{0};
432};
433
434template <typename PostgreCachePolicy>
435inline constexpr bool kHasValidate<PostgreCache<PostgreCachePolicy>> = true;
436
// Constructor. It takes `config` and `context`; the signature line, original
// line 438, is missing from this copy.
// It reads the update correction, both operation timeouts and the chunk size
// from the static config, then validates them. It resolves one cluster per
// shard of the `pgcomponent` Postgres component, logs both queries and starts
// periodic updates.
// Throws std::logic_error if incremental updates are requested in config but
// the policy has no update field, or if the correction is negative.
437template <typename PostgreCachePolicy>
439 const ComponentContext& context)
440 : BaseType{config, context},
441 correction_{ParseCorrection(config)},
442 full_update_timeout_{
443 config["full-update-op-timeout"].As<std::chrono::milliseconds>(
444 pg_cache::detail::kDefaultFullUpdateTimeout)},
445 incremental_update_timeout_{
446 config["incremental-update-op-timeout"].As<std::chrono::milliseconds>(
447 pg_cache::detail::kDefaultIncrementalUpdateTimeout)},
448 chunk_size_{config["chunk-size"].As<size_t>(
449 pg_cache::detail::kDefaultChunkSize)} {
450 if (this->GetAllowedUpdateTypes() ==
451 cache::AllowedUpdateTypes::kFullAndIncremental &&
452 !kIncrementalUpdates) {
453 throw std::logic_error(
454 "Incremental update support is requested in config but no update field "
455 "name is specified in traits of '" +
456 config.Name() + "' cache");
457 }
458 if (correction_.count() < 0) {
459 throw std::logic_error(
460 "Refusing to set forward (negative) update correction requested in "
461 "config for '" +
462 config.Name() + "' cache");
463 }
464
465 const auto pg_alias = config["pgcomponent"].As<std::string>("");
 // NOTE(review): original line 467 (the start of the throw statement for a
 // missing `pgcomponent`) is missing from this copy.
466 if (pg_alias.empty()) {
468 "No `pgcomponent` entry in configuration"};
469 }
 // Resolve one cluster handle per shard of the Postgres component.
470 auto& pg_cluster_comp = context.FindComponent<components::Postgres>(pg_alias);
471 const auto shard_count = pg_cluster_comp.GetShardCount();
472 clusters_.resize(shard_count);
473 for (size_t i = 0; i < shard_count; ++i) {
474 clusters_[i] = pg_cluster_comp.GetClusterForShard(i);
475 }
476
477 LOG_INFO() << "Cache " << kName << " full update query `"
478 << GetAllQuery().Statement() << "` incremental update query `"
479 << GetDeltaQuery().Statement() << "`";
480
481 this->StartPeriodicUpdates();
482}
483
484template <typename PostgreCachePolicy>
485PostgreCache<PostgreCachePolicy>::~PostgreCache() {
486 this->StopPeriodicUpdates();
487}
488
489template <typename PostgreCachePolicy>
490storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
491 storages::postgres::Query query = PolicyCheckerType::GetQuery();
492 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
493 return {fmt::format("{} where {}", query.Statement(),
494 PostgreCachePolicy::kWhere),
495 query.GetName()};
496 } else {
497 return query;
498 }
499}
500
501template <typename PostgreCachePolicy>
502storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
503 if constexpr (kIncrementalUpdates) {
504 storages::postgres::Query query = PolicyCheckerType::GetQuery();
505
506 if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
507 return {
508 fmt::format("{} where ({}) and {} >= $1", query.Statement(),
509 PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
510 query.GetName()};
511 } else {
512 return {fmt::format("{} where {} >= $1", query.Statement(),
513 PolicyType::kUpdatedField),
514 query.GetName()};
515 }
516 } else {
517 return GetAllQuery();
518 }
519}
520
521template <typename PostgreCachePolicy>
522std::chrono::milliseconds PostgreCache<PostgreCachePolicy>::ParseCorrection(
523 const ComponentConfig& config) {
524 static constexpr std::string_view kUpdateCorrection = "update-correction";
525 if (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy> ||
526 this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) {
527 return config[kUpdateCorrection].As<std::chrono::milliseconds>(0);
528 } else {
529 return config[kUpdateCorrection].As<std::chrono::milliseconds>();
530 }
531}
532
533template <typename PostgreCachePolicy>
534typename PostgreCache<PostgreCachePolicy>::UpdatedFieldType
535PostgreCache<PostgreCachePolicy>::GetLastUpdated(
536 [[maybe_unused]] std::chrono::system_clock::time_point last_update,
537 const DataType& cache) const {
538 if constexpr (pg_cache::detail::kHasCustomUpdated<PostgreCachePolicy>) {
539 return PostgreCachePolicy::GetLastKnownUpdated(cache);
540 } else {
541 return UpdatedFieldType{last_update - correction_};
542 }
543}
544
// Performs one cache update of the given `type`:
//  1. Under the "copy_data" stage, takes the container to write into
//     (GetDataSnapshot).
//  2. Runs the full or delta query on every shard cluster and merges the rows
//     with CacheResults. When chunk_size_ > 0 it uses a read-only transaction
//     with a portal fetching chunk_size_ rows at a time; otherwise it uses a
//     single Execute.
//  3. Recomputes the CPU-relax periods from the measured copy/parse times.
//  4. Publishes the new data if any rows were read or the update was full;
//     otherwise it reports "no changes".
// NOTE(review): original lines 547 (the `type` parameter), 553 and 562 are
// missing from this copy.
545template <typename PostgreCachePolicy>
546void PostgreCache<PostgreCachePolicy>::Update(
548 const std::chrono::system_clock::time_point& last_update,
549 const std::chrono::system_clock::time_point& /*now*/,
550 cache::UpdateStatisticsScope& stats_scope) {
551 namespace pg = storages::postgres;
 // Without incremental support the missing body (line 553) presumably forces
 // `type` to a full update.
552 if constexpr (!kIncrementalUpdates) {
554 }
555 const auto query =
556 (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery();
557 const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull)
558 ? full_update_timeout_
559 : incremental_update_timeout_;
560
561 // COPY current cached data
563 std::string{pg_cache::detail::kCopyStage});
564 auto data_cache = GetDataSnapshot(type, scope);
565 [[maybe_unused]] const auto old_size = data_cache->size();
566
567 scope.Reset(std::string{pg_cache::detail::kFetchStage});
568
 // Counts rows read from the database (not necessarily distinct changes).
569 size_t changes = 0;
570 // Iterate clusters
571 for (auto& cluster : clusters_) {
572 if (chunk_size_ > 0) {
573 auto trx = cluster->Begin(
574 kClusterHostTypeFlags, pg::Transaction::RO,
575 pg::CommandControl{timeout, pg_cache::detail::kStatementTimeoutOff});
576 auto portal =
577 trx.MakePortal(query, GetLastUpdated(last_update, *data_cache));
578 while (portal) {
579 scope.Reset(std::string{pg_cache::detail::kFetchStage});
580 auto res = portal.Fetch(chunk_size_);
581 stats_scope.IncreaseDocumentsReadCount(res.Size());
582
583 scope.Reset(std::string{pg_cache::detail::kParseStage});
584 CacheResults(res, data_cache, stats_scope, scope);
585 changes += res.Size();
586 }
587 trx.Commit();
588 } else {
 // Pass the lower bound only when the statement text contains '$'.
589 bool has_parameter = query.Statement().find('$') != std::string::npos;
590 auto res = has_parameter
591 ? cluster->Execute(
592 kClusterHostTypeFlags,
593 pg::CommandControl{
594 timeout, pg_cache::detail::kStatementTimeoutOff},
595 query, GetLastUpdated(last_update, *data_cache))
596 : cluster->Execute(
597 kClusterHostTypeFlags,
598 pg::CommandControl{
599 timeout, pg_cache::detail::kStatementTimeoutOff},
600 query);
601 stats_scope.IncreaseDocumentsReadCount(res.Size());
602
603 scope.Reset(std::string{pg_cache::detail::kParseStage});
604 CacheResults(res, data_cache, stats_scope, scope);
605 changes += res.Size();
606 }
607 }
608
609 scope.Reset();
610
 // Retune the copy relax period: items per kCpuRelaxInterval of copy time.
611 if constexpr (pg_cache::detail::kIsContainerCopiedByElement<DataType>) {
612 if (old_size > 0) {
613 const auto elapsed_copy =
614 scope.ElapsedTotal(std::string{pg_cache::detail::kCopyStage});
615 if (elapsed_copy > pg_cache::detail::kCpuRelaxThreshold) {
616 cpu_relax_iterations_copy_ = static_cast<std::size_t>(
617 static_cast<double>(old_size) /
618 (elapsed_copy / pg_cache::detail::kCpuRelaxInterval));
 // NOTE(review): this copy-stage message reports `changes` and
 // `cpu_relax_iterations_parse_`, apparently copy-pasted from the parse
 // branch below. It should presumably report `old_size` and
 // `cpu_relax_iterations_copy_`.
619 LOG_TRACE() << "Elapsed time for copying " << kName << " "
620 << elapsed_copy.count() << " for " << changes
621 << " data items is over threshold. Will relax CPU every "
622 << cpu_relax_iterations_parse_ << " iterations";
623 }
624 }
625 }
626
 // Retune the parse relax period: rows per kCpuRelaxInterval of parse time.
627 if (changes > 0) {
628 const auto elapsed_parse =
629 scope.ElapsedTotal(std::string{pg_cache::detail::kParseStage});
630 if (elapsed_parse > pg_cache::detail::kCpuRelaxThreshold) {
631 cpu_relax_iterations_parse_ = static_cast<std::size_t>(
632 static_cast<double>(changes) /
633 (elapsed_parse / pg_cache::detail::kCpuRelaxInterval));
634 LOG_TRACE() << "Elapsed time for parsing " << kName << " "
635 << elapsed_parse.count() << " for " << changes
636 << " data items is over threshold. Will relax CPU every "
637 << cpu_relax_iterations_parse_ << " iterations";
638 }
639 }
640 if (changes > 0 || type == cache::UpdateType::kFull) {
641 // Set current cache
642 stats_scope.Finish(data_cache->size());
643 pg_cache::detail::OnWritesDone(*data_cache);
644 this->Set(std::move(data_cache));
645 } else {
646 stats_scope.FinishNoChanges();
647 }
648}
649
650template <typename PostgreCachePolicy>
651bool PostgreCache<PostgreCachePolicy>::MayReturnNull() const {
652 return pg_cache::detail::MayReturnNull<PolicyType>();
653}
654
// Reads every row of `res` as RawValueType, converts it with ExtractValue and
// inserts it into `*data_cache`, calling CpuRelax::Relax() before each row.
// A row that throws inside the try block is counted as a parse failure,
// logged and skipped; the remaining rows are still processed.
// NOTE(review): original line 658 (the rest of the parameter list, which
// declares `stats_scope` and `scope`) is missing from this copy.
655template <typename PostgreCachePolicy>
656void PostgreCache<PostgreCachePolicy>::CacheResults(
657 storages::postgres::ResultSet res, CachedData& data_cache,
659 auto values = res.AsSetOf<RawValueType>(storages::postgres::kRowTag);
660 utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope};
661 for (auto p = values.begin(); p != values.end(); ++p) {
662 relax.Relax();
663 try {
 // Unqualified call after a using-declaration: argument-dependent lookup
 // can still select a container-specific overload, e.g. the
 // utils::impl::projected_set one at the end of this file.
664 using pg_cache::detail::CacheInsertOrAssign;
665 CacheInsertOrAssign(
666 *data_cache, pg_cache::detail::ExtractValue<PostgreCachePolicy>(*p),
667 PostgreCachePolicy::kKeyMember);
668 } catch (const std::exception& e) {
669 stats_scope.IncreaseDocumentsParseFailures(1);
670 LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '"
671 << compiler::GetTypeName<ValueType>() << "': " << e.what();
672 }
673 }
674}
675
// Returns the container the update writes into. It is a copy of the currently
// published data (via CopyContainer) when the condition on `type` holds and
// data exists; otherwise it is a new empty container.
// NOTE(review): original line 680 (an `if` on `type`, presumably "is an
// incremental update") is missing from this copy.
676template <typename PostgreCachePolicy>
677typename PostgreCache<PostgreCachePolicy>::CachedData
678PostgreCache<PostgreCachePolicy>::GetDataSnapshot(cache::UpdateType type,
679 tracing::ScopeTime& scope) {
681 auto data = this->Get();
682 if (data) {
683 return pg_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_,
684 scope);
685 }
686 }
687 return std::make_unique<DataType>();
688}
689
namespace impl {

// Schema text for the PostgreCache-specific static config options (presumably
// YAML). PostgreCache::GetStaticConfigSchema() merges it with the base
// component's schema.
std::string GetPostgreCacheSchema();

}  // namespace impl
695
696template <typename PostgreCachePolicy>
697yaml_config::Schema PostgreCache<PostgreCachePolicy>::GetStaticConfigSchema() {
698 using ParentType =
699 typename pg_cache::detail::PolicyChecker<PostgreCachePolicy>::BaseType;
700 return yaml_config::MergeSchemas<ParentType>(impl::GetPostgreCacheSchema());
701}
702
703} // namespace components
704
namespace utils::impl::projected_set {

// CacheInsertOrAssign overload for projected-set containers. PostgreCache's
// unqualified call can find it through argument-dependent lookup (presumably
// for containers declared in this namespace). Being non-variadic, it is
// preferred over the generic pg_cache::detail overload. `key_member` is
// ignored: insertion is delegated to DoInsert.
template <typename Set, typename Value, typename KeyMember>
void CacheInsertOrAssign(Set& set, Value&& value,
                         const KeyMember& /*key_member*/) {
  DoInsert(set, std::forward<Value>(value));
}

}  // namespace utils::impl::projected_set
714
715USERVER_NAMESPACE_END