10#include <userver/cache/cache_statistics.hpp> 
   11#include <userver/cache/caching_component_base.hpp> 
   12#include <userver/cache/mongo_cache_type_traits.hpp> 
   13#include <userver/components/component_context.hpp> 
   14#include <userver/formats/bson/document.hpp> 
   15#include <userver/formats/bson/inline.hpp> 
   16#include <userver/formats/bson/value_builder.hpp> 
   17#include <userver/storages/mongo/collection.hpp> 
   18#include <userver/storages/mongo/operations.hpp> 
   19#include <userver/storages/mongo/options.hpp> 
   20#include <userver/tracing/span.hpp> 
   21#include <userver/utils/cpu_relax.hpp> 
   22#include <userver/yaml_config/merge_schemas.hpp> 
   24USERVER_NAMESPACE_BEGIN
 
   28inline const std::string kFetchAndParseStage = 
"fetch_and_parse";
 
   30inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
 
   31inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
 
   35std::chrono::milliseconds GetMongoCacheUpdateCorrection(
const ComponentConfig&);
 
  118template <
class MongoCacheTraits>
 
  127  MongoCache(
const ComponentConfig&, 
const ComponentContext&);
 
  131  static yaml_config::Schema GetStaticConfigSchema();
 
  135              const std::chrono::system_clock::time_point& last_update,
 
  136              const std::chrono::system_clock::time_point& now,
 
  137              cache::UpdateStatisticsScope& stats_scope) 
override;
 
  139  typename MongoCacheTraits::ObjectType DeserializeObject(
 
  144      const std::chrono::system_clock::time_point& last_update,
 
  145      const std::chrono::system_clock::time_point& now,
 
  146      const std::chrono::system_clock::duration& correction);
 
  148  std::unique_ptr<
typename MongoCacheTraits::DataType> GetData(
 
  149      cache::UpdateType type);
 
  151  const std::shared_ptr<CollectionsType> mongo_collections_;
 
  152  const storages::mongo::
Collection* 
const mongo_collection_;
 
  153  const std::chrono::system_clock::duration correction_;
 
  154  std::size_t cpu_relax_iterations_{0};
 
  157template <
class MongoCacheTraits>
 
  158inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = 
true;
 
  160template <
class MongoCacheTraits>
 
  161MongoCache<MongoCacheTraits>::MongoCache(
const ComponentConfig& config,
 
  162                                         const ComponentContext& context)
 
  168                  typename MongoCacheTraits::MongoCollectionsComponent>()
 
  169              .
template GetCollectionForLibrary<CollectionsType>()),
 
  170      mongo_collection_(std::addressof(
 
  171          mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
 
  172      correction_(impl::GetMongoCacheUpdateCorrection(config)) {
 
  173  [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
 
  177          typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
 
  179      !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
 
  180      !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
 
  181    throw std::logic_error(
 
  182        "Incremental update support is requested in config but no update field " 
  183        "name is specified in traits of '" +
 
  186  if (correction_.count() < 0) {
 
  187    throw std::logic_error(
 
  188        "Refusing to set forward (negative) update correction requested in " 
  193  this->StartPeriodicUpdates();
 
  196template <
class MongoCacheTraits>
 
  198  this->StopPeriodicUpdates();
 
  201template <
class MongoCacheTraits>
 
  204    const std::chrono::system_clock::time_point& last_update,
 
  205    const std::chrono::system_clock::time_point& now,
 
  206    cache::UpdateStatisticsScope& stats_scope) {
 
  207  namespace sm = storages::mongo;
 
  209  const auto* collection = mongo_collection_;
 
  210  auto find_op = GetFindOperation(type, last_update, now, correction_);
 
  211  auto cursor = collection->Execute(find_op);
 
  212  if (type == cache::UpdateType::kIncremental && !cursor) {
 
  214    LOG_INFO() << 
"No changes in cache " << MongoCacheTraits::kName;
 
  220  auto new_cache = GetData(type);
 
  223  scope.Reset(kFetchAndParseStage);
 
  226  std::size_t doc_count = 0;
 
  228  for (
const auto& doc : cursor) {
 
  233    stats_scope.IncreaseDocumentsReadCount(1);
 
  236      auto object = DeserializeObject(doc);
 
  237      auto key = (object.*MongoCacheTraits::kKeyField);
 
  239      if (type == cache::UpdateType::kIncremental ||
 
  240          new_cache->count(key) == 0) {
 
  241        (*new_cache)[key] = std::move(object);
 
  244                            << MongoCacheTraits::kName << 
", key=" << key;
 
  246    } 
catch (
const std::exception& e) {
 
  248                          << MongoCacheTraits::kName << 
", _id=" 
  249                          << doc[
"_id"].
template ConvertTo<std::string>()
 
  250                          << 
", what(): " << e;
 
  251      stats_scope.IncreaseDocumentsParseFailures(1);
 
  253      if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) 
throw;
 
  257  const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
 
  258  if (elapsed_time > kCpuRelaxThreshold) {
 
  259    cpu_relax_iterations_ = 
static_cast<std::size_t>(
 
  260        static_cast<
double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
 
  262        "Elapsed time for updating {} {} for {} data items is over threshold. " 
  263        "Will relax CPU every {} iterations",
 
  264        kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
 
  269  const auto size = new_cache->size();
 
  270  this->Set(std::move(new_cache));
 
  271  stats_scope.Finish(size);
 
  274template <
class MongoCacheTraits>
 
  275typename MongoCacheTraits::ObjectType
 
  276MongoCache<MongoCacheTraits>::DeserializeObject(
 
  278  if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
 
  279    return MongoCacheTraits::DeserializeObject(doc);
 
  281  if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
 
  283    return doc.As<
typename MongoCacheTraits::ObjectType>();
 
  286              "No deserialize operation defined but DeserializeObject invoked");
 
  289template <
class MongoCacheTraits>
 
  291MongoCache<MongoCacheTraits>::GetFindOperation(
 
  293    const std::chrono::system_clock::time_point& last_update,
 
  294    const std::chrono::system_clock::time_point& now,
 
  295    const std::chrono::system_clock::duration& correction) {
 
  296  namespace bson = formats::
bson;
 
  297  namespace sm = storages::mongo;
 
  300    if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
 
  301      return MongoCacheTraits::GetFindOperation(type, last_update, now,
 
  304    if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
 
  307      if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
 
  309          query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
 
  310              bson::MakeDoc(
"$gt", last_update - correction);
 
  316                "No find operation defined but GetFindOperation invoked");
 
  319  if (MongoCacheTraits::kIsSecondaryPreferred) {
 
  325template <
class MongoCacheTraits>
 
  326std::unique_ptr<
typename MongoCacheTraits::DataType>
 
  327MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
 
  329    auto ptr = 
this->Get();
 
  330    return std::make_unique<
typename MongoCacheTraits::DataType>(*ptr);
 
  332    return std::make_unique<
typename MongoCacheTraits::DataType>();
 
  338std::string GetMongoCacheSchema();
 
  342template <
class MongoCacheTraits>
 
  346      impl::GetMongoCacheSchema());