userver: userver/utils/hedged_request.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
hedged_request.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/utils/hedged_request.hpp
4/// @brief
5/// Classes and functions for performing hedged requests.
6///
7/// To perform hedged request you need to define RequestStrategy - a class
8/// similar to following ExampleStrategy:
9///
10/// class ExampleStrategy {
11/// public:
12/// /// Create request future.
13///
14/// /// Called at least once per hedged request. First time at the beginning
15/// /// of a hedged request and then every HedgingSettings::hedging_delay
16/// /// milliseconds if none of the previous requests are ready or
17/// /// ProcessReply returned non-nullopt result. If ProcessReply returned
18/// /// some interval of time, then additional request will be scheduled at
19/// /// this interval of time.
20/// /// @param attempt - increasing number for each try
21/// std::optional<RequestType> Create(std::size_t attempt);
22///
23/// /// ProcessReply is called when some request has finished. Method should
24/// /// evaluate request status and decide whether new attempt is required.
25/// /// If new attempt is required, then method must return delay for next
26/// /// attempt. If no other attempt is required, then method must return
27/// /// std::nullopt. It is expected that successful result will be stored
28/// /// somewhere internally and will be available with ExtractReply()
29/// /// method.
30/// std::optional<std::chrono::milliseconds> ProcessReply(RequestType&&);
31///
32/// std::optional<ReplyType> ExtractReply();
33///
34/// /// Method is called when system does not need request any more.
35/// /// For example, if one of the requests has been finished successfully,
36/// /// the system will finish all related subrequests.
37/// /// It is recommended to make this call as fast as possible in order
38/// /// not to block overall execution. For example, if you internally have
39/// /// some future, it is recommended to call TryCancel() here and wait
40/// /// for a cancellation in destructor, rather than call TryCancel() and
41/// /// immediately wait.
42/// void Finish(RequestType&&);
43/// };
44///
45/// Then call any of these functions:
46/// - HedgeRequest
47/// - HedgeRequestsBulk
48/// - HedgeRequestAsync
49/// - HedgeRequestsBulkAsync
50///
51
#include <chrono>
#include <cstddef>
#include <functional>
#include <optional>
#include <queue>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <vector>
58
59#include <userver/engine/task/cancel.hpp>
60#include <userver/engine/wait_any.hpp>
61#include <userver/utils/assert.hpp>
62#include <userver/utils/async.hpp>
63#include <userver/utils/datetime.hpp>
64
65USERVER_NAMESPACE_BEGIN
66
67namespace utils::hedging {
68
/// Defines hedging settings
struct HedgingSettings {
    /// Maximum number of attempts to make per request
    std::size_t max_attempts{3};
    /// Delay between attempts
    std::chrono::milliseconds hedging_delay{7};
    /// Max time to wait for all requests
    std::chrono::milliseconds timeout_all{100};
};
78
/// Deduces the types produced by a RequestStrategy:
/// RequestType is the value_type of the optional returned by
/// RequestStrategy::Create, ReplyType is the value_type of the optional
/// returned by RequestStrategy::ExtractReply.
template <typename RequestStrategy>
struct RequestTraits {
    using RequestType =
        typename std::invoke_result_t<decltype(&RequestStrategy::Create), RequestStrategy, int>::value_type;
    using ReplyType =
        typename std::invoke_result_t<decltype(&RequestStrategy::ExtractReply), RequestStrategy>::value_type;
};
86
87namespace impl {
88
// Monotonic clock used to schedule attempts; steady, so the plan is immune to
// wall-clock adjustments.
using Clock = utils::datetime::SteadyClock;
using TimePoint = Clock::time_point;

// What to do when a plan entry fires: start one more attempt of a request, or
// stop the whole hedged operation.
enum class Action { StartTry, Stop };
93
94struct PlanEntry {
95public:
96 PlanEntry(TimePoint timepoint, std::size_t request_index, std::size_t attempt_id, Action action)
97 : timepoint(timepoint), request_index(request_index), attempt_id(attempt_id), action(action) {}
98
99 bool operator<(const PlanEntry& other) const noexcept { return tie() < other.tie(); }
100 bool operator>(const PlanEntry& other) const noexcept { return tie() > other.tie(); }
101 bool operator==(const PlanEntry& other) const noexcept { return tie() == other.tie(); }
102 bool operator<=(const PlanEntry& other) const noexcept { return tie() <= other.tie(); }
103 bool operator>=(const PlanEntry& other) const noexcept { return tie() >= other.tie(); }
104 bool operator!=(const PlanEntry& other) const noexcept { return tie() != other.tie(); }
105
106 TimePoint timepoint;
107 std::size_t request_index{0};
108 std::size_t attempt_id{0};
109 Action action;
110
111private:
112 std::tuple<const TimePoint&, const size_t&, const size_t&, const Action&> tie() const noexcept {
113 return std::tie(timepoint, request_index, attempt_id, action);
114 }
115};
116
/// This wrapper allows us to cancel subrequests without removing elements
/// from vector of requests
template <typename RequestStrategy>
struct SubrequestWrapper {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;

    SubrequestWrapper() = default;
    SubrequestWrapper(SubrequestWrapper&&) noexcept = default;
    /// Wraps a request produced by RequestStrategy::Create (may be empty).
    explicit SubrequestWrapper(std::optional<RequestType>&& request) : request(std::move(request)) {}

    /// Returns nullptr for an empty (already finished) wrapper, so WaitAny*
    /// skips it; otherwise forwards to the wrapped request.
    engine::impl::ContextAccessor* TryGetContextAccessor() {
        if (!request) return nullptr;
        return request->TryGetContextAccessor();
    }

    // Reset (via Context::FinishAllSubrequests) once the request is handed
    // back to the strategy.
    std::optional<RequestType> request;
};
134
/// Book-keeping for one user request: which slots of the subrequest vector
/// belong to it, how many attempts were started, and whether it is done.
struct RequestState {
    // Indices into Context::subrequests_ of every attempt made so far
    std::vector<std::size_t> subrequest_indices;
    // Number of attempts already started via RequestStrategy::Create
    std::size_t attempts_made = 0;
    // Set when no further attempts must be scheduled for this request
    bool finished = false;
};
140
/// Shared state of one bulk hedged operation: the user strategies, the time
/// plan of future attempts, all subrequests launched so far and per-request
/// book-keeping. Driven by HedgeRequestsBulk.
template <typename RequestStrategy>
struct Context {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
    using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

    /// Takes ownership of the strategies; allocates one RequestState per
    /// strategy.
    Context(std::vector<RequestStrategy> inputs, HedgingSettings settings)
        : inputs_(std::move(inputs)), settings(std::move(settings)) {
        const std::size_t size = this->inputs_.size();
        request_states_.resize(size);
    }
    Context(Context&&) noexcept = default;

    /// Fills the plan: the first attempt of every request at start_time plus a
    /// single global Stop event at start_time + timeout_all.
    void Prepare(TimePoint start_time) {
        const auto request_count = GetRequestsCount();
        for (std::size_t request_id = 0; request_id < request_count; ++request_id) {
            plan_.emplace(start_time, request_id, 0, Action::StartTry);
        }
        plan_.emplace(start_time + settings.timeout_all, 0, 0, Action::Stop);
        // Upper bound on subrequests; presumably reserved so the vector does
        // not reallocate while it is being waited on — TODO confirm
        subrequests_.reserve(settings.max_attempts * request_count);
    }

    /// Time of the earliest planned event, or std::nullopt if the plan is
    /// empty.
    std::optional<TimePoint> NextEventTime() const {
        if (plan_.empty()) return std::nullopt;
        return plan_.top().timepoint;
    }
    /// Pops and returns the earliest planned event, or std::nullopt if the
    /// plan is empty.
    std::optional<PlanEntry> PopPlan() {
        if (plan_.empty()) return std::nullopt;
        auto ret = plan_.top();
        plan_.pop();
        return ret;
    }
    /// True once OnActionStop has been executed.
    bool IsStop() const { return stop_; }

    /// Marks the request finished and hands every still-active subrequest of
    /// it back to its strategy via Finish(), emptying the wrappers.
    void FinishAllSubrequests(std::size_t request_index) {
        auto& request_state = request_states_[request_index];
        request_state.finished = true;
        const auto& subrequest_indices = request_state.subrequest_indices;
        auto& strategy = inputs_[request_index];
        for (auto i : subrequest_indices) {
            auto& request = subrequests_[i].request;
            if (request) {
                strategy.Finish(std::move(*request));
                request.reset();
            }
        }
    }

    const HedgingSettings& GetSettings() const { return settings; }

    size_t GetRequestsCount() const { return inputs_.size(); }

    /// Maps a subrequest slot back to the index of the request that spawned
    /// it. Throws std::out_of_range for an unknown slot.
    size_t GetRequestIdxBySubrequestIdx(size_t subrequest_idx) const {
        return input_by_subrequests_.at(subrequest_idx);
    }

    RequestStrategy& GetStrategy(size_t index) { return inputs_[index]; }

    /// All subrequest wrappers; passed directly to engine::WaitAnyUntil.
    auto& GetSubRequests() { return subrequests_; }

    /// Collects ExtractReply() from every strategy, in input order.
    std::vector<std::optional<ReplyType>> ExtractAllReplies() {
        std::vector<std::optional<ReplyType>> ret;
        ret.reserve(GetRequestsCount());
        for (auto&& strategy : inputs_) {
            ret.emplace_back(strategy.ExtractReply());
        }
        return ret;
    }

    /// @name Reactions on events with WaitAny*
    /// @{
    /// Called on elapsed timeout of WaitAny when next event is Stop some
    /// request. Finishes every subrequest of every request and raises the
    /// stop flag.
    void OnActionStop() {
        for (std::size_t i = 0; i < inputs_.size(); ++i) FinishAllSubrequests(i);
        stop_ = true;
    }

    /// Called on elapsed timeout of WaitAny when next event is Start retry of
    /// request with id equal @param request_index
    void OnActionStartTry(std::size_t request_index, std::size_t attempt_id, TimePoint now) {
        auto& request_state = request_states_[request_index];
        if (request_state.finished) {
            return;
        }
        auto& attempts_made = request_state.attempts_made;
        // We could have already launched attempt with this number, for example if
        // attempt number 2 failed with retryable code, we will add it to plan with
        // number 3. This way, there are now two planned events, both with number=3
        // - one from retry, one from hedging. We will execute the earliest one and
        // skip the second one.
        if (attempt_id < attempts_made) {
            return;
        }

        if (attempts_made >= settings.max_attempts) {
            return;
        }
        auto& strategy = inputs_[request_index];
        auto request_opt = strategy.Create(attempts_made);
        if (!request_opt) {
            request_state.finished = true;
            // The user does not want to make another request (maybe some retry
            // budget is exhausted)
            return;
        }
        // Register the new subrequest and remember which request owns it.
        const auto idx = subrequests_.size();
        subrequests_.emplace_back(std::move(request_opt));
        request_state.subrequest_indices.push_back(idx);
        input_by_subrequests_[idx] = request_index;
        attempts_made++;
        // Schedule the next hedged attempt after hedging_delay.
        plan_.emplace(now + settings.hedging_delay, request_index, attempts_made, Action::StartTry);
    }

    /// Called on getting error in request with @param request_idx.
    /// Schedules a retry after retry_delay unless the request is finished or
    /// out of attempts.
    void OnRetriableReply(std::size_t request_idx, std::chrono::milliseconds retry_delay, TimePoint now) {
        const auto& request_state = request_states_[request_idx];
        if (request_state.finished) return;
        if (request_state.attempts_made >= settings.max_attempts) return;

        plan_.emplace(now + retry_delay, request_idx, request_state.attempts_made, Action::StartTry);
    }

    /// A definitive reply arrived: cancel the request's other subrequests.
    void OnNonRetriableReply(std::size_t request_idx) { FinishAllSubrequests(request_idx); }
    /// @}

private:
    /// user provided request strategies bulk
    std::vector<RequestStrategy> inputs_;
    HedgingSettings settings;

    /// Our plan of what we will do at what time (min-heap: earliest on top)
    std::priority_queue<PlanEntry, std::vector<PlanEntry>, std::greater<>> plan_{};
    std::vector<SubrequestWrapper<RequestStrategy>> subrequests_{};
    /// Store index of input by subrequest index
    std::unordered_map<std::size_t, std::size_t> input_by_subrequests_{};
    std::vector<RequestState> request_states_{};
    bool stop_{false};
};
279
280} // namespace impl
281
282/// Future of hedged bulk request
283template <typename RequestStrategy>
285 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
286 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
287
288 HedgedRequestBulkFuture(HedgedRequestBulkFuture&&) noexcept = default;
289 ~HedgedRequestBulkFuture() { task_.SyncCancel(); }
290
291 void Wait() { task_.Wait(); }
292 std::vector<std::optional<ReplyType>> Get() { return task_.Get(); }
293 engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }
294
295private:
296 template <typename RequestStrategy_>
297 friend auto HedgeRequestsBulkAsync(std::vector<RequestStrategy_> inputs, HedgingSettings settings);
298 using Task = engine::TaskWithResult<std::vector<std::optional<ReplyType>>>;
299 HedgedRequestBulkFuture(Task&& task) : task_(std::move(task)) {}
300 Task task_;
301};
302
303/// Future of hedged request
304template <typename RequestStrategy>
306 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
307 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
308
309 HedgedRequestFuture(HedgedRequestFuture&&) noexcept = default;
310 ~HedgedRequestFuture() { task_.SyncCancel(); }
311
312 void Wait() { task_.Wait(); }
313 std::optional<ReplyType> Get() { return task_.Get(); }
314 void IgnoreResult() {}
315 engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }
316
317private:
318 template <typename RequestStrategy_>
319 friend auto HedgeRequestAsync(RequestStrategy_ input, HedgingSettings settings);
320 using Task = engine::TaskWithResult<std::optional<ReplyType>>;
321 HedgedRequestFuture(Task&& task) : task_(std::move(task)) {}
322 Task task_;
323};
324
325/// @brief Synchronously perform bulk hedged requests described by `RequestStrategy` and
326/// return result of type `std::vector<std::optional<ResultType>>`.
327///
328/// Result contains replies for each element in `inputs` or `std::nullopt` in case
329/// of either timeouts or bad replies (`RequestStrategy::ProcessReply(RequestType&&)`
330/// returned `std::nullopt` and `RequestStrategy::ExtractReply()` returned
331/// `std::nullopt`).
332template <typename RequestStrategy>
333auto HedgeRequestsBulk(std::vector<RequestStrategy> inputs, HedgingSettings hedging_settings) {
334 {
335 using Action = impl::Action;
336 using Clock = impl::Clock;
337 auto context = impl::Context(std::move(inputs), std::move(hedging_settings));
338
339 auto& sub_requests = context.GetSubRequests();
340
341 auto wakeup_time = Clock::now();
342 context.Prepare(wakeup_time);
343
344 while (!context.IsStop()) {
345 auto wait_result = engine::WaitAnyUntil(wakeup_time, sub_requests);
346 if (!wait_result.has_value()) {
348 context.OnActionStop();
349 break;
350 }
351 /// timeout - need to process plan
352 auto plan_entry = context.PopPlan();
353 if (!plan_entry.has_value()) {
354 /// Timeout but we don't have planed actions any more
355 break;
356 }
357 const auto [timestamp, request_index, attempt_id, action] = *plan_entry;
358 switch (action) {
359 case Action::StartTry:
360 context.OnActionStartTry(request_index, attempt_id, timestamp);
361 break;
362 case Action::Stop:
363 context.OnActionStop();
364 break;
365 }
366 auto next_wakeup_time = context.NextEventTime();
367 if (!next_wakeup_time.has_value()) {
368 break;
369 }
370 wakeup_time = *next_wakeup_time;
371 continue;
372 }
373 const auto result_idx = *wait_result;
374 UASSERT(result_idx < sub_requests.size());
375 const auto request_idx = context.GetRequestIdxBySubrequestIdx(result_idx);
376 auto& strategy = context.GetStrategy(request_idx);
377
378 auto& request = sub_requests[result_idx].request;
379 UASSERT_MSG(request, "Finished requests must not be empty");
380 auto reply = strategy.ProcessReply(std::move(*request));
381 if (reply.has_value()) {
382 /// Got reply but it's not OK and user wants to retry over
383 /// some delay
384 context.OnRetriableReply(request_idx, *reply, Clock::now());
385 /// No need to check. we just added one entry
386 wakeup_time = *context.NextEventTime();
387 } else {
388 context.OnNonRetriableReply(request_idx);
389 }
390 }
391 return context.ExtractAllReplies();
392 }
393}
394
395/// @brief Asynchronously perform bulk hedged requests described by `RequestStrategy` and
396/// return future which returns Result of type `std::vector<std::optional<ResultType>>`.
397///
398/// Result contains replies for each
399/// element in `inputs` or `std::nullopt` in case of either timeouts or bad
400/// replies (`RequestStrategy::ProcessReply(RequestType&&)` returned `std::nullopt`
401/// and `RequestStrategy::ExtractReply()` returned `std::nullopt`).
402template <typename RequestStrategy>
403auto HedgeRequestsBulkAsync(std::vector<RequestStrategy> inputs, HedgingSettings settings) {
404 return HedgedRequestBulkFuture<RequestStrategy>(utils::Async(
405 "hedged-bulk-request",
406 [inputs{std::move(inputs)}, settings{std::move(settings)}]() mutable {
407 return HedgeRequestsBulk(std::move(inputs), std::move(settings));
408 }
409 ));
410}
411
/// Synchronously perform a hedged request described by RequestStrategy and
/// return the result, or std::nullopt in case of timeout or if the request was
/// denied by the strategy, e.g. ProcessReply always returned std::nullopt or
/// ExtractReply returned std::nullopt.
416template <typename RequestStrategy>
417std::optional<typename RequestTraits<RequestStrategy>::ReplyType>
418HedgeRequest(RequestStrategy input, HedgingSettings settings) {
419 std::vector<RequestStrategy> inputs;
420 inputs.emplace_back(std::move(input));
421 auto bulk_ret = HedgeRequestsBulk(std::move(inputs), std::move(settings));
422 if (bulk_ret.size() != 1) {
423 return std::nullopt;
424 }
425 return bulk_ret[0];
426}
427
/// Create a future which performs a hedged request described by
/// RequestStrategy; its Get() returns the result, or std::nullopt in case of
/// timeout or if the request was denied by the strategy, e.g. ProcessReply
/// always returned std::nullopt or ExtractReply returned std::nullopt.
432template <typename RequestStrategy>
433auto HedgeRequestAsync(RequestStrategy input, HedgingSettings settings) {
434 return HedgedRequestFuture<RequestStrategy>(utils::Async(
435 "hedged-request",
436 [input{std::move(input)}, settings{std::move(settings)}]() mutable {
437 return HedgeRequest(std::move(input), std::move(settings));
438 }
439 ));
440}
441
442} // namespace utils::hedging
443
444USERVER_NAMESPACE_END