10#include <userver/cache/cache_statistics.hpp>
11#include <userver/cache/caching_component_base.hpp>
12#include <userver/cache/mongo_cache_type_traits.hpp>
13#include <userver/components/component_context.hpp>
14#include <userver/formats/bson/document.hpp>
15#include <userver/formats/bson/inline.hpp>
16#include <userver/formats/bson/value_builder.hpp>
17#include <userver/storages/mongo/collection.hpp>
18#include <userver/storages/mongo/operations.hpp>
19#include <userver/storages/mongo/options.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/cpu_relax.hpp>
22#include <userver/yaml_config/merge_schemas.hpp>
24USERVER_NAMESPACE_BEGIN
// Stage name handed to the tracing scope while documents are fetched and
// parsed; the update routine passes it to scope.Reset() and later to
// scope.ElapsedTotal().
28inline const std::string kFetchAndParseStage =
"fetch_and_parse";
// Elapsed fetch-and-parse time above which the update routine recomputes
// cpu_relax_iterations_.
30inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
// Divisor applied to the elapsed time when cpu_relax_iterations_ is
// recomputed: iterations = doc_count / (elapsed_time / kCpuRelaxInterval).
31inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
// Declaration only (definition is not in this file section).
// Takes the component config and returns a millisecond duration; the
// MongoCache constructor stores the result in correction_ and refuses
// negative values.
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(
const ComponentConfig&);
// Members of the MongoCache<MongoCacheTraits> class template.
// NOTE(review): the class head, access specifiers, closing brace and the
// name lines of two member functions are not visible in this section; the
// comments below describe only the lines that are present.
120template <
class MongoCacheTraits>
// Collections holder type, obtained from the type of the traits' pointer
// MongoCacheTraits::kMongoCollectionsField.
122 using CollectionsType = mongo_cache::impl::CollectionsType<
decltype(MongoCacheTraits::kMongoCollectionsField)>;
// Component name, forwarded from the traits.
125 static constexpr std::string_view kName = MongoCacheTraits::kName;
// Constructor: resolves the Mongo collection and reads the update correction
// from the config; throws std::logic_error on an inconsistent configuration.
127 MongoCache(
const ComponentConfig&,
const ComponentContext&);
// Static config schema of the component.
129 static yaml_config::Schema GetStaticConfigSchema();
// Parameter list of the cache update routine (its name line is not visible):
// update type, time of the previous update, current time, statistics scope.
133 cache::UpdateType type,
134 const std::chrono::system_clock::time_point& last_update,
135 const std::chrono::system_clock::time_point& now,
136 cache::UpdateStatisticsScope& stats_scope
// Converts one BSON document into MongoCacheTraits::ObjectType.
139 typename MongoCacheTraits::ObjectType DeserializeObject(
const formats::
bson::
Document& doc)
const;
// Parameter list of the find-operation builder (its name line is not
// visible); `correction` is subtracted from `last_update` for incremental
// queries.
142 cache::UpdateType type,
143 const std::chrono::system_clock::time_point& last_update,
144 const std::chrono::system_clock::time_point& now,
145 const std::chrono::system_clock::duration& correction
// Returns the container the update routine fills: a copy of the current
// cache contents for incremental updates, an empty container otherwise.
148 std::unique_ptr<
typename MongoCacheTraits::DataType> GetData(cache::UpdateType type);
// Shared handle to the collections holder obtained from the
// MongoCollectionsComponent.
150 const std::shared_ptr<CollectionsType> mongo_collections_;
// Address of the member of *mongo_collections_ selected by
// MongoCacheTraits::kMongoCollectionsField.
151 const storages::mongo::
Collection*
const mongo_collection_;
// Update correction read from the config; the constructor rejects negative
// values.
152 const std::chrono::system_clock::duration correction_;
// Passed to utils::CpuRelax in the update routine and recomputed there when
// fetch-and-parse time exceeds kCpuRelaxThreshold; starts at 0.
153 std::size_t cpu_relax_iterations_{0};
// Partial specialization setting kHasValidate to true for every
// MongoCache<MongoCacheTraits> instantiation.
// NOTE(review): the primary kHasValidate template is declared elsewhere; its
// consumers are not visible here.
156template <
class MongoCacheTraits>
157inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> =
true;
// Constructs the cache component.
//
// Throws std::logic_error when:
//   * incremental updates are requested while the traits provide neither an
//     update field name nor a custom find operation;
//   * the configured update correction is negative.
// NOTE(review): the base-class initializer, the head of the first `if` and
// the closing lines are not visible in this section.
159template <
class MongoCacheTraits>
160MongoCache<MongoCacheTraits>::MongoCache(
const ComponentConfig& config,
const ComponentContext& context)
// Look up the collections component named by the traits and take its
// collections holder.
162 mongo_collections_(context.FindComponent<
typename MongoCacheTraits::MongoCollectionsComponent>()
163 .
template GetCollectionForLibrary<CollectionsType>()),
// Keep the address of the specific collection inside the holder; the holder
// itself stays alive through mongo_collections_.
164 mongo_collection_(std::addressof(mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
165 correction_(impl::GetMongoCacheUpdateCorrection(config))
// Instantiated only for the checks CheckTraits performs on the traits.
167 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits> check_traits;
// Tail of a condition comparing against kFullAndIncremental (left-hand side
// not visible): incremental mode needs either an update field name or a
// traits-provided find operation.
170 cache::AllowedUpdateTypes::kFullAndIncremental &&
171 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
172 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>)
174 throw std::logic_error(fmt::format(
175 "Incremental update support is requested in config but no update field "
176 "name is specified in traits of '{}' cache",
177 components::GetCurrentComponentName(context)
// A negative correction is refused with std::logic_error.
180 if (correction_.count() < 0) {
181 throw std::logic_error(fmt::format(
182 "Refusing to set forward (negative) update correction requested in "
183 "config for '{}' cache",
184 components::GetCurrentComponentName(context)
// Cache update routine: runs the find operation against the Mongo
// collection, deserializes every returned document into a fresh container
// keyed by MongoCacheTraits::kKeyField, then publishes it with Set().
// NOTE(review): the line naming this function, the `try`, the log-level
// macros of two messages, the statistics calls and all brace-only lines are
// not visible in this section; comments cover the visible statements only.
189template <
class MongoCacheTraits>
191 cache::UpdateType type,
192 const std::chrono::system_clock::time_point& last_update,
193 const std::chrono::system_clock::time_point& now,
194 cache::UpdateStatisticsScope& stats_scope
196 namespace sm = storages::mongo;
198 const auto* collection = mongo_collection_;
// Build the query for this update type and execute it.
199 auto find_op = GetFindOperation(type, last_update, now, correction_);
200 auto cursor = collection->Execute(find_op);
// Incremental update for which the cursor tests false: log that nothing
// changed. What follows the log line is not visible here.
201 if (type == cache::UpdateType::kIncremental && !cursor) {
203 LOG_INFO() <<
"No changes in cache " << MongoCacheTraits::kName;
// Time the preparation of the target container under the "copy_data" stage.
208 auto scope = tracing::Span::CurrentSpan().CreateScopeTime(
"copy_data");
209 auto new_cache = GetData(type);
// Switch the timing scope to the fetch-and-parse stage.
212 scope.Reset(kFetchAndParseStage);
// CpuRelax helper configured with the iteration count computed by a previous
// update (0 initially).
214 utils::
CpuRelax relax
{cpu_relax_iterations_
, &scope
};
215 std::size_t doc_count = 0;
217 for (
const auto& doc : cursor) {
225 auto object = DeserializeObject(doc);
226 auto key = (object.*MongoCacheTraits::kKeyField);
// Incremental updates overwrite existing keys; otherwise an object is stored
// only if its key is not present yet, and a repeated key is reported by the
// log message below.
228 if (type == cache::UpdateType::kIncremental || new_cache->count(key) == 0) {
229 (*new_cache)[key] = std::move(object);
232 <<
"Found duplicate key for 2 items in cache " << MongoCacheTraits::kName <<
", key=" << key;
234 }
// A std::exception raised for a document is logged together with the
// document's _id.
catch (
const std::exception& e) {
236 <<
"Failed to deserialize cache item of cache " << MongoCacheTraits::kName
237 <<
", _id=" << doc[
"_id"].
template ConvertTo<std::string>() <<
", what(): " << e;
// Body of this branch is not visible; presumably the error is propagated
// when the traits do not allow skipping invalid documents -- TODO confirm.
240 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) {
// If fetch-and-parse took longer than kCpuRelaxThreshold, recompute the
// iteration count as doc_count / (elapsed_time / kCpuRelaxInterval); the
// value is stored in the member that seeds CpuRelax above.
246 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
247 if (elapsed_time > kCpuRelaxThreshold) {
248 cpu_relax_iterations_ =
static_cast<
249 std::size_t>(
static_cast<
double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
250 LOG_TRACE() << fmt::format(
251 "Elapsed time for updating {} {} for {} data items is over threshold. "
252 "Will relax CPU every {} iterations",
254 elapsed_time.count(),
256 cpu_relax_iterations_
// Capture the size before new_cache is moved from, then publish the data.
262 const auto size = new_cache->size();
263 this->Set(std::move(new_cache));
// Converts a BSON document into MongoCacheTraits::ObjectType.
//
// Compile-time selection:
//   * traits define DeserializeObject  -> MongoCacheTraits::DeserializeObject(doc);
//   * default deserialization enabled   -> doc.As<ObjectType>();
//   * neither                           -> UASSERT_MSG(false, ...).
// NOTE(review): closing braces and the rest of the signature line are not
// visible in this section.
267template <
class MongoCacheTraits>
268typename MongoCacheTraits::ObjectType
MongoCache<MongoCacheTraits>::DeserializeObject(
const formats::
bson::
Document& doc
// Traits-provided deserializer.
270 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
271 return MongoCacheTraits::DeserializeObject(doc);
// Default path: parse the document with As<ObjectType>().
273 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<MongoCacheTraits>) {
274 return doc.As<
typename MongoCacheTraits::ObjectType>();
// Reaching this point means the traits offer no deserialization at all.
276 UASSERT_MSG(
false,
"No deserialize operation defined but DeserializeObject invoked");
// Builds the find operation used by the update routine.
//
// If the traits define GetFindOperation, the call is delegated to it with
// the same four arguments. On the default path, an incremental update with a
// known update field name filters that field with "$gt" against
// (last_update - correction).
// NOTE(review): the line naming this function, the construction of the query
// and find operation, the body of the kIsSecondaryPreferred branch and the
// return statement are not visible in this section.
279template <
class MongoCacheTraits>
281 cache::UpdateType type,
282 const std::chrono::system_clock::time_point& last_update,
283 const std::chrono::system_clock::time_point& now,
284 const std::chrono::system_clock::duration& correction
286 namespace bson = formats::
bson;
287 namespace sm = storages::mongo;
// Traits-provided find operation takes precedence.
290 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
291 return MongoCacheTraits::GetFindOperation(type, last_update, now, correction);
293 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<MongoCacheTraits>) {
295 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
296 if (type == cache::UpdateType::kIncremental) {
// Only documents whose update field is greater than the previous update
// time shifted back by `correction` are requested.
298 [MongoCacheTraits::kMongoUpdateFieldName] = bson
::MakeDoc("$gt", last_update - correction
);
303 UASSERT_MSG(
false,
"No find operation defined but GetFindOperation invoked");
// Branch body not visible; presumably selects a secondary-preferred read
// preference on the operation -- TODO confirm.
306 if (MongoCacheTraits::kIsSecondaryPreferred) {
// Returns the container that the update routine fills.
//
// For cache::UpdateType::kIncremental the current cache contents (obtained
// through this->Get()) are copied into a new DataType; for any other update
// type a default-constructed, empty DataType is returned.
// NOTE(review): the closing braces are not visible in this section.
312template <
class MongoCacheTraits>
313std::unique_ptr<
typename MongoCacheTraits::DataType>
MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
314 if (type == cache::UpdateType::kIncremental) {
// Copy rather than modify in place: the published data stays untouched until
// the update routine calls Set().
315 auto ptr =
this->Get();
316 return std::make_unique<
typename MongoCacheTraits::DataType>(*ptr);
318 return std::make_unique<
typename MongoCacheTraits::DataType>();
// Declaration only (definition is not in this file section); returns a
// std::string.
// NOTE(review): presumably the YAML schema text that GetStaticConfigSchema()
// merges into the component schema -- confirm against the definition.
324std::string GetMongoCacheSchema();
328template <
class MongoCacheTraits>
329yaml_config::Schema
MongoCache<MongoCacheTraits>::GetStaticConfigSchema() {
330 return yaml_config::MergeSchemas<