userver: userver/utils/hedged_request.hpp Source File
Loading...
Searching...
No Matches
hedged_request.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/utils/hedged_request.hpp
4/// @brief
5/// Classes and functions for performing hedged requests.
6///
7/// To perform hedged request you need to define RequestStrategy - a class
8/// similar to following ExampleStrategy:
9///
10/// class ExampleStrategy {
11/// public:
12/// /// Create request future.
13///
14/// /// Called at least once per hedged request. First time at the beginning
15/// /// of a hedged request and then every HedgingSettings::hedging_delay
16/// /// milliseconds if none of the previous requests are ready or
17/// /// ProcessReply returned non-nullopt result. If ProcessReply returned
18/// /// some interval of time, then additional request will be scheduled at
19/// /// this interval of time.
20/// /// @param attempt - increasing number for each try
21/// std::optional<RequestType> Create(std::size_t attempt);
22///
23/// /// ProcessReply is called when some request has finished. Method should
24/// /// evaluate request status and decide whether new attempt is required.
25/// /// If new attempt is required, then method must return delay for next
26/// /// attempt. If no other attempt is required, then method must return
27/// /// std::nullopt. It is expected that successful result will be stored
28/// /// somewhere internally and will be available with ExtractReply()
29/// /// method.
30/// std::optional<std::chrono::milliseconds> ProcessReply(RequestType&&);
31///
32/// std::optional<ReplyType> ExtractReply();
33///
34/// /// Method is called when system does not need request any more.
35/// /// For example, if one of the requests has been finished successfully,
36/// /// the system will finish all related subrequests.
37/// /// It is recommended to make this call as fast as possible in order
38/// /// not to block overall execution. For example, if you internally have
39/// /// some future, it is recommended to call TryCancel() here and wait
40/// /// for a cancellation in destructor, rather than call TryCancel() and
41/// /// immediately wait.
42/// void Finish(RequestType&&);
43/// };
44///
45/// Then call any of these functions:
46/// - HedgeRequest
47/// - HedgeRequestsBulk
48/// - HedgeRequestAsync
49/// - HedgeRequestsBulkAsync
50///
51
#include <chrono>
#include <cstddef>
#include <functional>
#include <optional>
#include <queue>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include <userver/engine/task/cancel.hpp>
#include <userver/engine/wait_any.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/async.hpp>
#include <userver/utils/datetime.hpp>
64
65USERVER_NAMESPACE_BEGIN
66
67namespace utils::hedging {
68
/// Class defines hedging settings
struct HedgingSettings {
    /// Maximum requests to do
    std::size_t max_attempts{3};
    /// Delay between attempts
    std::chrono::milliseconds hedging_delay{7};
    /// Max time to wait for all requests
    std::chrono::milliseconds timeout_all{100};
};
78
/// Deduces the request and reply types produced by a RequestStrategy:
/// RequestType is the value_type of the optional returned by Create(),
/// ReplyType is the value_type of the optional returned by ExtractReply().
template <typename RequestStrategy>
struct RequestTraits {
    using RequestType =
        typename std::invoke_result_t<decltype(&RequestStrategy::Create), RequestStrategy, int>::value_type;
    using ReplyType =
        typename std::invoke_result_t<decltype(&RequestStrategy::ExtractReply), RequestStrategy>::value_type;
};
86
87namespace impl {
88
/// Monotonic clock used for all hedging scheduling decisions.
using Clock = utils::datetime::SteadyClock;
using TimePoint = Clock::time_point;

/// What to do when a planned event fires: launch another attempt or stop all.
enum class Action { kStartTry, kStop };
93
94struct PlanEntry {
95public:
96 PlanEntry(TimePoint timepoint, std::size_t request_index, std::size_t attempt_id, Action action)
97 : timepoint(timepoint),
98 request_index(request_index),
99 attempt_id(attempt_id),
100 action(action)
101 {}
102
103 bool operator<(const PlanEntry& other) const noexcept { return Tie() < other.Tie(); }
104 bool operator>(const PlanEntry& other) const noexcept { return Tie() > other.Tie(); }
105 bool operator==(const PlanEntry& other) const noexcept { return Tie() == other.Tie(); }
106 bool operator<=(const PlanEntry& other) const noexcept { return Tie() <= other.Tie(); }
107 bool operator>=(const PlanEntry& other) const noexcept { return Tie() >= other.Tie(); }
108 bool operator!=(const PlanEntry& other) const noexcept { return Tie() != other.Tie(); }
109
110 TimePoint timepoint;
111 std::size_t request_index{0};
112 std::size_t attempt_id{0};
113 Action action;
114
115private:
116 std::tuple<const TimePoint&, const size_t&, const size_t&, const Action&> Tie() const noexcept {
117 return std::tie(timepoint, request_index, attempt_id, action);
118 }
119};
120
121/// This wrapper allows us to cancel subrequests without removing elements
122/// from vector of requests
123template <typename RequestStrategy>
124struct SubrequestWrapper {
125 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
126
127 SubrequestWrapper() = default;
128 SubrequestWrapper(SubrequestWrapper&&) noexcept = default;
129 explicit SubrequestWrapper(std::optional<RequestType>&& request)
130 : request(std::move(request))
131 {}
132
133 engine::impl::ContextAccessor* TryGetContextAccessor() {
134 if (!request) {
135 return nullptr;
136 }
137 return request->TryGetContextAccessor();
138 }
139
140 std::optional<RequestType> request;
141};
142
/// Book-keeping for one logical request of the bulk.
struct RequestState {
    /// Indices into the shared subrequest vector of every attempt launched
    /// for this request.
    std::vector<std::size_t> subrequest_indices;
    /// Number of attempts launched so far.
    std::size_t attempts_made = 0;
    /// True when no further attempts must be made (success, stop, or the
    /// strategy declined to create a new request).
    bool finished = false;
};
148
/// Holds all state of one bulk hedged request: the user strategies, the
/// event plan (a min-priority-queue of PlanEntry), the launched subrequests
/// and per-request book-keeping. Drives scheduling decisions; the actual
/// waiting is done by the caller (HedgeRequestsBulk).
template <typename RequestStrategy>
struct Context {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
    using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

    /// @param inputs user-provided strategies, one per logical request
    /// @param settings hedging parameters shared by all requests
    Context(std::vector<RequestStrategy> inputs, HedgingSettings settings)
        : inputs_(std::move(inputs)),
          settings_(std::move(settings))
    {
        const std::size_t size = this->inputs_.size();
        request_states_.resize(size);
    }
    Context(Context&&) noexcept = default;

    /// Plan the first attempt of every request at @p start_time plus a single
    /// kStop event at start_time + timeout_all.
    void Prepare(TimePoint start_time) {
        const auto request_count = GetRequestsCount();
        for (std::size_t request_id = 0; request_id < request_count; ++request_id) {
            plan_.emplace(start_time, request_id, 0, Action::kStartTry);
        }
        plan_.emplace(start_time + settings_.timeout_all, 0, 0, Action::kStop);
        subrequests_.reserve(settings_.max_attempts * request_count);
    }

    /// Time of the earliest planned event, or std::nullopt if the plan is empty.
    std::optional<TimePoint> NextEventTime() const {
        if (plan_.empty()) {
            return std::nullopt;
        }
        return plan_.top().timepoint;
    }
    /// Remove and return the earliest planned event, if any.
    std::optional<PlanEntry> PopPlan() {
        if (plan_.empty()) {
            return std::nullopt;
        }
        auto ret = plan_.top();
        plan_.pop();
        return ret;
    }
    /// True once OnActionStop() has run; the caller's loop must exit.
    bool IsStop() const { return stop_; }

    /// Mark the request finished and hand every still-live subrequest of it
    /// back to its strategy via Finish().
    void FinishAllSubrequests(std::size_t request_index) {
        auto& request_state = request_states_[request_index];
        request_state.finished = true;
        const auto& subrequest_indices = request_state.subrequest_indices;
        auto& strategy = inputs_[request_index];
        for (auto i : subrequest_indices) {
            auto& request = subrequests_[i].request;
            if (request) {
                strategy.Finish(std::move(*request));
                request.reset();
            }
        }
    }

    const HedgingSettings& GetSettings() const { return settings_; }

    size_t GetRequestsCount() const { return inputs_.size(); }

    /// Map a subrequest index (as returned by WaitAny*) back to the index of
    /// the logical request that spawned it.
    size_t GetRequestIdxBySubrequestIdx(size_t subrequest_idx) const {
        return input_by_subrequests_.at(subrequest_idx);
    }

    RequestStrategy& GetStrategy(size_t index) { return inputs_[index]; }

    auto& GetSubRequests() { return subrequests_; }

    /// Collect ExtractReply() from every strategy, in input order.
    std::vector<std::optional<ReplyType>> ExtractAllReplies() {
        std::vector<std::optional<ReplyType>> ret;
        ret.reserve(GetRequestsCount());
        for (auto&& strategy : inputs_) {
            ret.emplace_back(strategy.ExtractReply());
        }
        return ret;
    }

    /// @name Reactions on events with WaitAny*
    /// @{
    /// Called on elapsed timeout of WaitAny when next event is Stop some
    /// request
    void OnActionStop() {
        for (std::size_t i = 0; i < inputs_.size(); ++i) {
            FinishAllSubrequests(i);
        }
        stop_ = true;
    }

    /// Called on elapsed timeout of WaitAny when next event is Start retry of
    /// request with id equal @param request_index
    void OnActionStartTry(std::size_t request_index, std::size_t attempt_id, TimePoint now) {
        auto& request_state = request_states_[request_index];
        if (request_state.finished) {
            return;
        }
        auto& attempts_made = request_state.attempts_made;
        // We could have already launched attempt with this number, for example if
        // attempt number 2 failed with retryable code, we will add it to plan with
        // number 3. This way, there are now two planned events, both with number=3
        // - one from retry, one from hedging. We will execute the earliest one and
        // skip the second one.
        if (attempt_id < attempts_made) {
            return;
        }

        if (attempts_made >= settings_.max_attempts) {
            return;
        }
        auto& strategy = inputs_[request_index];
        auto request_opt = strategy.Create(attempts_made);
        if (!request_opt) {
            request_state.finished = true;
            // User does not want to make another request (maybe some retry budget
            // is exhausted)
            return;
        }
        const auto idx = subrequests_.size();
        subrequests_.emplace_back(std::move(request_opt));
        request_state.subrequest_indices.push_back(idx);
        input_by_subrequests_[idx] = request_index;
        attempts_made++;
        // Hedge: schedule the next attempt after hedging_delay; it is skipped
        // later if an earlier event already bumped attempts_made past it.
        plan_.emplace(now + settings_.hedging_delay, request_index, attempts_made, Action::kStartTry);
    }

    /// Called on getting error in request with @param request_idx
    void OnRetriableReply(std::size_t request_idx, std::chrono::milliseconds retry_delay, TimePoint now) {
        const auto& request_state = request_states_[request_idx];
        if (request_state.finished) {
            return;
        }
        if (request_state.attempts_made >= settings_.max_attempts) {
            return;
        }

        plan_.emplace(now + retry_delay, request_idx, request_state.attempts_made, Action::kStartTry);
    }

    /// A non-retriable reply finishes the whole request.
    void OnNonRetriableReply(std::size_t request_idx) { FinishAllSubrequests(request_idx); }
    /// @}

private:
    /// user provided request strategies bulk
    std::vector<RequestStrategy> inputs_;
    HedgingSettings settings_;

    /// Our plan of what we will do at what time
    std::priority_queue<PlanEntry, std::vector<PlanEntry>, std::greater<>> plan_{};
    std::vector<SubrequestWrapper<RequestStrategy>> subrequests_{};
    /// Store index of input by subrequest index
    std::unordered_map<std::size_t, std::size_t> input_by_subrequests_{};
    std::vector<RequestState> request_states_{};
    bool stop_{false};
};
299
300} // namespace impl
301
302/// Future of hedged bulk request
303template <typename RequestStrategy>
305 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
306 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
307
308 HedgedRequestBulkFuture(HedgedRequestBulkFuture&&) noexcept = default;
309 ~HedgedRequestBulkFuture() { task_.SyncCancel(); }
310
311 /// @brief Wait for the request finish or for a caller task cancellation.
312 void Wait() { task_.Wait(); }
313
314 /// @copydoc engine::TaskWithResult::Get()
315 std::vector<std::optional<ReplyType>> Get() { return task_.Get(); }
316
317 engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }
318
319private:
320 template <typename TRequestStrategy>
321 friend auto HedgeRequestsBulkAsync(std::vector<TRequestStrategy> inputs, HedgingSettings settings);
322 using Task = engine::TaskWithResult<std::vector<std::optional<ReplyType>>>;
323 HedgedRequestBulkFuture(Task&& task)
324 : task_(std::move(task))
325 {}
326 Task task_;
327};
328
329/// Future of hedged request
330template <typename RequestStrategy>
332 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
333 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
334
335 HedgedRequestFuture(HedgedRequestFuture&&) noexcept = default;
336 ~HedgedRequestFuture() { task_.SyncCancel(); }
337
338 /// @brief Wait for the request finish or for a caller task cancellation.
339 void Wait() { task_.Wait(); }
340
341 /// @copydoc engine::TaskWithResult::Get()
342 std::optional<ReplyType> Get() { return task_.Get(); }
343
344 void IgnoreResult() {}
345
346 engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }
347
348private:
349 template <typename TRequestStrategy>
350 friend auto HedgeRequestAsync(TRequestStrategy input, HedgingSettings settings);
351 using Task = engine::TaskWithResult<std::optional<ReplyType>>;
352 HedgedRequestFuture(Task&& task)
353 : task_(std::move(task))
354 {}
355 Task task_;
356};
357
358/// @brief Synchronously perform bulk hedged requests described by `RequestStrategy` and
359/// return result of type `std::vector<std::optional<ResultType>>`.
360///
361/// Result contains replies for each element in `inputs` or `std::nullopt` in case
362/// of either timeouts or bad replies (`RequestStrategy::ProcessReply(RequestType&&)`
363/// returned `std::nullopt` and `RequestStrategy::ExtractReply()` returned
364/// `std::nullopt`).
365template <typename RequestStrategy>
366auto HedgeRequestsBulk(std::vector<RequestStrategy> inputs, HedgingSettings hedging_settings) {
367 {
368 using Action = impl::Action;
369 using Clock = impl::Clock;
370 auto context = impl::Context(std::move(inputs), std::move(hedging_settings));
371
372 auto& sub_requests = context.GetSubRequests();
373
374 auto wakeup_time = Clock::now();
375 context.Prepare(wakeup_time);
376
377 while (!context.IsStop()) {
378 auto wait_result = engine::WaitAnyUntil(wakeup_time, sub_requests);
379 if (!wait_result.has_value()) {
381 context.OnActionStop();
382 break;
383 }
384 /// timeout - need to process plan
385 auto plan_entry = context.PopPlan();
386 if (!plan_entry.has_value()) {
387 /// Timeout but we don't have planed actions any more
388 break;
389 }
390 const auto [timestamp, request_index, attempt_id, action] = *plan_entry;
391 switch (action) {
392 case Action::kStartTry:
393 context.OnActionStartTry(request_index, attempt_id, timestamp);
394 break;
395 case Action::kStop:
396 context.OnActionStop();
397 break;
398 }
399 auto next_wakeup_time = context.NextEventTime();
400 if (!next_wakeup_time.has_value()) {
401 break;
402 }
403 wakeup_time = *next_wakeup_time;
404 continue;
405 }
406 const auto result_idx = *wait_result;
407 UASSERT(result_idx < sub_requests.size());
408 const auto request_idx = context.GetRequestIdxBySubrequestIdx(result_idx);
409 auto& strategy = context.GetStrategy(request_idx);
410
411 auto& request = sub_requests[result_idx].request;
412 UASSERT_MSG(request, "Finished requests must not be empty");
413 auto reply = strategy.ProcessReply(std::move(*request));
414 if (reply.has_value()) {
415 /// Got reply but it's not OK and user wants to retry over
416 /// some delay
417 context.OnRetriableReply(request_idx, *reply, Clock::now());
418 /// No need to check. we just added one entry
419 wakeup_time = *context.NextEventTime();
420 } else {
421 context.OnNonRetriableReply(request_idx);
422 }
423 }
424 return context.ExtractAllReplies();
425 }
426}
427
428/// @brief Asynchronously perform bulk hedged requests described by `RequestStrategy` and
429/// return future which returns Result of type `std::vector<std::optional<ResultType>>`.
430///
431/// Result contains replies for each
432/// element in `inputs` or `std::nullopt` in case of either timeouts or bad
433/// replies (`RequestStrategy::ProcessReply(RequestType&&)` returned `std::nullopt`
434/// and `RequestStrategy::ExtractReply()` returned `std::nullopt`).
435template <typename RequestStrategy>
436auto HedgeRequestsBulkAsync(std::vector<RequestStrategy> inputs, HedgingSettings settings) {
437 return HedgedRequestBulkFuture<RequestStrategy>(utils::Async(
438 "hedged-bulk-request",
439 [inputs{std::move(inputs)}, settings{std::move(settings)}]() mutable {
440 return HedgeRequestsBulk(std::move(inputs), std::move(settings));
441 }
442 ));
443}
444
445/// Synchronously Perform hedged request described by RequestStrategy and return
446/// result or throw runtime_error. Exception can be thrown in case of timeout or
447/// if request was denied by strategy e.g. ProcessReply always returned
448/// std::nullopt or ExtractReply returned std::nullopt
449template <typename RequestStrategy>
450std::optional<typename RequestTraits<RequestStrategy>::ReplyType> HedgeRequest(
451 RequestStrategy input,
452 HedgingSettings settings
453) {
454 std::vector<RequestStrategy> inputs;
455 inputs.emplace_back(std::move(input));
456 auto bulk_ret = HedgeRequestsBulk(std::move(inputs), std::move(settings));
457 if (bulk_ret.size() != 1) {
458 return std::nullopt;
459 }
460 return bulk_ret[0];
461}
462
463/// Create future which perform hedged request described by RequestStrategy and
464/// return result or throw runtime_error. Exception can be thrown in case of
465/// timeout or if request was denied by strategy e.g. ProcessReply always
466/// returned std::nullopt or ExtractReply returned std::nullopt
467template <typename RequestStrategy>
468auto HedgeRequestAsync(RequestStrategy input, HedgingSettings settings) {
469 return HedgedRequestFuture<RequestStrategy>(utils::Async(
470 "hedged-request",
471 [input{std::move(input)}, settings{std::move(settings)}]() mutable {
472 return HedgeRequest(std::move(input), std::move(settings));
473 }
474 ));
475}
476
477} // namespace utils::hedging
478
479USERVER_NAMESPACE_END