10#include <userver/cache/cache_statistics.hpp>
11#include <userver/cache/caching_component_base.hpp>
12#include <userver/cache/mongo_cache_type_traits.hpp>
13#include <userver/components/component_context.hpp>
14#include <userver/formats/bson/document.hpp>
15#include <userver/formats/bson/inline.hpp>
16#include <userver/formats/bson/value_builder.hpp>
17#include <userver/storages/mongo/collection.hpp>
18#include <userver/storages/mongo/operations.hpp>
19#include <userver/storages/mongo/options.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/cpu_relax.hpp>
22#include <userver/yaml_config/merge_schemas.hpp>
24USERVER_NAMESPACE_BEGIN
/// Stage name handed to the timing scope while the cursor is being read and
/// documents are being parsed; the same name is used afterwards to query the
/// total time spent in that stage.
inline const std::string kFetchAndParseStage{"fetch_and_parse"};

/// If the fetch-and-parse stage took longer than this, the number of
/// iterations between CPU relaxations is recomputed.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};

/// Time slice used as the divisor when converting the measured stage duration
/// into an "iterations per relax" count.
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};
// Declaration only; the definition is not part of this file view.
// The MongoCache constructor calls this with the component config and stores
// the returned duration as its update correction.
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(
const ComponentConfig&);
// MongoCache class template, parameterized by a MongoCacheTraits policy type.
// NOTE(review): the class head, base-class list, access specifiers and several
// member declarations are absent from this view; the comments below describe
// only the lines that are visible.
118template <
class MongoCacheTraits>
// Constructor taking the component config and the component context.
127 MongoCache(
const ComponentConfig&,
const ComponentContext&);
// Static function returning a yaml_config::Schema for this component.
131 static yaml_config::Schema GetStaticConfigSchema();
// Trailing parameters of an overriding member function (the out-of-class
// definition below shows the same parameter list for the update routine):
// previous update time, current time, and the update statistics scope.
135 const std::chrono::system_clock::time_point& last_update,
136 const std::chrono::system_clock::time_point& now,
137 cache::UpdateStatisticsScope& stats_scope)
override;
// Converts one fetched document into MongoCacheTraits::ObjectType.
139 typename MongoCacheTraits::ObjectType DeserializeObject(
// Trailing parameters of the find-operation builder: previous update time,
// current time, and the correction subtracted from last_update in the default
// query.
144 const std::chrono::system_clock::time_point& last_update,
145 const std::chrono::system_clock::time_point& now,
146 const std::chrono::system_clock::duration& correction);
// Produces the container that an update fills: either a copy of the current
// cache contents or an empty container (see the definition).
148 std::unique_ptr<
typename MongoCacheTraits::DataType> GetData(
149 cache::UpdateType type);
// Shared handle to the collections object obtained in the constructor.
151 const std::shared_ptr<CollectionsType> mongo_collections_;
// Address of the collection selected via
// MongoCacheTraits::kMongoCollectionsField inside *mongo_collections_.
152 const storages::mongo::
Collection*
const mongo_collection_;
// Update correction read from the config; the constructor rejects negative
// values.
153 const std::chrono::system_clock::duration correction_;
// Starts at 0; recomputed at the end of an update whose fetch-and-parse stage
// exceeded kCpuRelaxThreshold.
154 std::size_t cpu_relax_iterations_{0};
157template <
class MongoCacheTraits>
158inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> =
true;
// Constructor definition.
// NOTE(review): the start of the member-initializer list, the head of the
// first `if`, and the closing lines of both throw statements are missing from
// this view.
// Visible behavior:
//  - mongo_collections_ is obtained through
//    GetCollectionForLibrary<CollectionsType>() on the component named by
//    MongoCacheTraits::MongoCollectionsComponent;
//  - mongo_collection_ is the address of the traits-selected collection;
//  - correction_ comes from impl::GetMongoCacheUpdateCorrection(config);
//  - std::logic_error is thrown for an unsupported incremental-update setup
//    and for a negative correction;
//  - periodic updates are started last.
160template <
class MongoCacheTraits>
161MongoCache<MongoCacheTraits>::MongoCache(
const ComponentConfig& config,
162 const ComponentContext& context)
168 typename MongoCacheTraits::MongoCollectionsComponent>()
169 .
template GetCollectionForLibrary<CollectionsType>()),
// Pointer-to-member access picks the collection field named by the traits.
170 mongo_collection_(std::addressof(
171 mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
172 correction_(impl::GetMongoCacheUpdateCorrection(config)) {
// Declares an otherwise unused CheckTraits<MongoCacheTraits> object;
// presumably this forces compile-time validation of the traits - confirm in
// mongo_cache_type_traits.hpp.
173 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
// The allowed update types are compared against a value on a line not shown
// here; the error is raised only when the traits provide neither an update
// field name nor a custom find operation.
177 typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
179 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
180 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
181 throw std::logic_error(
182 "Incremental update support is requested in config but no update field "
183 "name is specified in traits of '" +
// A negative correction is refused.
186 if (correction_.count() < 0) {
187 throw std::logic_error(
188 "Refusing to set forward (negative) update correction requested in "
// Start the periodic update loop only after every check above has passed.
193 this->StartPeriodicUpdates();
// NOTE(review): the signature line is missing from this view; presumably this
// is the destructor - confirm against the full file.
// The visible body stops the periodic updates.
196template <
class MongoCacheTraits>
198 this->StopPeriodicUpdates();
// Cache update routine: runs a find operation against the collection,
// deserializes every returned document into a new container keyed by
// MongoCacheTraits::kKeyField, then publishes the container and the stats.
// NOTE(review): the function name line, the first parameter (`type`), the
// timing-scope creation, the `try` keyword, the heads of the log statements
// and the early-return statements are missing from this view.
201template <
class MongoCacheTraits>
204 const std::chrono::system_clock::time_point& last_update,
205 const std::chrono::system_clock::time_point& now,
206 cache::UpdateStatisticsScope& stats_scope) {
207 namespace sm = storages::mongo;
209 const auto* collection = mongo_collection_;
// Build the query for this update type and execute it.
210 auto find_op = GetFindOperation(type, last_update, now, correction_);
211 auto cursor = collection->Execute(find_op);
// Incremental update with a cursor that tests false: only a log line is
// visible here; what follows inside this branch is not shown.
212 if (type == cache::UpdateType::kIncremental && !cursor) {
214 LOG_INFO() <<
"No changes in cache " << MongoCacheTraits::kName;
// Container to fill: either a copy of the current data or an empty one,
// depending on the update type (see GetData).
220 auto new_cache = GetData(type);
// Switch the timing scope to the fetch-and-parse stage.
223 scope.Reset(kFetchAndParseStage);
// NOTE(review): the increment of doc_count is not visible in this view.
226 std::size_t doc_count = 0;
228 for (
const auto& doc : cursor) {
233 stats_scope.IncreaseDocumentsReadCount(1);
// `key` is taken by value before `object` is moved into the container.
236 auto object = DeserializeObject(doc);
237 auto key = (object.*MongoCacheTraits::kKeyField);
// Incremental updates always (over)write the entry; otherwise the object is
// stored only if the key is not present yet. The other branch appears to log
// the key (the head of that log statement is missing).
239 if (type == cache::UpdateType::kIncremental ||
240 new_cache->count(key) == 0) {
241 (*new_cache)[key] = std::move(object);
244 << MongoCacheTraits::kName <<
", key=" << key;
246 }
catch (
const std::exception& e) {
248 << MongoCacheTraits::kName <<
", _id="
249 << doc[
"_id"].
template ConvertTo<std::string>()
250 <<
", what(): " << e;
// Count the failure, then rethrow unless the traits ask for invalid documents
// to be skipped.
251 stats_scope.IncreaseDocumentsParseFailures(1);
253 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped)
throw;
// If the stage took longer than kCpuRelaxThreshold, recompute how many
// documents to process between CPU relaxations: the document count divided by
// the number of kCpuRelaxInterval slices in the elapsed time.
257 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
258 if (elapsed_time > kCpuRelaxThreshold) {
259 cpu_relax_iterations_ =
static_cast<std::size_t>(
260 static_cast<
double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
// NOTE(review): `kName` is unqualified here while the other log statements use
// MongoCacheTraits::kName; its declaration is not visible in this view -
// confirm it is a class-level alias.
262 "Elapsed time for updating {} {} for {} data items is over threshold. "
263 "Will relax CPU every {} iterations",
264 kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
// Read the size before the container is moved into Set().
269 const auto size = new_cache->size();
270 this->Set(std::move(new_cache));
271 stats_scope.Finish(size);
// Converts a document `doc` into MongoCacheTraits::ObjectType.
// Selection is made at compile time:
//  - if the traits provide DeserializeObject, it is used;
//  - otherwise, if the default deserialization is available, doc.As<...>() is
//    used;
//  - otherwise the trailing message indicates that reaching this point is an
//    error.
// NOTE(review): the parameter line, the closing braces and the statement that
// carries the final message are missing from this view.
274template <
class MongoCacheTraits>
275typename MongoCacheTraits::ObjectType
276MongoCache<MongoCacheTraits>::DeserializeObject(
278 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
279 return MongoCacheTraits::DeserializeObject(doc);
281 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
283 return doc.As<
typename MongoCacheTraits::ObjectType>();
286 "No deserialize operation defined but DeserializeObject invoked");
// Builds the find operation used by an update.
// Selection is made at compile time:
//  - if the traits provide GetFindOperation, the call is forwarded to it;
//  - otherwise, if the default find operation is available, a query is built
//    and, when the traits name an update field, that field is constrained to
//    be greater than (last_update - correction);
//  - otherwise the trailing message indicates that reaching this point is an
//    error.
// NOTE(review): the return type, the first parameter, the query_builder
// declaration, any condition guarding the "$gt" filter, the body of the
// kIsSecondaryPreferred branch and the return statement are missing from this
// view.
289template <
class MongoCacheTraits>
291MongoCache<MongoCacheTraits>::GetFindOperation(
293 const std::chrono::system_clock::time_point& last_update,
294 const std::chrono::system_clock::time_point& now,
295 const std::chrono::system_clock::duration& correction) {
296 namespace bson = formats::
bson;
297 namespace sm = storages::mongo;
300 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
301 return MongoCacheTraits::GetFindOperation(type, last_update, now,
304 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
307 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
// The correction is subtracted from last_update, so the lower bound of the
// filter is moved back in time by that amount.
309 query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
310 bson::MakeDoc(
"$gt", last_update - correction);
316 "No find operation defined but GetFindOperation invoked");
// Traits-controlled switch; presumably selects a secondary-preferred read
// option on the operation - the branch body is not visible, confirm.
319 if (MongoCacheTraits::kIsSecondaryPreferred) {
// Returns the container that an update will fill.
// Two outcomes are visible: a copy of the currently published data (via
// this->Get()) or a default-constructed, empty container.
// NOTE(review): the condition choosing between them is missing from this view;
// presumably the copy is made for incremental updates - confirm.
325template <
class MongoCacheTraits>
326std::unique_ptr<
typename MongoCacheTraits::DataType>
327MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
// Copy-construct from the current snapshot so the published data is untouched.
329 auto ptr =
this->Get();
330 return std::make_unique<
typename MongoCacheTraits::DataType>(*ptr);
332 return std::make_unique<
typename MongoCacheTraits::DataType>();
// Declaration only; the definition is not part of this file view.
// Returns a std::string that is passed on when the static config schema of
// MongoCache is built at the end of this file.
338std::string GetMongoCacheSchema();
342template <
class MongoCacheTraits>
346 impl::GetMongoCacheSchema());