#pragma once

/// @file userver/cache/base_mongo_cache.hpp
/// @brief @copybrief components::MongoCache
5
#include <chrono>
#include <cstddef>
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>

#include <fmt/format.h>

#include <userver/cache/cache_statistics.hpp>
#include <userver/cache/caching_component_base.hpp>
#include <userver/cache/mongo_cache_type_traits.hpp>
#include <userver/components/component_context.hpp>
#include <userver/formats/bson/document.hpp>
#include <userver/formats/bson/inline.hpp>
#include <userver/formats/bson/value_builder.hpp>
#include <userver/storages/mongo/collection.hpp>
#include <userver/storages/mongo/operations.hpp>
#include <userver/storages/mongo/options.hpp>
#include <userver/tracing/span.hpp>
#include <userver/utils/cpu_relax.hpp>
#include <userver/yaml_config/merge_schemas.hpp>
23
USERVER_NAMESPACE_BEGIN

namespace components {
27
/// Name of the tracing scope stage that MongoCache::Update uses while it
/// iterates the cursor and parses documents.
inline const std::string kFetchAndParseStage = "fetch_and_parse";

/// When the fetch-and-parse stage takes longer than this, MongoCache::Update
/// recomputes its CPU relax iteration count.
inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10};
/// Time slice used in that recomputation: the new iteration count is roughly
/// the number of documents processed per kCpuRelaxInterval.
inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2};
32
33namespace impl {
34
35std::chrono::milliseconds GetMongoCacheUpdateCorrection(const ComponentConfig&);
36
37}
38
39// clang-format off
40
41/// @ingroup userver_components
42///
43/// @brief %Base class for all caches polling mongo collection
44///
45/// You have to provide a traits class in order to use this.
46///
47/// ## Static options:
48/// All options of CachingComponentBase and
49/// Name | Description | Default value
50/// ---- | ----------- | -------------
51/// update-correction | adjusts incremental updates window to overlap with previous update | 0
52///
53/// ## Traits example:
54/// All fields below (except for function overrides) are mandatory.
55///
56/// ```
57/// struct MongoCacheTraitsExample {
58/// // Component name for component
59/// static constexpr auto kName = "mongo-dynamic-config";
60///
61/// // Collection to read from
62/// static constexpr auto kMongoCollectionsField =
63/// &storages::mongo::Collections::config;
64/// // Update field name to use for incremental update (optional).
65/// // When missing, incremental update is disabled.
66/// // Please use reference here to avoid global variables
67/// // initialization order issues.
68/// static constexpr const std::string& kMongoUpdateFieldName =
69/// mongo::db::taxi::config::kUpdated;
70///
71/// // Cache element type
72/// using ObjectType = CachedObject;
73/// // Cache element field name that is used as an index in the cache map
74/// static constexpr auto kKeyField = &CachedObject::name;
75/// // Type of kKeyField
76/// using KeyType = std::string;
77/// // Type of cache map, e.g. unordered_map, map, bimap
78/// using DataType = std::unordered_map<KeyType, ObjectType>;
79///
80/// // Whether the cache prefers to read from replica (if true, you might get stale data)
81/// static constexpr bool kIsSecondaryPreferred = true;
82///
83/// // Optional function that overrides BSON to ObjectType conversion
84/// static constexpr auto DeserializeObject = &CachedObject::FromBson;
85/// // or
86/// static ObjectType DeserializeObject(const formats::bson::Document& doc) {
87/// return doc["value"].As<ObjectType>();
88/// }
89/// // (default implementation calls doc.As<ObjectType>())
90/// // For using default implementation
91/// static constexpr bool kUseDefaultDeserializeObject = true;
92///
93/// // Optional function that overrides data retrieval operation
94/// static storages::mongo::operations::Find GetFindOperation(
95/// cache::UpdateType type,
96/// const std::chrono::system_clock::time_point& last_update,
97/// const std::chrono::system_clock::time_point& now,
98/// const std::chrono::system_clock::duration& correction) {
99/// mongo::operations::Find find_op({});
100/// find_op.SetOption(mongo::options::Projection{"key", "value"});
101/// return find_op;
102/// }
103/// // (default implementation queries kMongoUpdateFieldName: {$gt: last_update}
104/// // for incremental updates, and {} for full updates)
105/// // For using default implementation
106/// static constexpr bool kUseDefaultFindOperation = true;
107///
108/// // Whether update part of the cache even if failed to parse some documents
109/// static constexpr bool kAreInvalidDocumentsSkipped = false;
110///
111/// // Component to get the collections
112/// using MongoCollectionsComponent = components::MongoCollections;
113/// };
114/// ```
115
116// clang-format on
117
118template <class MongoCacheTraits>
120 : public CachingComponentBase<typename MongoCacheTraits::DataType> {
123
124 public:
125 static constexpr std::string_view kName = MongoCacheTraits::kName;
126
127 MongoCache(const ComponentConfig&, const ComponentContext&);
128
129 ~MongoCache();
130
131 static yaml_config::Schema GetStaticConfigSchema();
132
133 private:
134 void Update(cache::UpdateType type,
135 const std::chrono::system_clock::time_point& last_update,
136 const std::chrono::system_clock::time_point& now,
137 cache::UpdateStatisticsScope& stats_scope) override;
138
139 typename MongoCacheTraits::ObjectType DeserializeObject(
140 const formats::bson::Document& doc) const;
141
142 storages::mongo::operations::Find GetFindOperation(
143 cache::UpdateType type,
144 const std::chrono::system_clock::time_point& last_update,
145 const std::chrono::system_clock::time_point& now,
146 const std::chrono::system_clock::duration& correction);
147
148 std::unique_ptr<typename MongoCacheTraits::DataType> GetData(
149 cache::UpdateType type);
150
151 const std::shared_ptr<CollectionsType> mongo_collections_;
152 const storages::mongo::Collection* const mongo_collection_;
153 const std::chrono::system_clock::duration correction_;
154 std::size_t cpu_relax_iterations_{0};
155};
156
157template <class MongoCacheTraits>
158inline constexpr bool kHasValidate<MongoCache<MongoCacheTraits>> = true;
159
160template <class MongoCacheTraits>
161MongoCache<MongoCacheTraits>::MongoCache(const ComponentConfig& config,
162 const ComponentContext& context)
163 : CachingComponentBase<typename MongoCacheTraits::DataType>(config,
164 context),
165 mongo_collections_(
166 context
167 .FindComponent<
168 typename MongoCacheTraits::MongoCollectionsComponent>()
169 .template GetCollectionForLibrary<CollectionsType>()),
170 mongo_collection_(std::addressof(
171 mongo_collections_.get()->*MongoCacheTraits::kMongoCollectionsField)),
172 correction_(impl::GetMongoCacheUpdateCorrection(config)) {
173 [[maybe_unused]] mongo_cache::impl::CheckTraits<MongoCacheTraits>
174 check_traits;
175
177 typename MongoCacheTraits::DataType>::GetAllowedUpdateTypes() ==
178 cache::AllowedUpdateTypes::kFullAndIncremental &&
179 !mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits> &&
180 !mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
181 throw std::logic_error(
182 "Incremental update support is requested in config but no update field "
183 "name is specified in traits of '" +
184 components::GetCurrentComponentName(config) + "' cache");
185 }
186 if (correction_.count() < 0) {
187 throw std::logic_error(
188 "Refusing to set forward (negative) update correction requested in "
189 "config for '" +
190 components::GetCurrentComponentName(config) + "' cache");
191 }
192
193 this->StartPeriodicUpdates();
194}
195
196template <class MongoCacheTraits>
197MongoCache<MongoCacheTraits>::~MongoCache() {
198 this->StopPeriodicUpdates();
199}
200
201template <class MongoCacheTraits>
202void MongoCache<MongoCacheTraits>::Update(
203 cache::UpdateType type,
204 const std::chrono::system_clock::time_point& last_update,
205 const std::chrono::system_clock::time_point& now,
206 cache::UpdateStatisticsScope& stats_scope) {
207 namespace sm = storages::mongo;
208
209 const auto* collection = mongo_collection_;
210 auto find_op = GetFindOperation(type, last_update, now, correction_);
211 auto cursor = collection->Execute(find_op);
212 if (type == cache::UpdateType::kIncremental && !cursor) {
213 // Don't touch the cache at all
214 LOG_INFO() << "No changes in cache " << MongoCacheTraits::kName;
215 stats_scope.FinishNoChanges();
216 return;
217 }
218
219 auto scope = tracing::Span::CurrentSpan().CreateScopeTime("copy_data");
220 auto new_cache = GetData(type);
221
222 // No good way to identify whether cursor accesses DB or reads buffed data
223 scope.Reset(kFetchAndParseStage);
224
225 utils::CpuRelax relax{cpu_relax_iterations_, &scope};
226 std::size_t doc_count = 0;
227
228 for (const auto& doc : cursor) {
229 ++doc_count;
230
231 relax.Relax();
232
233 stats_scope.IncreaseDocumentsReadCount(1);
234
235 try {
236 auto object = DeserializeObject(doc);
237 auto key = (object.*MongoCacheTraits::kKeyField);
238
239 if (type == cache::UpdateType::kIncremental ||
240 new_cache->count(key) == 0) {
241 (*new_cache)[key] = std::move(object);
242 } else {
243 LOG_LIMITED_ERROR() << "Found duplicate key for 2 items in cache "
244 << MongoCacheTraits::kName << ", key=" << key;
245 }
246 } catch (const std::exception& e) {
247 LOG_LIMITED_ERROR() << "Failed to deserialize cache item of cache "
248 << MongoCacheTraits::kName << ", _id="
249 << doc["_id"].template ConvertTo<std::string>()
250 << ", what(): " << e;
251 stats_scope.IncreaseDocumentsParseFailures(1);
252
253 if (!MongoCacheTraits::kAreInvalidDocumentsSkipped) throw;
254 }
255 }
256
257 const auto elapsed_time = scope.ElapsedTotal(kFetchAndParseStage);
258 if (elapsed_time > kCpuRelaxThreshold) {
259 cpu_relax_iterations_ = static_cast<std::size_t>(
260 static_cast<double>(doc_count) / (elapsed_time / kCpuRelaxInterval));
261 LOG_TRACE() << fmt::format(
262 "Elapsed time for updating {} {} for {} data items is over threshold. "
263 "Will relax CPU every {} iterations",
264 kName, elapsed_time.count(), doc_count, cpu_relax_iterations_);
265 }
266
267 scope.Reset();
268
269 const auto size = new_cache->size();
270 this->Set(std::move(new_cache));
271 stats_scope.Finish(size);
272}
273
274template <class MongoCacheTraits>
275typename MongoCacheTraits::ObjectType
276MongoCache<MongoCacheTraits>::DeserializeObject(
277 const formats::bson::Document& doc) const {
278 if constexpr (mongo_cache::impl::kHasDeserializeObject<MongoCacheTraits>) {
279 return MongoCacheTraits::DeserializeObject(doc);
280 }
281 if constexpr (mongo_cache::impl::kHasDefaultDeserializeObject<
282 MongoCacheTraits>) {
283 return doc.As<typename MongoCacheTraits::ObjectType>();
284 }
285 UASSERT_MSG(false,
286 "No deserialize operation defined but DeserializeObject invoked");
287}
288
289template <class MongoCacheTraits>
290storages::mongo::operations::Find
291MongoCache<MongoCacheTraits>::GetFindOperation(
292 cache::UpdateType type,
293 const std::chrono::system_clock::time_point& last_update,
294 const std::chrono::system_clock::time_point& now,
295 const std::chrono::system_clock::duration& correction) {
296 namespace bson = formats::bson;
297 namespace sm = storages::mongo;
298
299 auto find_op = [&]() -> sm::operations::Find {
300 if constexpr (mongo_cache::impl::kHasFindOperation<MongoCacheTraits>) {
301 return MongoCacheTraits::GetFindOperation(type, last_update, now,
302 correction);
303 }
304 if constexpr (mongo_cache::impl::kHasDefaultFindOperation<
305 MongoCacheTraits>) {
306 bson::ValueBuilder query_builder(bson::ValueBuilder::Type::kObject);
307 if constexpr (mongo_cache::impl::kHasUpdateFieldName<MongoCacheTraits>) {
308 if (type == cache::UpdateType::kIncremental) {
309 query_builder[MongoCacheTraits::kMongoUpdateFieldName] =
310 bson::MakeDoc("$gt", last_update - correction);
311 }
312 }
313 return sm::operations::Find(query_builder.ExtractValue());
314 }
315 UASSERT_MSG(false,
316 "No find operation defined but GetFindOperation invoked");
317 }();
318
319 if (MongoCacheTraits::kIsSecondaryPreferred) {
321 }
322 return find_op;
323}
324
325template <class MongoCacheTraits>
326std::unique_ptr<typename MongoCacheTraits::DataType>
327MongoCache<MongoCacheTraits>::GetData(cache::UpdateType type) {
328 if (type == cache::UpdateType::kIncremental) {
329 auto ptr = this->Get();
330 return std::make_unique<typename MongoCacheTraits::DataType>(*ptr);
331 } else {
332 return std::make_unique<typename MongoCacheTraits::DataType>();
333 }
334}
335
namespace impl {

/// Returns the schema text that MongoCache::GetStaticConfigSchema merges
/// with the CachingComponentBase schema.
///
/// NOTE(review): defined out of line; presumably a YAML schema describing
/// the `update-correction` option -- confirm against the implementation.
std::string GetMongoCacheSchema();

}  // namespace impl
341
342template <class MongoCacheTraits>
343yaml_config::Schema MongoCache<MongoCacheTraits>::GetStaticConfigSchema() {
344 return yaml_config::MergeSchemas<
345 CachingComponentBase<typename MongoCacheTraits::DataType>>(
346 impl::GetMongoCacheSchema());
347}
348
}  // namespace components

USERVER_NAMESPACE_END