userver: userver/cache/base_mongo_cache.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
base_mongo_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_mongo_cache.hpp
4/// @brief @copybrief components::MongoCache
5
6#include <chrono>
7
8#include <fmt/format.h>
9
10#include <userver/cache/cache_statistics.hpp>
11#include <userver/cache/caching_component_base.hpp>
12#include <userver/cache/mongo_cache_type_traits.hpp>
13#include <userver/components/component_context.hpp>
14#include <userver/formats/bson/document.hpp>
15#include <userver/formats/bson/inline.hpp>
16#include <userver/formats/bson/value_builder.hpp>
17#include <userver/storages/mongo/collection.hpp>
18#include <userver/storages/mongo/operations.hpp>
19#include <userver/storages/mongo/options.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/cpu_relax.hpp>
22#include <userver/yaml_config/merge_schemas.hpp>
23
24USERVER_NAMESPACE_BEGIN
25
26namespace components {
27
/// Name of the tracing scope-time stage that covers fetching documents from
/// the cursor and parsing them during a MongoCache update.
inline const std::string kFetchAndParseStage{"fetch_and_parse"};

/// When the fetch-and-parse stage of an update takes longer than this,
/// MongoCache recomputes how often to relax the CPU in later updates.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
/// Target time between two CPU relaxations; used to derive how many documents
/// are processed between relaxations.
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};
32
33namespace impl {
34
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(const ComponentConfig&);
36
37}
38
39// clang-format off
40
41/// @ingroup userver_components
42///
43/// @brief %Base class for all caches polling mongo collection
44///
45/// You have to provide a traits class in order to use this.
46///
47/// ### Avoiding memory leaks
48/// See components::CachingComponentBase
49///
50/// ## Static options:
51/// All options of CachingComponentBase and
52/// Name | Description | Default value
53/// ---- | ----------- | -------------
54/// update-correction | adjusts incremental updates window to overlap with previous update | 0
55///
56/// ## Traits example:
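57/// All fields below are mandatory, except for the ones marked as optional. For each function override, either define the function or set the corresponding kUseDefault* flag to true.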
58///
59/// ```
60/// struct MongoCacheTraitsExample {
61/// // Name of the cache component
62/// static constexpr std::string_view kName = "mongo-dynamic-config";
63///
64/// // Collection to read from
65/// static constexpr auto kMongoCollectionsField =
66/// &storages::mongo::Collections::config;
67/// // Update field name to use for incremental update (optional).
68/// // When missing, incremental update is disabled.
69/// // Please use reference here to avoid global variables
70/// // initialization order issues.
71/// static constexpr const std::string& kMongoUpdateFieldName =
72/// mongo::db::taxi::config::kUpdated;
73///
74/// // Cache element type
75/// using ObjectType = CachedObject;
76/// // Cache element field name that is used as an index in the cache map
77/// static constexpr auto kKeyField = &CachedObject::name;
78/// // Type of kKeyField
79/// using KeyType = std::string;
80/// // Type of cache map, e.g. unordered_map, map, bimap
81/// using DataType = std::unordered_map<KeyType, ObjectType>;
82///
83/// // Whether the cache prefers to read from replica (if true, you might get stale data)
84/// static constexpr bool kIsSecondaryPreferred = true;
85///
86/// // Optional function that overrides BSON to ObjectType conversion
87/// static constexpr auto DeserializeObject = &CachedObject::FromBson;
88/// // or
89/// static ObjectType DeserializeObject(const formats::bson::Document& doc) {
90/// return doc["value"].As<ObjectType>();
91/// }
92/// // (default implementation calls doc.As<ObjectType>())
93/// // For using default implementation
94/// static constexpr bool kUseDefaultDeserializeObject = true;
95///
96/// // Optional function that overrides data retrieval operation
97/// static storages::mongo::operations::Find GetFindOperation(
98/// cache::UpdateType type,
99/// const std::chrono::system_clock::time_point& last_update,
100/// const std::chrono::system_clock::time_point& now,
101/// const std::chrono::system_clock::duration& correction) {
102/// storages::mongo::operations::Find find_op({});
103/// find_op.SetOption(storages::mongo::options::Projection{"key", "value"});
104/// return find_op;
105/// }
106/// // (default implementation queries kMongoUpdateFieldName: {$gt: last_update}
107/// // for incremental updates, and {} for full updates)
108/// // For using default implementation
109/// static constexpr bool kUseDefaultFindOperation = true;
110///
111/// // Whether update part of the cache even if failed to parse some documents
112/// static constexpr bool kAreInvalidDocumentsSkipped = false;
113///
114/// // Component to get the collections
115/// using MongoCollectionsComponent = components::MongoCollections;
116/// };
117/// ```
118
119// clang-format on
120
121template <class MongoCacheTraits>
123 : public CachingComponentBase<typename MongoCacheTraits::DataType> {
126
127 public:
128 static constexpr std::string_view kName = MongoCacheTraits::kName;
129
130 MongoCache(const ComponentConfig&, const ComponentContext&);
131
132 ~MongoCache();
133
134 static yaml_config::Schema GetStaticConfigSchema();
135
136 private:
137 void Update(cache::UpdateType type,
138 const std::chrono::system_clock::time_point& last_update,
139 const std::chrono::system_clock::time_point& now,
140 cache::UpdateStatisticsScope& stats_scope) override;
141
142 typename MongoCacheTraits::ObjectType DeserializeObject(
143 const formats::bson::Document& doc) const;
144
145 storages::mongo::operations::Find GetFindOperation(
146 cache::UpdateType type,
147 const std::chrono::system_clock::time_point& last_update,
148 const std::chrono::system_clock::time_point& now,
149 const std::chrono::system_clock::duration& correction);
150
151 std::unique_ptr<typename MongoCacheTraits::DataType> GetData(
152 cache::UpdateType type);
153
154 const std::shared_ptr<CollectionsType> mongo_collections_;
155 const storages::mongo::Collection* const mongo_collection_;
156 const std::chrono::system_clock::duration correction_;
157 std::size_t cpu_relax_iterations_{0};
158};
159
160template <class MongoCacheTraits>
161inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = true;
162
163template <class MongoCacheTraits>
164MongoCache<MongoCacheTraits>::MongoCache(const ComponentConfig& config,
165 const ComponentContext& context)
166 : CachingComponentBase<typename MongoCacheTraits::DataType>(config,
167 context),
168 mongo_collections_(
169 context
170 .FindComponent<
171 typename MongoCacheTraits::MongoCollectionsComponent>()
172 .template GetCollectionForLibrary<CollectionsType>()),
173 mongo_collection_(std::addressof(
174 mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
175 correction_(impl::GetMongoCacheUpdateCorrection(config)) {
176 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
177 check_traits;
178
180 typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
181 cache::AllowedUpdateTypes::kFullAndIncremental &&
182 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
183 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
184 throw std::logic_error(
185 "Incremental update support is requested in config but no update field "
186 "name is specified in traits of '" +
187 components::GetCurrentComponentName(config) + "' cache");
188 }
189 if (correction_.count() < 0) {
190 throw std::logic_error(
191 "Refusing to set forward (negative) update correction requested in "
192 "config for '" +
193 components::GetCurrentComponentName(config) + "' cache");
194 }
195
196 this->StartPeriodicUpdates();
197}
198
199template <class MongoCacheTraits>
200MongoCache<MongoCacheTraits>::~MongoCache() {
201 this->StopPeriodicUpdates();
202}
203
204template <class MongoCacheTraits>
205void MongoCache<MongoCacheTraits>::Update(
206 cache::UpdateType type,
207 const std::chrono::system_clock::time_point& last_update,
208 const std::chrono::system_clock::time_point& now,
209 cache::UpdateStatisticsScope& stats_scope) {
210 namespace sm = storages::mongo;
211
212 const auto* collection = mongo_collection_;
213 auto find_op = GetFindOperation(type, last_update, now, correction_);
214 auto cursor = collection->Execute(find_op);
215 if (type == cache::UpdateType::kIncremental && !cursor) {
216 // Don't touch the cache at all
217 LOG_INFO() << "No changes in cache " << MongoCacheTraits::kName;
218 stats_scope.FinishNoChanges();
219 return;
220 }
221
222 auto scope = tracing::Span::CurrentSpan().CreateScopeTime("copy_data");
223 auto new_cache = GetData(type);
224
225 // No good way to identify whether cursor accesses DB or reads buffed data
226 scope.Reset(kFetchAndParseStage);
227
228 utils::CpuRelax relax{cpu_relax_iterations_, &scope};
229 std::size_t doc_count = 0;
230
231 for (const auto& doc : cursor) {
232 ++doc_count;
233
234 relax.Relax();
235
236 stats_scope.IncreaseDocumentsReadCount(1);
237
238 try {
239 auto object = DeserializeObject(doc);
240 auto key = (object.*MongoCacheTraits::kKeyField);
241
242 if (type == cache::UpdateType::kIncremental ||
243 new_cache->count(key) == 0) {
244 (*new_cache)[key] = std::move(object);
245 } else {
246 LOG_LIMITED_ERROR() << "Found duplicate key for 2 items in cache "
247 << MongoCacheTraits::kName << ", key=" << key;
248 }
249 } catch (const std::exception& e) {
250 LOG_LIMITED_ERROR() << "Failed to deserialize cache item of cache "
251 << MongoCacheTraits::kName << ", _id="
252 << doc["_id"].template ConvertTo<std::string>()
253 << ", what(): " << e;
254 stats_scope.IncreaseDocumentsParseFailures(1);
255
256 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) throw;
257 }
258 }
259
260 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
261 if (elapsed_time > kCpuRelaxThreshold) {
262 cpu_relax_iterations_ = static_cast<std::size_t>(
263 static_cast<double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
264 LOG_TRACE() << fmt::format(
265 "Elapsed time for updating {} {} for {} data items is over threshold. "
266 "Will relax CPU every {} iterations",
267 kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
268 }
269
270 scope.Reset();
271
272 const auto size = new_cache->size();
273 this->Set(std::move(new_cache));
274 stats_scope.Finish(size);
275}
276
277template <class MongoCacheTraits>
278typename MongoCacheTraits::ObjectType
279MongoCache<MongoCacheTraits>::DeserializeObject(
280 const formats::bson::Document& doc) const {
281 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
282 return MongoCacheTraits::DeserializeObject(doc);
283 }
284 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
285 MongoCacheTraits>) {
286 return doc.As<typename MongoCacheTraits::ObjectType>();
287 }
288 UASSERT_MSG(false,
289 "No deserialize operation defined but DeserializeObject invoked");
290}
291
292template <class MongoCacheTraits>
293storages::mongo::operations::Find
294MongoCache<MongoCacheTraits>::GetFindOperation(
295 cache::UpdateType type,
296 const std::chrono::system_clock::time_point& last_update,
297 const std::chrono::system_clock::time_point& now,
298 const std::chrono::system_clock::duration& correction) {
299 namespace bson = formats::bson;
300 namespace sm = storages::mongo;
301
302 auto find_op = [&]() -> sm::operations::Find {
303 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
304 return MongoCacheTraits::GetFindOperation(type, last_update, now,
305 correction);
306 }
307 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
308 MongoCacheTraits>) {
309 bson::ValueBuilder query_builder(bson::ValueBuilder::Type::kObject);
310 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
311 if (type == cache::UpdateType::kIncremental) {
312 query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
313 bson::MakeDoc("$gt", last_update - correction);
314 }
315 }
316 return sm::operations::Find(query_builder.ExtractValue());
317 }
318 UASSERT_MSG(false,
319 "No find operation defined but GetFindOperation invoked");
320 }();
321
322 if (MongoCacheTraits::kIsSecondaryPreferred) {
324 }
325 return find_op;
326}
327
328template <class MongoCacheTraits>
329std::unique_ptr<typename MongoCacheTraits::DataType>
330MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
331 if (type == cache::UpdateType::kIncremental) {
332 auto ptr = this->Get();
333 return std::make_unique<typename MongoCacheTraits::DataType>(*ptr);
334 } else {
335 return std::make_unique<typename MongoCacheTraits::DataType>();
336 }
337}
338
339namespace impl {
340
341std::string GetMongoCacheSchema();
342
343} // namespace impl
344
345template <class MongoCacheTraits>
346yaml_config::Schema MongoCache<MongoCacheTraits>::GetStaticConfigSchema() {
347 return yaml_config::MergeSchemas<
348 CachingComponentBase<typename MongoCacheTraits::DataType>>(
349 impl::GetMongoCacheSchema());
350}
351
352} // namespace components
353
354USERVER_NAMESPACE_END