userver: userver/cache/base_mongo_cache.hpp Source File
Loading...
Searching...
No Matches
base_mongo_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_mongo_cache.hpp
4/// @brief @copybrief components::MongoCache
5
6#include <chrono>
7
8#include <fmt/format.h>
9
10#include <userver/cache/cache_statistics.hpp>
11#include <userver/cache/caching_component_base.hpp>
12#include <userver/cache/mongo_cache_type_traits.hpp>
13#include <userver/components/component_context.hpp>
14#include <userver/formats/bson/document.hpp>
15#include <userver/formats/bson/inline.hpp>
16#include <userver/formats/bson/value_builder.hpp>
17#include <userver/storages/mongo/collection.hpp>
18#include <userver/storages/mongo/operations.hpp>
19#include <userver/storages/mongo/options.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/cpu_relax.hpp>
22#include <userver/yaml_config/merge_schemas.hpp>
23
24USERVER_NAMESPACE_BEGIN
25
26namespace components {
27
/// Name of the tracing scope stage under which MongoCache::Update measures
/// reading documents from the cursor and parsing them.
inline const std::string kFetchAndParseStage{"fetch_and_parse"};

/// Fetch-and-parse duration above which MongoCache::Update recalculates how
/// often the CPU is relaxed.
inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
/// Time slice used to convert the measured fetch-and-parse duration into the
/// number of documents processed between CPU relax calls.
inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
32
33namespace impl {
34
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(const ComponentConfig&);
36
37}
38
39// clang-format off
40
41/// @ingroup userver_components
42///
43/// @brief %Base class for all caches polling mongo collection
44///
45/// You have to provide a traits class in order to use this.
46///
47/// ### Avoiding memory leaks
48/// See components::CachingComponentBase
49///
50/// ## Static options:
51/// All options of CachingComponentBase and
52/// Name | Description | Default value
53/// ---- | ----------- | -------------
54/// update-correction | adjusts incremental updates window to overlap with previous update | 0
55///
56/// ## Traits example:
57/// All fields below (except for function overrides) are mandatory.
58///
59/// ```
60/// struct MongoCacheTraitsExample {
61/// // Name of the cache component
62/// static constexpr std::string_view kName = "mongo-dynamic-config";
63///
64/// // Collection to read from
65/// static constexpr auto kMongoCollectionsField =
66/// &storages::mongo::Collections::config;
67/// // Update field name to use for incremental update (optional).
68/// // When missing, incremental update is disabled.
69/// // Please use reference here to avoid global variables
70/// // initialization order issues.
71/// static constexpr const std::string& kMongoUpdateFieldName =
72/// mongo::db::taxi::config::kUpdated;
73///
74/// // Cache element type
75/// using ObjectType = CachedObject;
76/// // Cache element field name that is used as an index in the cache map
77/// static constexpr auto kKeyField = &CachedObject::name;
78/// // Type of kKeyField
79/// using KeyType = std::string;
80/// // Type of cache map, e.g. unordered_map, map, bimap
81/// using DataType = std::unordered_map<KeyType, ObjectType>;
82///
83/// // Whether the cache prefers to read from replica (if true, you might get stale data)
84/// static constexpr bool kIsSecondaryPreferred = true;
85///
86/// // Optional function that overrides BSON to ObjectType conversion
87/// static constexpr auto DeserializeObject = &CachedObject::FromBson;
88/// // or
89/// static ObjectType DeserializeObject(const formats::bson::Document& doc) {
90/// return doc["value"].As<ObjectType>();
91/// }
92/// // (default implementation calls doc.As<ObjectType>())
93/// // For using default implementation
94/// static constexpr bool kUseDefaultDeserializeObject = true;
95///
96/// // Optional function that overrides data retrieval operation
97/// static storages::mongo::operations::Find GetFindOperation(
98/// cache::UpdateType type,
99/// const std::chrono::system_clock::time_point& last_update,
100/// const std::chrono::system_clock::time_point& now,
101/// const std::chrono::system_clock::duration& correction) {
102/// mongo::operations::Find find_op({});
103/// find_op.SetOption(mongo::options::Projection{"key", "value"});
104/// return find_op;
105/// }
106/// // (default implementation queries kMongoUpdateFieldName: {$gt: last_update}
107/// // for incremental updates, and {} for full updates)
108/// // For using default implementation
109/// static constexpr bool kUseDefaultFindOperation = true;
110///
111/// // Whether to update the cache with the successfully parsed documents even if some documents failed to parse
112/// static constexpr bool kAreInvalidDocumentsSkipped = false;
113///
114/// // Component to get the collections
115/// using MongoCollectionsComponent = components::MongoCollections;
116/// };
117/// ```
118
119// clang-format on
120
121template <class MongoCacheTraits>
123 : public CachingComponentBase<typename MongoCacheTraits::DataType> {
126
127 public:
128 static constexpr std::string_view kName = MongoCacheTraits::kName;
129
130 MongoCache(const ComponentConfig&, const ComponentContext&);
131
132 ~MongoCache();
133
134 static yaml_config::Schema GetStaticConfigSchema();
135
136 private:
137 void Update(cache::UpdateType type,
138 const std::chrono::system_clock::time_point& last_update,
139 const std::chrono::system_clock::time_point& now,
140 cache::UpdateStatisticsScope& stats_scope) override;
141
142 typename MongoCacheTraits::ObjectType DeserializeObject(
143 const formats::bson::Document& doc) const;
144
145 storages::mongo::operations::Find GetFindOperation(
146 cache::UpdateType type,
147 const std::chrono::system_clock::time_point& last_update,
148 const std::chrono::system_clock::time_point& now,
149 const std::chrono::system_clock::duration& correction);
150
151 std::unique_ptr<typename MongoCacheTraits::DataType> GetData(
152 cache::UpdateType type);
153
154 const std::shared_ptr<CollectionsType> mongo_collections_;
155 const storages::mongo::Collection* const mongo_collection_;
156 const std::chrono::system_clock::duration correction_;
157 std::size_t cpu_relax_iterations_{0};
158};
159
160template <class MongoCacheTraits>
161inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = true;
162
163template <class MongoCacheTraits>
164MongoCache<MongoCacheTraits>::MongoCache(const ComponentConfig& config,
165 const ComponentContext& context)
166 : CachingComponentBase<typename MongoCacheTraits::DataType>(config,
167 context),
168 mongo_collections_(
169 context
170 .FindComponent<
171 typename MongoCacheTraits::MongoCollectionsComponent>()
172 .template GetCollectionForLibrary<CollectionsType>()),
173 mongo_collection_(std::addressof(
174 mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
175 correction_(impl::GetMongoCacheUpdateCorrection(config)) {
176 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
177 check_traits;
178
180 typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
181 cache::AllowedUpdateTypes::kFullAndIncremental &&
182 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
183 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
184 throw std::logic_error(
185 "Incremental update support is requested in config but no update field "
186 "name is specified in traits of '" +
187 components::GetCurrentComponentName(config) + "' cache");
188 }
189 if (correction_.count() < 0) {
190 throw std::logic_error(
191 "Refusing to set forward (negative) update correction requested in "
192 "config for '" +
193 components::GetCurrentComponentName(config) + "' cache");
194 }
195
196 this->StartPeriodicUpdates();
197}
198
199template <class MongoCacheTraits>
200MongoCache<MongoCacheTraits>::~MongoCache() {
201 this->StopPeriodicUpdates();
202}
203
204template <class MongoCacheTraits>
205void MongoCache<MongoCacheTraits>::Update(
206 cache::UpdateType type,
207 const std::chrono::system_clock::time_point& last_update,
208 const std::chrono::system_clock::time_point& now,
209 cache::UpdateStatisticsScope& stats_scope) {
210 namespace sm = storages::mongo;
211
212 const auto* collection = mongo_collection_;
213 auto find_op = GetFindOperation(type, last_update, now, correction_);
214 auto cursor = collection->Execute(find_op);
215 if (type == cache::UpdateType::kIncremental && !cursor) {
216 // Don't touch the cache at all
217 LOG_INFO() << "No changes in cache " << MongoCacheTraits::kName;
218 stats_scope.FinishNoChanges();
219 return;
220 }
221
222 auto scope = tracing::Span::CurrentSpan().CreateScopeTime("copy_data");
223 auto new_cache = GetData(type);
224
225 // No good way to identify whether cursor accesses DB or reads buffed data
226 scope.Reset(kFetchAndParseStage);
227
228 utils::CpuRelax relax{cpu_relax_iterations_, &scope};
229 std::size_t doc_count = 0;
230
231 for (const auto& doc : cursor) {
232 ++doc_count;
233
234 relax.Relax();
235
236 stats_scope.IncreaseDocumentsReadCount(1);
237
238 try {
239 auto object = DeserializeObject(doc);
240 auto key = (object.*MongoCacheTraits::kKeyField);
241
242 if (type == cache::UpdateType::kIncremental ||
243 new_cache->count(key) == 0) {
244 (*new_cache)[key] = std::move(object);
245 } else {
246 LOG_LIMITED_ERROR() << "Found duplicate key for 2 items in cache "
247 << MongoCacheTraits::kName << ", key=" << key;
248 }
249 } catch (const std::exception& e) {
250 LOG_LIMITED_ERROR() << "Failed to deserialize cache item of cache "
251 << MongoCacheTraits::kName << ", _id="
252 << doc["_id"].template ConvertTo<std::string>()
253 << ", what(): " << e;
254 stats_scope.IncreaseDocumentsParseFailures(1);
255
256 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) throw;
257 }
258 }
259
260 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
261 if (elapsed_time > kCpuRelaxThreshold) {
262 cpu_relax_iterations_ = static_cast<std::size_t>(
263 static_cast<double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
264 LOG_TRACE() << fmt::format(
265 "Elapsed time for updating {} {} for {} data items is over threshold. "
266 "Will relax CPU every {} iterations",
267 kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
268 }
269
270 scope.Reset();
271
272 const auto size = new_cache->size();
273 this->Set(std::move(new_cache));
274 stats_scope.Finish(size);
275}
276
277template <class MongoCacheTraits>
278typename MongoCacheTraits::ObjectType
279MongoCache<MongoCacheTraits>::DeserializeObject(
280 const formats::bson::Document& doc) const {
281 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
282 return MongoCacheTraits::DeserializeObject(doc);
283 }
284 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
285 MongoCacheTraits>) {
286 return doc.As<typename MongoCacheTraits::ObjectType>();
287 }
288 UASSERT_MSG(false,
289 "No deserialize operation defined but DeserializeObject invoked");
290}
291
292template <class MongoCacheTraits>
293storages::mongo::operations::Find
294MongoCache<MongoCacheTraits>::GetFindOperation(
295 cache::UpdateType type,
296 const std::chrono::system_clock::time_point& last_update,
297 const std::chrono::system_clock::time_point& now,
298 const std::chrono::system_clock::duration& correction) {
299 namespace bson = formats::bson;
300 namespace sm = storages::mongo;
301
302 auto find_op = [&]() -> sm::operations::Find {
303 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
304 return MongoCacheTraits::GetFindOperation(type, last_update, now,
305 correction);
306 }
307 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
308 MongoCacheTraits>) {
309 bson::ValueBuilder query_builder(bson::ValueBuilder::Type::kObject);
310 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
311 if (type == cache::UpdateType::kIncremental) {
312 query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
313 bson::MakeDoc("$gt", last_update - correction);
314 }
315 }
316 return sm::operations::Find(query_builder.ExtractValue());
317 }
318 UASSERT_MSG(false,
319 "No find operation defined but GetFindOperation invoked");
320 }();
321
322 if (MongoCacheTraits::kIsSecondaryPreferred) {
324 }
325 return find_op;
326}
327
328template <class MongoCacheTraits>
329std::unique_ptr<typename MongoCacheTraits::DataType>
330MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
331 if (type == cache::UpdateType::kIncremental) {
332 auto ptr = this->Get();
333 return std::make_unique<typename MongoCacheTraits::DataType>(*ptr);
334 } else {
335 return std::make_unique<typename MongoCacheTraits::DataType>();
336 }
337}
338
namespace impl {

/// Returns the mongo-cache-specific schema text that
/// MongoCache::GetStaticConfigSchema() merges into the base component schema.
/// Defined out of line.
std::string GetMongoCacheSchema();

}  // namespace impl
344
345template <class MongoCacheTraits>
346yaml_config::Schema MongoCache<MongoCacheTraits>::GetStaticConfigSchema() {
347 return yaml_config::MergeSchemas<
348 CachingComponentBase<typename MongoCacheTraits::DataType>>(
349 impl::GetMongoCacheSchema());
350}
351
352} // namespace components
353
354USERVER_NAMESPACE_END