10#include <userver/cache/cache_statistics.hpp> 
   11#include <userver/cache/caching_component_base.hpp> 
   12#include <userver/cache/mongo_cache_type_traits.hpp> 
   13#include <userver/components/component_context.hpp> 
   14#include <userver/formats/bson/document.hpp> 
   15#include <userver/formats/bson/inline.hpp> 
   16#include <userver/formats/bson/value_builder.hpp> 
   17#include <userver/storages/mongo/collection.hpp> 
   18#include <userver/storages/mongo/operations.hpp> 
   19#include <userver/storages/mongo/options.hpp> 
   20#include <userver/tracing/span.hpp> 
   21#include <userver/utils/cpu_relax.hpp> 
   22#include <userver/yaml_config/merge_schemas.hpp> 
   24USERVER_NAMESPACE_BEGIN
 
   /// Name of the timing stage that covers reading documents from the cursor
   /// and parsing them. The cache update routine passes it to `scope.Reset()`
   /// before the read loop and to `scope.ElapsedTotal()` after it.
   inline const std::string kFetchAndParseStage = "fetch_and_parse";
 
   /// Fetch-and-parse duration above which the update routine recomputes
   /// `cpu_relax_iterations_` (the value handed to `utils::CpuRelax`).
   inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
 
   /// Time slice used as the divisor when the update routine derives the number
   /// of documents to process between CPU relax points:
   /// `doc_count / (elapsed_time / kCpuRelaxInterval)`.
   inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
 
   35std::chrono::milliseconds GetMongoCacheUpdateCorrection(
const ComponentConfig&);
 
  // Declaration of the MongoCache<MongoCacheTraits> class template (fragment).
  // NOTE(review): the embedded listing numbers jump (121 -> 130 -> 134 -> 138
  // ...), so the class head, access specifiers and some member declarations
  // are not visible here. The comments below describe only what is shown.
  121template <
class MongoCacheTraits>
 
  // Constructor: takes the component config and the component context.
  130  MongoCache(
const ComponentConfig&, 
const ComponentContext&);
 
  // Static accessor returning a yaml_config::Schema.
  134  static yaml_config::Schema GetStaticConfigSchema();
 
  // Trailing parameters of an overriding member function. Its first line
  // (name and leading parameter) is missing from this view.
  // NOTE(review): presumably the periodic cache update hook — confirm.
  138              const std::chrono::system_clock::time_point& last_update,
 
  139              const std::chrono::system_clock::time_point& now,
 
  140              cache::UpdateStatisticsScope& stats_scope) 
override;
 
  // Converts one fetched document into MongoCacheTraits::ObjectType. The
  // parameter list of this declaration is not visible. The three parameters
  // that follow belong to a later declaration whose first lines (listing
  // lines 144-146) are missing.
  // NOTE(review): presumably GetFindOperation, whose out-of-class definition
  // has the same parameter tail — confirm.
  142  typename MongoCacheTraits::ObjectType DeserializeObject(
 
  147      const std::chrono::system_clock::time_point& last_update,
 
  148      const std::chrono::system_clock::time_point& now,
 
  149      const std::chrono::system_clock::duration& correction);
 
  // Returns an owning pointer to the container that the update routine fills
  // for the given update type.
  151  std::unique_ptr<
typename MongoCacheTraits::DataType> GetData(
 
  152      cache::UpdateType type);
 
  // Shared handle to the collections object. The constructor derives
  // mongo_collection_ (declaration not visible here) from it.
  154  const std::shared_ptr<CollectionsType> mongo_collections_;
 
  // Duration subtracted from last_update when the default find query for
  // MongoCacheTraits::kMongoUpdateFieldName is built. Never negative: the
  // constructor throws otherwise.
  156  const std::chrono::system_clock::duration correction_;
 
  // Passed to utils::CpuRelax in the update routine; starts at 0 and is
  // recomputed when fetch-and-parse time exceeds kCpuRelaxThreshold.
  157  std::size_t cpu_relax_iterations_{0};
 
  160template <
class MongoCacheTraits>
 
  161inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = 
true;
 
  // Out-of-class definition of the MongoCache constructor (fragment).
  // NOTE(review): listing lines 166-170, 177-179, 181, 187-188 and 192-195 are
  // missing, so the start of the member-initializer list, parts of the trait
  // checks and the tails of both throw statements are not visible.
  163template <
class MongoCacheTraits>
 
  164MongoCache<MongoCacheTraits>::MongoCache(
const ComponentConfig& config,
 
  165                                         const ComponentContext& context)
 
  // Tail of the mongo_collections_ initializer: the collections object comes
  // from the MongoCacheTraits::MongoCollectionsComponent component via
  // GetCollectionForLibrary<CollectionsType>().
  171                  typename MongoCacheTraits::MongoCollectionsComponent>()
 
  172              .
template GetCollectionForLibrary<CollectionsType>()),
 
  // mongo_collection_ holds the address of the collection selected through
  // the pointer-to-member MongoCacheTraits::kMongoCollectionsField.
  173      mongo_collection_(std::addressof(
 
  174          mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
 
  175      correction_(impl::GetMongoCacheUpdateCorrection(config)) {
 
  // Trait validation. The visible part of the condition throws
  // std::logic_error when GetAllowedUpdateTypes() equals a value that is not
  // visible here while the traits provide neither an update field name nor a
  // custom find operation.
  176  [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
 
  180          typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
 
  182      !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
 
  183      !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
 
  184    throw std::logic_error(
 
  185        "Incremental update support is requested in config but no update field " 
  186        "name is specified in traits of '" +
 
  // A negative correction is rejected with std::logic_error.
  189  if (correction_.count() < 0) {
 
  190    throw std::logic_error(
 
  191        "Refusing to set forward (negative) update correction requested in " 
  // Last visible step of the constructor: start the periodic updates.
  196  this->StartPeriodicUpdates();
 
  // Fragment of a member definition whose signature line (listing line 200)
  // is missing. The only visible statement stops the periodic updates.
  // NOTE(review): presumably the destructor, mirroring StartPeriodicUpdates()
  // in the constructor — confirm.
  199template <
class MongoCacheTraits>
 
  201  this->StopPeriodicUpdates();
 
  // Cache update routine (fragment): runs a find operation, deserializes
  // every returned document into a fresh container and publishes it with
  // Set().
  // NOTE(review): the signature head (listing lines 205-206) and several
  // interior lines are missing, including the `scope` declaration, the
  // doc_count increment, the `try` opener and the log statement heads.
  204template <
class MongoCacheTraits>
 
  207    const std::chrono::system_clock::time_point& last_update,
 
  208    const std::chrono::system_clock::time_point& now,
 
  209    cache::UpdateStatisticsScope& stats_scope) {
 
  // Build the query for this update type and execute it on the collection.
  212  const auto* collection = mongo_collection_;
 
  213  auto find_op = GetFindOperation(type, last_update, now, correction_);
 
  214  auto cursor = collection->Execute(find_op);
 
  // Incremental update with a cursor that converts to false: log that
  // nothing changed. The rest of this branch is not visible here.
  215  if (type == cache::UpdateType::kIncremental && !cursor) {
 
  217    LOG_INFO() << 
"No changes in cache " << MongoCacheTraits::kName;
 
  // Container to fill; see GetData() for how it is produced.
  223  auto new_cache = GetData(type);
 
  // From here on, time is accounted to the fetch-and-parse stage.
  226  scope.Reset(kFetchAndParseStage);
 
  // CpuRelax is constructed with the iteration count computed by a previous
  // run (0 initially). Its use inside the loop is not visible in this view.
  228  utils::
CpuRelax relax{cpu_relax_iterations_, &scope};
 
  229  std::size_t doc_count = 0;
 
  231  for (
const auto& doc : cursor) {
 
  236    stats_scope.IncreaseDocumentsReadCount(1);
 
  // Deserialize the document and read its key through the pointer-to-member
  // MongoCacheTraits::kKeyField.
  239      auto object = DeserializeObject(doc);
 
  240      auto key = (object.*MongoCacheTraits::kKeyField);
 
  // Incremental updates always store the object under its key. Otherwise the
  // object is stored only if the key is not present yet. The other path ends
  // in a log line carrying the cache name and the key.
  242      if (type == cache::UpdateType::kIncremental ||
 
  243          new_cache->count(key) == 0) {
 
  244        (*new_cache)[key] = std::move(object);
 
  247                            << MongoCacheTraits::kName << 
", key=" << key;
 
  // On std::exception: log the cache name, the document "_id" and the error,
  // then count one parse failure.
  249    } 
catch (
const std::exception& e) {
 
  251                          << MongoCacheTraits::kName << 
", _id=" 
  252                          << doc[
"_id"].
template ConvertTo<std::string>()
 
  253                          << 
", what(): " << e;
 
  254      stats_scope.IncreaseDocumentsParseFailures(1);
 
  // Rethrow unless the traits opt into skipping invalid documents.
  256      if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) 
throw;
 
  // If fetch-and-parse took longer than kCpuRelaxThreshold, recompute how
  // many documents to process between relax points for the next run:
  // doc_count / (elapsed_time / kCpuRelaxInterval).
  // NOTE(review): whether the inner duration/duration division truncates
  // depends on the return type of ElapsedTotal(), which is not visible here.
  260  const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
 
  261  if (elapsed_time > kCpuRelaxThreshold) {
 
  262    cpu_relax_iterations_ = 
static_cast<std::size_t>(
 
  263        static_cast<
double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
 
  265        "Elapsed time for updating {} {} for {} data items is over threshold. " 
  266        "Will relax CPU every {} iterations",
 
  267        kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
 
  // Capture the size before new_cache is moved into Set(), then report it
  // to the statistics scope.
  272  const auto size = new_cache->size();
 
  273  this->Set(std::move(new_cache));
 
  274  stats_scope.Finish(size);
 
  // Out-of-class definition of DeserializeObject (fragment): turns one
  // document into MongoCacheTraits::ObjectType, choosing the strategy at
  // compile time.
  // NOTE(review): the parameter list (listing line 280) and the closing parts
  // of the branches are missing from this view.
  277template <
class MongoCacheTraits>
 
  278typename MongoCacheTraits::ObjectType
 
  279MongoCache<MongoCacheTraits>::DeserializeObject(
 
  // Preferred path: the traits supply their own DeserializeObject.
  281  if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
 
  282    return MongoCacheTraits::DeserializeObject(doc);
 
  // Default path: parse the document with doc.As<ObjectType>().
  284  if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
 
  286    return doc.As<
typename MongoCacheTraits::ObjectType>();
 
  // Message for the case where neither path is available. The statement that
  // uses it is not visible here.
  289              "No deserialize operation defined but DeserializeObject invoked");
 
  // Out-of-class definition of GetFindOperation (fragment): produces the find
  // operation that the update routine executes.
  // NOTE(review): the return type, the first parameter and several interior
  // lines are missing from this view. `type` is used below, so it is
  // presumably the missing first parameter — confirm.
  292template <
class MongoCacheTraits>
 
  294MongoCache<MongoCacheTraits>::GetFindOperation(
 
  296    const std::chrono::system_clock::time_point& last_update,
 
  297    const std::chrono::system_clock::time_point& now,
 
  298    const std::chrono::system_clock::duration& correction) {
 
  299  namespace bson = formats::
bson;
 
  // Preferred path: delegate to the find operation supplied by the traits.
  303    if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
 
  304      return MongoCacheTraits::GetFindOperation(type, last_update, now,
 
  // Default path: when the traits name an update field, the query is
  // restricted to documents whose field value is greater than
  // (last_update - correction).
  307    if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
 
  310      if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
 
  312          query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
 
  313              bson::MakeDoc(
"$gt", last_update - correction);
 
  // Message for the case where no find operation is available. The statement
  // that uses it is not visible here.
  319                "No find operation defined but GetFindOperation invoked");
 
  // Extra handling when the traits prefer secondary reads. The body of this
  // branch is not visible here.
  322  if (MongoCacheTraits::kIsSecondaryPreferred) {
 
  // Out-of-class definition of GetData (fragment): returns the container that
  // the update routine fills. Two return paths are visible: one
  // copy-constructs a new DataType from the object obtained via this->Get(),
  // the other default-constructs an empty DataType.
  // NOTE(review): the branch condition (listing lines 331 and 334) is missing;
  // presumably the copy is taken for incremental updates — confirm.
  328template <
class MongoCacheTraits>
 
  329std::unique_ptr<
typename MongoCacheTraits::DataType>
 
  330MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
 
  // Copy of the current cache contents.
  332    auto ptr = 
this->Get();
 
  333    return std::make_unique<
typename MongoCacheTraits::DataType>(*ptr);
 
  // Fresh, empty container.
  335    return std::make_unique<
typename MongoCacheTraits::DataType>();
 
  /// Returns a `std::string`; only the declaration is visible here. A template
  /// member defined at the end of this file passes the result of
  /// `impl::GetMongoCacheSchema()` as the last argument of a call.
  /// NOTE(review): judging by the name, the string is the Mongo cache part of
  /// the static config schema, and the enclosing `namespace impl` lines are
  /// outside this view — confirm both.
  std::string GetMongoCacheSchema();
 
  // Fragment of a template member definition whose signature and statement
  // head (listing lines 346-348) are missing. The only visible piece is the
  // final call argument, impl::GetMongoCacheSchema().
  // NOTE(review): presumably MongoCache::GetStaticConfigSchema(), declared in
  // the class as returning yaml_config::Schema — confirm.
  345template <
class MongoCacheTraits>
 
  349      impl::GetMongoCacheSchema());