userver: userver/cache/base_mongo_cache.hpp Source File
Loading...
Searching...
No Matches
base_mongo_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_mongo_cache.hpp
4/// @brief @copybrief components::MongoCache
5
6#include <chrono>
7
8#include <fmt/format.h>
9
10#include <userver/cache/cache_statistics.hpp>
11#include <userver/cache/caching_component_base.hpp>
12#include <userver/cache/mongo_cache_type_traits.hpp>
13#include <userver/components/component_context.hpp>
14#include <userver/formats/bson/document.hpp>
15#include <userver/formats/bson/inline.hpp>
16#include <userver/formats/bson/value_builder.hpp>
17#include <userver/storages/mongo/collection.hpp>
18#include <userver/storages/mongo/operations.hpp>
19#include <userver/storages/mongo/options.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/cpu_relax.hpp>
22#include <userver/yaml_config/merge_schemas.hpp>
23
24USERVER_NAMESPACE_BEGIN
25
26namespace components {
27
/// Name of the tracing scope that MongoCache::Update uses while iterating the
/// cursor and deserializing documents.
inline const std::string kFetchAndParseStage{"fetch_and_parse"};

/// When the fetch-and-parse stage of an update takes longer than this,
/// MongoCache::Update recomputes how often it calls utils::CpuRelax.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
/// Approximate amount of fetch-and-parse time between two consecutive CPU
/// relaxations (relax period = documents processed per this interval).
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};
32
33namespace impl {
34
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(const ComponentConfig&);
36
37}
38
39/// @ingroup userver_components
40///
41/// @brief %Base class for all caches polling mongo collection
42///
43/// You have to provide a traits class in order to use this.
44///
45/// For avoiding "memory leaks", see the respective section
46/// in @ref components::CachingComponentBase.
47///
48/// ## Static options of components::MongoCache :
49///
50/// @include{doc} scripts/docs/en/components_schema/mongo/src/cache/base_mongo_cache.md
51///
52/// Options inherited from @ref components::CachingComponentBase :
53/// @include{doc} scripts/docs/en/components_schema/core/src/cache/caching_component_base.md
54///
55/// Options inherited from @ref components::ComponentBase :
56/// @include{doc} scripts/docs/en/components_schema/core/src/components/impl/component_base.md
57///
58/// ## Traits example:
59/// All fields below (except for function overrides) are mandatory.
60///
61/// ```
62/// struct MongoCacheTraitsExample {
63/// // Component name for component
64/// static constexpr std::string_view kName = "mongo-dynamic-config";
65///
66/// // Collection to read from
67/// static constexpr auto kMongoCollectionsField =
68/// &storages::mongo::Collections::config;
69/// // Update field name to use for incremental update (optional).
70/// // When missing, incremental update is disabled.
71/// // Please use reference here to avoid global variables
72/// // initialization order issues.
73/// static constexpr const std::string& kMongoUpdateFieldName =
74/// mongo::db::taxi::config::kUpdated;
75///
76/// // Cache element type
77/// using ObjectType = CachedObject;
78/// // Cache element field name that is used as an index in the cache map
79/// static constexpr auto kKeyField = &CachedObject::name;
80/// // Type of kKeyField
81/// using KeyType = std::string;
82/// // Type of cache map, e.g. unordered_map, map, bimap
83/// using DataType = std::unordered_map<KeyType, ObjectType>;
84///
85/// // Whether the cache prefers to read from replica (if true, you might get stale data)
86/// static constexpr bool kIsSecondaryPreferred = true;
87///
88/// // Optional function that overrides BSON to ObjectType conversion
89/// static constexpr auto DeserializeObject = &CachedObject::FromBson;
90/// // or
91/// static ObjectType DeserializeObject(const formats::bson::Document& doc) {
92/// return doc["value"].As<ObjectType>();
93/// }
94/// // (default implementation calls doc.As<ObjectType>())
95/// // For using default implementation
96/// static constexpr bool kUseDefaultDeserializeObject = true;
97///
98/// // Optional function that overrides data retrieval operation
99/// static storages::mongo::operations::Find GetFindOperation(
100/// cache::UpdateType type,
101/// const std::chrono::system_clock::time_point& last_update,
102/// const std::chrono::system_clock::time_point& now,
103/// const std::chrono::system_clock::duration& correction) {
104/// mongo::operations::Find find_op({});
105/// find_op.SetOption(mongo::options::Projection{"key", "value"});
106/// return find_op;
107/// }
108/// // (default implementation queries kMongoUpdateFieldName: {$gt: last_update}
109/// // for incremental updates, and {} for full updates)
110/// // For using default implementation
111/// static constexpr bool kUseDefaultFindOperation = true;
112///
113/// // Whether update part of the cache even if failed to parse some documents
114/// static constexpr bool kAreInvalidDocumentsSkipped = false;
115///
116/// // Component to get the collections
117/// using MongoCollectionsComponent = components::MongoCollections;
118/// };
119/// ```
120template <class MongoCacheTraits>
121class MongoCache : public CachingComponentBase<typename MongoCacheTraits::DataType> {
122 using CollectionsType = mongo_cache::impl::CollectionsType<decltype(MongoCacheTraits::kMongoCollectionsField)>;
123
124public:
125 static constexpr std::string_view kName = MongoCacheTraits::kName;
126
127 MongoCache(const ComponentConfig&, const ComponentContext&);
128
129 static yaml_config::Schema GetStaticConfigSchema();
130
131private:
132 void Update(
133 cache::UpdateType type,
134 const std::chrono::system_clock::time_point& last_update,
135 const std::chrono::system_clock::time_point& now,
136 cache::UpdateStatisticsScope& stats_scope
137 ) override;
138
139 typename MongoCacheTraits::ObjectType DeserializeObject(const formats::bson::Document& doc) const;
140
141 storages::mongo::operations::Find GetFindOperation(
142 cache::UpdateType type,
143 const std::chrono::system_clock::time_point& last_update,
144 const std::chrono::system_clock::time_point& now,
145 const std::chrono::system_clock::duration& correction
146 );
147
148 std::unique_ptr<typename MongoCacheTraits::DataType> GetData(cache::UpdateType type);
149
150 const std::shared_ptr<CollectionsType> mongo_collections_;
151 const storages::mongo::Collection* const mongo_collection_;
152 const std::chrono::system_clock::duration correction_;
153 std::size_t cpu_relax_iterations_{0};
154};
155
156template <class MongoCacheTraits>
157inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = true;
158
159template <class MongoCacheTraits>
160MongoCache<MongoCacheTraits>::MongoCache(const ComponentConfig& config, const ComponentContext& context)
161 : CachingComponentBase<typename MongoCacheTraits::DataType>(config, context),
162 mongo_collections_(context.FindComponent<typename MongoCacheTraits::MongoCollectionsComponent>()
163 .template GetCollectionForLibrary<CollectionsType>()),
164 mongo_collection_(std::addressof(mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
165 correction_(impl::GetMongoCacheUpdateCorrection(config))
166{
167 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits> check_traits;
168
169 if (CachingComponentBase<typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
170 cache::AllowedUpdateTypes::kFullAndIncremental &&
171 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
172 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>)
173 {
174 throw std::logic_error(fmt::format(
175 "Incremental update support is requested in config but no update field "
176 "name is specified in traits of '{}' cache",
177 components::GetCurrentComponentName(context)
178 ));
179 }
180 if (correction_.count() < 0) {
181 throw std::logic_error(fmt::format(
182 "Refusing to set forward (negative) update correction requested in "
183 "config for '{}' cache",
184 components::GetCurrentComponentName(context)
185 ));
186 }
187}
188
189template <class MongoCacheTraits>
190void MongoCache<MongoCacheTraits>::Update(
191 cache::UpdateType type,
192 const std::chrono::system_clock::time_point& last_update,
193 const std::chrono::system_clock::time_point& now,
194 cache::UpdateStatisticsScope& stats_scope
195) {
196 namespace sm = storages::mongo;
197
198 const auto* collection = mongo_collection_;
199 auto find_op = GetFindOperation(type, last_update, now, correction_);
200 auto cursor = collection->Execute(find_op);
201 if (type == cache::UpdateType::kIncremental && !cursor) {
202 // Don't touch the cache at all
203 LOG_INFO() << "No changes in cache " << MongoCacheTraits::kName;
204 stats_scope.FinishNoChanges();
205 return;
206 }
207
208 auto scope = tracing::Span::CurrentSpan().CreateScopeTime("copy_data");
209 auto new_cache = GetData(type);
210
211 // No good way to identify whether cursor accesses DB or reads buffed data
212 scope.Reset(kFetchAndParseStage);
213
214 utils::CpuRelax relax{cpu_relax_iterations_, &scope};
215 std::size_t doc_count = 0;
216
217 for (const auto& doc : cursor) {
218 ++doc_count;
219
220 relax.Relax();
221
223
224 try {
225 auto object = DeserializeObject(doc);
226 auto key = (object.*MongoCacheTraits::kKeyField);
227
228 if (type == cache::UpdateType::kIncremental || new_cache->count(key) == 0) {
229 (*new_cache)[key] = std::move(object);
230 } else {
231 LOG_LIMITED_ERROR()
232 << "Found duplicate key for 2 items in cache " << MongoCacheTraits::kName << ", key=" << key;
233 }
234 } catch (const std::exception& e) {
235 LOG_LIMITED_ERROR()
236 << "Failed to deserialize cache item of cache " << MongoCacheTraits::kName
237 << ", _id=" << doc["_id"].template ConvertTo<std::string>() << ", what(): " << e;
239
240 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) {
241 throw;
242 }
243 }
244 }
245
246 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
247 if (elapsed_time > kCpuRelaxThreshold) {
248 cpu_relax_iterations_ = static_cast<
249 std::size_t>(static_cast<double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
250 LOG_TRACE() << fmt::format(
251 "Elapsed time for updating {} {} for {} data items is over threshold. "
252 "Will relax CPU every {} iterations",
253 kName,
254 elapsed_time.count(),
255 doc_count,
256 cpu_relax_iterations_
257 );
258 }
259
260 scope.Reset();
261
262 const auto size = new_cache->size();
263 this->Set(std::move(new_cache));
264 stats_scope.Finish(size);
265}
266
267template <class MongoCacheTraits>
268typename MongoCacheTraits::ObjectType MongoCache<MongoCacheTraits>::DeserializeObject(const formats::bson::Document& doc
269) const {
270 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
271 return MongoCacheTraits::DeserializeObject(doc);
272 }
273 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<MongoCacheTraits>) {
274 return doc.As<typename MongoCacheTraits::ObjectType>();
275 }
276 UASSERT_MSG(false, "No deserialize operation defined but DeserializeObject invoked");
277}
278
279template <class MongoCacheTraits>
280storages::mongo::operations::Find MongoCache<MongoCacheTraits>::GetFindOperation(
281 cache::UpdateType type,
282 const std::chrono::system_clock::time_point& last_update,
283 const std::chrono::system_clock::time_point& now,
284 const std::chrono::system_clock::duration& correction
285) {
286 namespace bson = formats::bson;
287 namespace sm = storages::mongo;
288
289 auto find_op = [&]() -> sm::operations::Find {
290 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
291 return MongoCacheTraits::GetFindOperation(type, last_update, now, correction);
292 }
293 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<MongoCacheTraits>) {
294 bson::ValueBuilder query_builder(bson::ValueBuilder::Type::kObject);
295 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
296 if (type == cache::UpdateType::kIncremental) {
297 query_builder
298 [MongoCacheTraits::kMongoUpdateFieldName] = bson::MakeDoc("$gt", last_update - correction);
299 }
300 }
301 return sm::operations::Find(query_builder.ExtractValue());
302 }
303 UASSERT_MSG(false, "No find operation defined but GetFindOperation invoked");
304 }();
305
306 if (MongoCacheTraits::kIsSecondaryPreferred) {
308 }
309 return find_op;
310}
311
312template <class MongoCacheTraits>
313std::unique_ptr<typename MongoCacheTraits::DataType> MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
314 if (type == cache::UpdateType::kIncremental) {
315 auto ptr = this->Get();
316 return std::make_unique<typename MongoCacheTraits::DataType>(*ptr);
317 } else {
318 return std::make_unique<typename MongoCacheTraits::DataType>();
319 }
320}
321
namespace impl {

/// Schema of the MongoCache-specific static config options, returned as a
/// string; MongoCache::GetStaticConfigSchema merges it with the parent's
/// schema. Defined out of line.
std::string GetMongoCacheSchema();

}  // namespace impl
327
328template <class MongoCacheTraits>
329yaml_config::Schema MongoCache<MongoCacheTraits>::GetStaticConfigSchema() {
330 return yaml_config::MergeSchemas<
331 CachingComponentBase<typename MongoCacheTraits::DataType>>(impl::GetMongoCacheSchema());
332}
333
334} // namespace components
335
336USERVER_NAMESPACE_END