userver: userver/cache/base_mongo_cache.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
base_mongo_cache.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/cache/base_mongo_cache.hpp
4/// @brief @copybrief components::MongoCache
5
6#include <chrono>
7
8#include <fmt/format.h>
9
10#include <userver/cache/cache_statistics.hpp>
11#include <userver/cache/caching_component_base.hpp>
12#include <userver/cache/mongo_cache_type_traits.hpp>
13#include <userver/components/component_context.hpp>
14#include <userver/formats/bson/document.hpp>
15#include <userver/formats/bson/inline.hpp>
16#include <userver/formats/bson/value_builder.hpp>
17#include <userver/storages/mongo/collection.hpp>
18#include <userver/storages/mongo/operations.hpp>
19#include <userver/storages/mongo/options.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/cpu_relax.hpp>
22#include <userver/yaml_config/merge_schemas.hpp>
23
24USERVER_NAMESPACE_BEGIN
25
26namespace components {
27
/// Name of the tracing scope stage that MongoCache::Update switches to while
/// it iterates over the cursor and deserializes documents.
inline const std::string kFetchAndParseStage{"fetch_and_parse"};

/// When the fetch-and-parse stage takes longer than this, MongoCache::Update
/// recomputes how many documents it processes between CPU relax points.
inline constexpr auto kCpuRelaxThreshold = std::chrono::milliseconds{10};
/// Time slice used to derive the number of documents processed between two
/// CPU relax points (documents read / number of such slices elapsed).
inline constexpr auto kCpuRelaxInterval = std::chrono::milliseconds{2};
32
33namespace impl {
34
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(const ComponentConfig&);
36
37}
38
39// clang-format off
40
41/// @ingroup userver_components
42///
43/// @brief %Base class for all caches polling mongo collection
44///
45/// You have to provide a traits class in order to use this.
46///
47/// ## Static options:
48/// All options of CachingComponentBase and
49/// Name | Description | Default value
50/// ---- | ----------- | -------------
51/// update-correction | adjusts incremental updates window to overlap with previous update | 0
52///
53/// ## Traits example:
54/// All fields below (except for function overrides) are mandatory.
55///
56/// ```
57/// struct MongoCacheTraitsExample {
58/// // Name of the cache component
59/// static constexpr auto kName = "mongo-dynamic-config";
60///
61/// // Collection to read from
62/// static constexpr auto kMongoCollectionsField =
63/// &storages::mongo::Collections::config;
64/// // Update field name to use for incremental update (optional).
65/// // When missing, incremental update is disabled.
66/// // Please use reference here to avoid global variables
67/// // initialization order issues.
68/// static constexpr const std::string& kMongoUpdateFieldName =
69/// mongo::db::taxi::config::kUpdated;
70///
71/// // Cache element type
72/// using ObjectType = CachedObject;
73/// // Cache element field name that is used as an index in the cache map
74/// static constexpr auto kKeyField = &CachedObject::name;
75/// // Type of kKeyField
76/// using KeyType = std::string;
77/// // Type of cache map, e.g. unordered_map, map, bimap
78/// using DataType = std::unordered_map<KeyType, ObjectType>;
79///
80/// // Whether the cache prefers to read from replica (if true, you might get stale data)
81/// static constexpr bool kIsSecondaryPreferred = true;
82///
83/// // Optional function that overrides BSON to ObjectType conversion
84/// static constexpr auto DeserializeObject = &CachedObject::FromBson;
85/// // or
86/// static ObjectType DeserializeObject(const formats::bson::Document& doc) {
87/// return doc["value"].As<ObjectType>();
88/// }
89/// // (default implementation calls doc.As<ObjectType>())
90/// // For using default implementation
91/// static constexpr bool kUseDefaultDeserializeObject = true;
92///
93/// // Optional function that overrides data retrieval operation
94/// static storages::mongo::operations::Find GetFindOperation(
95/// cache::UpdateType type,
96/// const std::chrono::system_clock::time_point& last_update,
97/// const std::chrono::system_clock::time_point& now,
98/// const std::chrono::system_clock::duration& correction) {
99/// mongo::operations::Find find_op({});
100/// find_op.SetOption(mongo::options::Projection{"key", "value"});
101/// return find_op;
102/// }
103/// // (default implementation queries kMongoUpdateFieldName: {$gt: last_update}
104/// // for incremental updates, and {} for full updates)
105/// // For using default implementation
106/// static constexpr bool kUseDefaultFindOperation = true;
107///
108/// // Whether to skip documents that failed to parse and update the cache with the rest
109/// static constexpr bool kAreInvalidDocumentsSkipped = false;
110///
111/// // Component to get the collections
112/// using MongoCollectionsComponent = components::MongoCollections;
113/// };
114/// ```
115
116// clang-format on
117
118template <class MongoCacheTraits>
120 : public CachingComponentBase<typename MongoCacheTraits::DataType> {
123
124 public:
125 static constexpr std::string_view kName = MongoCacheTraits::kName;
126
127 MongoCache(const ComponentConfig&, const ComponentContext&);
128
129 ~MongoCache();
130
131 static yaml_config::Schema GetStaticConfigSchema();
132
133 private:
134 void Update(cache::UpdateType type,
135 const std::chrono::system_clock::time_point& last_update,
136 const std::chrono::system_clock::time_point& now,
137 cache::UpdateStatisticsScope& stats_scope) override;
138
139 typename MongoCacheTraits::ObjectType DeserializeObject(
140 const formats::bson::Document& doc) const;
141
142 storages::mongo::operations::Find GetFindOperation(
143 cache::UpdateType type,
144 const std::chrono::system_clock::time_point& last_update,
145 const std::chrono::system_clock::time_point& now,
146 const std::chrono::system_clock::duration& correction);
147
148 std::unique_ptr<typename MongoCacheTraits::DataType> GetData(
149 cache::UpdateType type);
150
151 const std::shared_ptr<CollectionsType> mongo_collections_;
152 const storages::mongo::Collection* const mongo_collection_;
153 const std::chrono::system_clock::duration correction_;
154 std::size_t cpu_relax_iterations_{0};
155};
156
157template <class MongoCacheTraits>
158inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = true;
159
160template <class MongoCacheTraits>
161MongoCache<MongoCacheTraits>::MongoCache(const ComponentConfig& config,
162 const ComponentContext& context)
163 : CachingComponentBase<typename MongoCacheTraits::DataType>(config,
164 context),
165 mongo_collections_(
166 context
167 .FindComponent<
168 typename MongoCacheTraits::MongoCollectionsComponent>()
169 .template GetCollectionForLibrary<CollectionsType>()),
170 mongo_collection_(std::addressof(
171 mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
172 correction_(impl::GetMongoCacheUpdateCorrection(config)) {
173 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
174 check_traits;
175
177 typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
178 cache::AllowedUpdateTypes::kFullAndIncremental &&
179 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
180 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
181 throw std::logic_error(
182 "Incremental update support is requested in config but no update field "
183 "name is specified in traits of '" +
184 components::GetCurrentComponentName(config) + "' cache");
185 }
186 if (correction_.count() < 0) {
187 throw std::logic_error(
188 "Refusing to set forward (negative) update correction requested in "
189 "config for '" +
190 components::GetCurrentComponentName(config) + "' cache");
191 }
192
193 this->StartPeriodicUpdates();
194}
195
196template <class MongoCacheTraits>
197MongoCache<MongoCacheTraits>::~MongoCache() {
198 this->StopPeriodicUpdates();
199}
200
201template <class MongoCacheTraits>
202void MongoCache<MongoCacheTraits>::Update(
203 cache::UpdateType type,
204 const std::chrono::system_clock::time_point& last_update,
205 const std::chrono::system_clock::time_point& now,
206 cache::UpdateStatisticsScope& stats_scope) {
207 namespace sm = storages::mongo;
208
209 const auto* collection = mongo_collection_;
210 auto find_op = GetFindOperation(type, last_update, now, correction_);
211 auto cursor = collection->Execute(find_op);
212 if (type == cache::UpdateType::kIncremental && !cursor) {
213 // Don't touch the cache at all
214 LOG_INFO() << "No changes in cache " << MongoCacheTraits::kName;
215 stats_scope.FinishNoChanges();
216 return;
217 }
218
219 auto scope = tracing::Span::CurrentSpan().CreateScopeTime("copy_data");
220 auto new_cache = GetData(type);
221
222 // No good way to identify whether cursor accesses DB or reads buffed data
223 scope.Reset(kFetchAndParseStage);
224
225 utils::CpuRelax relax{cpu_relax_iterations_, &scope};
226 std::size_t doc_count = 0;
227
228 for (const auto& doc : cursor) {
229 ++doc_count;
230
231 relax.Relax();
232
233 stats_scope.IncreaseDocumentsReadCount(1);
234
235 try {
236 auto object = DeserializeObject(doc);
237 auto key = (object.*MongoCacheTraits::kKeyField);
238
239 if (type == cache::UpdateType::kIncremental ||
240 new_cache->count(key) == 0) {
241 (*new_cache)[key] = std::move(object);
242 } else {
243 LOG_LIMITED_ERROR() << "Found duplicate key for 2 items in cache "
244 << MongoCacheTraits::kName << ", key=" << key;
245 }
246 } catch (const std::exception& e) {
247 LOG_LIMITED_ERROR() << "Failed to deserialize cache item of cache "
248 << MongoCacheTraits::kName << ", _id="
249 << doc["_id"].template ConvertTo<std::string>()
250 << ", what(): " << e;
251 stats_scope.IncreaseDocumentsParseFailures(1);
252
253 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) throw;
254 }
255 }
256
257 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
258 if (elapsed_time > kCpuRelaxThreshold) {
259 cpu_relax_iterations_ = static_cast<std::size_t>(
260 static_cast<double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
261 LOG_TRACE() << fmt::format(
262 "Elapsed time for updating {} {} for {} data items is over threshold. "
263 "Will relax CPU every {} iterations",
264 kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
265 }
266
267 scope.Reset();
268
269 const auto size = new_cache->size();
270 this->Set(std::move(new_cache));
271 stats_scope.Finish(size);
272}
273
274template <class MongoCacheTraits>
275typename MongoCacheTraits::ObjectType
276MongoCache<MongoCacheTraits>::DeserializeObject(
277 const formats::bson::Document& doc) const {
278 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
279 return MongoCacheTraits::DeserializeObject(doc);
280 }
281 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
282 MongoCacheTraits>) {
283 return doc.As<typename MongoCacheTraits::ObjectType>();
284 }
285 UASSERT_MSG(false,
286 "No deserialize operation defined but DeserializeObject invoked");
287}
288
289template <class MongoCacheTraits>
290storages::mongo::operations::Find
291MongoCache<MongoCacheTraits>::GetFindOperation(
292 cache::UpdateType type,
293 const std::chrono::system_clock::time_point& last_update,
294 const std::chrono::system_clock::time_point& now,
295 const std::chrono::system_clock::duration& correction) {
296 namespace bson = formats::bson;
297 namespace sm = storages::mongo;
298
299 auto find_op = [&]() -> sm::operations::Find {
300 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
301 return MongoCacheTraits::GetFindOperation(type, last_update, now,
302 correction);
303 }
304 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
305 MongoCacheTraits>) {
306 bson::ValueBuilder query_builder(bson::ValueBuilder::Type::kObject);
307 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
308 if (type == cache::UpdateType::kIncremental) {
309 query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
310 bson::MakeDoc("$gt", last_update - correction);
311 }
312 }
313 return sm::operations::Find(query_builder.ExtractValue());
314 }
315 UASSERT_MSG(false,
316 "No find operation defined but GetFindOperation invoked");
317 }();
318
319 if (MongoCacheTraits::kIsSecondaryPreferred) {
321 }
322 return find_op;
323}
324
325template <class MongoCacheTraits>
326std::unique_ptr<typename MongoCacheTraits::DataType>
327MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
328 if (type == cache::UpdateType::kIncremental) {
329 auto ptr = this->Get();
330 return std::make_unique<typename MongoCacheTraits::DataType>(*ptr);
331 } else {
332 return std::make_unique<typename MongoCacheTraits::DataType>();
333 }
334}
335
namespace impl {

/// Supplies the mongo-cache specific part of the static config schema as a
/// string; MongoCache::GetStaticConfigSchema() merges it with the
/// CachingComponentBase schema. Defined out of line.
std::string GetMongoCacheSchema();

}  // namespace impl
341
342template <class MongoCacheTraits>
343yaml_config::Schema MongoCache<MongoCacheTraits>::GetStaticConfigSchema() {
344 return yaml_config::MergeSchemas<
345 CachingComponentBase<typename MongoCacheTraits::DataType>>(
346 impl::GetMongoCacheSchema());
347}
348
349} // namespace components
350
351USERVER_NAMESPACE_END