// NOTE(review): Doxygen "source file" page residue removed. This file is
// userver/utils/hedged_request.hpp from the userver framework.
1#pragma once
2
3/// @file userver/utils/hedged_request.hpp
4/// @brief
5/// Classes and functions for performing hedged requests.
6///
7/// To perform hedged request you need to define RequestStrategy - a class
8/// similar to following ExampleStrategy:
9///
10/// class ExampleStrategy {
11/// public:
12/// /// Create request future.
13///
14/// /// Called at least once per hedged request. First time at the beginning
15/// /// of a hedged request and then every HedgingSettings::hedging_delay
16/// /// milliseconds if none of the previous requests are ready or
17/// /// ProcessReply returned non-nullopt result. If ProcessReply returned
18/// /// some interval of time, then additional request will be scheduled at
19/// /// this interval of time.
20/// /// @param attempt - increasing number for each try
21/// std::optional<RequestType> Create(std::size_t attempt);
22///
23/// /// ProcessReply is called when some request has finished. Method should
24/// /// evaluate request status and decide whether new attempt is required.
25/// /// If new attempt is required, then method must return delay for next
26/// /// attempt. If no other attempt is required, then method must return
27/// /// std::nullopt. It is expected that successful result will be stored
28/// /// somewhere internally and will be available with ExtractReply()
29/// /// method.
30/// std::optional<std::chrono::milliseconds> ProcessReply(RequestType&&);
31///
32/// std::optional<ReplyType> ExtractReply();
33///
34/// /// Method is called when system does not need request any more.
35/// /// For example, if one of the requests has been finished successfully,
36/// /// the system will finish all related subrequests.
37/// /// It is recommended to make this call as fast as possible in order
38/// /// not to block overall execution. For example, if you internally have
39/// /// some future, it is recommended to call TryCancel() here and wait
40/// /// for a cancellation in destructor, rather than call TryCancel() and
41/// /// immediately wait.
42/// void Finish(RequestType&&);
43/// };
44///
45/// Then call any of these functions:
46/// - HedgeRequest
47/// - HedgeRequestsBulk
48/// - HedgeRequestAsync
49/// - HedgeRequestsBulkAsync
50///
51
#include <chrono>
#include <cstddef>
#include <functional>
#include <optional>
#include <queue>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include <userver/engine/task/cancel.hpp>
#include <userver/engine/wait_any.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/async.hpp>
#include <userver/utils/datetime.hpp>
64
65USERVER_NAMESPACE_BEGIN
66
67namespace utils::hedging {
68
/// Hedging settings shared by all functions of this header.
struct HedgingSettings {
    /// Maximum requests to do
    std::size_t max_attempts{3};
    /// Delay between attempts
    std::chrono::milliseconds hedging_delay{7};
    /// Max time to wait for all requests
    std::chrono::milliseconds timeout_all{100};
};
78
/// Deduces the request and reply types of a RequestStrategy:
/// RequestType is the value_type of the optional returned by
/// `RequestStrategy::Create`, ReplyType is the value_type of the optional
/// returned by `RequestStrategy::ExtractReply`.
template <typename RequestStrategy>
struct RequestTraits {
    using RequestType =
        typename std::invoke_result_t<decltype(&RequestStrategy::Create), RequestStrategy, int>::value_type;
    using ReplyType =
        typename std::invoke_result_t<decltype(&RequestStrategy::ExtractReply), RequestStrategy>::value_type;
};
86
87namespace impl {
88
/// Monotonic clock used for all hedging deadlines (immune to wall-clock jumps).
using Clock = utils::datetime::SteadyClock;
using TimePoint = Clock::time_point;

/// What the scheduler should do when a plan entry's time arrives.
enum class Action { kStartTry, kStop };
93
94struct PlanEntry {
95 PlanEntry(TimePoint timepoint, std::size_t request_index, std::size_t attempt_id, Action action)
96 : timepoint(timepoint),
97 request_index(request_index),
98 attempt_id(attempt_id),
99 action(action)
100 {}
101
102 bool operator<(const PlanEntry& other) const noexcept { return Tie() < other.Tie(); }
103 bool operator>(const PlanEntry& other) const noexcept { return Tie() > other.Tie(); }
104 bool operator==(const PlanEntry& other) const noexcept { return Tie() == other.Tie(); }
105 bool operator<=(const PlanEntry& other) const noexcept { return Tie() <= other.Tie(); }
106 bool operator>=(const PlanEntry& other) const noexcept { return Tie() >= other.Tie(); }
107 bool operator!=(const PlanEntry& other) const noexcept { return Tie() != other.Tie(); }
108
109 std::tuple<const TimePoint&, const size_t&, const size_t&, const Action&> Tie() const noexcept {
110 return std::tie(timepoint, request_index, attempt_id, action);
111 }
112
113 TimePoint timepoint;
114 std::size_t request_index{0};
115 std::size_t attempt_id{0};
116 Action action;
117};
118
119/// This wrapper allows us to cancel subrequests without removing elements
120/// from vector of requests
121template <typename RequestStrategy>
122struct SubrequestWrapper {
123 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
124
125 SubrequestWrapper() = default;
126 SubrequestWrapper(SubrequestWrapper&&) noexcept = default;
127 explicit SubrequestWrapper(std::optional<RequestType>&& request)
128 : request(std::move(request))
129 {}
130
131 engine::impl::ContextAccessor* TryGetContextAccessor() {
132 if (!request) {
133 return nullptr;
134 }
135 return request->TryGetContextAccessor();
136 }
137
138 std::optional<RequestType> request;
139};
140
/// Per-request bookkeeping: which subrequest slots belong to this request,
/// how many attempts were launched, and whether the request is terminal.
struct RequestState {
    /// Indices of this request's attempts in the shared subrequest vector
    std::vector<std::size_t> subrequest_indices;
    /// Number of attempts launched so far
    std::size_t attempts_made{0};
    /// Once true, no further attempts will be started for this request
    bool finished{false};
};
146
/// Shared state of one bulk hedged request: the user-provided strategies, the
/// time-ordered plan of future actions, every launched subrequest and
/// per-request bookkeeping. No internal synchronization — it is driven by the
/// single event loop in HedgeRequestsBulk.
template <typename RequestStrategy>
struct Context {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
    using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

    /// @param inputs one strategy per logical request
    /// @param settings hedging parameters applied to every request
    Context(std::vector<RequestStrategy> inputs, HedgingSettings settings)
        : inputs_(std::move(inputs)),
          settings_(std::move(settings))
    {
        const std::size_t size = this->inputs_.size();
        request_states_.resize(size);
    }
    Context(Context&&) noexcept = default;

    /// Seed the plan: one immediate first attempt per request plus a single
    /// global stop event at start_time + timeout_all. Reserves the subrequest
    /// vector up-front so that indices stored in request_states_ stay valid
    /// (no reallocation is expected beyond max_attempts * request_count).
    void Prepare(TimePoint start_time) {
        const auto request_count = GetRequestsCount();
        for (std::size_t request_id = 0; request_id < request_count; ++request_id) {
            plan_.emplace(start_time, request_id, 0, Action::kStartTry);
        }
        plan_.emplace(start_time + settings_.timeout_all, 0, 0, Action::kStop);
        subrequests_.reserve(settings_.max_attempts * request_count);
    }

    /// Time of the nearest planned action, or nullopt if the plan is empty.
    std::optional<TimePoint> NextEventTime() const {
        if (plan_.empty()) {
            return std::nullopt;
        }
        return plan_.top().timepoint;
    }
    /// Remove and return the nearest planned action, or nullopt if none left.
    std::optional<PlanEntry> PopPlan() {
        if (plan_.empty()) {
            return std::nullopt;
        }
        auto ret = plan_.top();
        plan_.pop();
        return ret;
    }
    /// True once OnActionStop() has run; the event loop must exit.
    bool IsStop() const { return stop_; }

    /// Mark the request finished and hand every still-alive subrequest back to
    /// its strategy via Finish(); the emptied optionals keep their slots so
    /// stored indices remain valid.
    void FinishAllSubrequests(std::size_t request_index) {
        auto& request_state = request_states_[request_index];
        request_state.finished = true;
        const auto& subrequest_indices = request_state.subrequest_indices;
        auto& strategy = inputs_[request_index];
        for (auto i : subrequest_indices) {
            auto& request = subrequests_[i].request;
            if (request) {
                strategy.Finish(std::move(*request));
                request.reset();
            }
        }
    }

    const HedgingSettings& GetSettings() const { return settings_; }

    /// Number of logical requests (== number of strategies).
    size_t GetRequestsCount() const { return inputs_.size(); }

    /// Map a subrequest slot back to the logical request that spawned it.
    /// Throws std::out_of_range for an unknown index.
    size_t GetRequestIdxBySubrequestIdx(size_t subrequest_idx) const {
        return input_by_subrequests_.at(subrequest_idx);
    }

    RequestStrategy& GetStrategy(size_t index) { return inputs_[index]; }

    /// All subrequest wrappers; passed as a whole to engine::WaitAnyUntil.
    auto& GetSubRequests() { return subrequests_; }

    /// Collect one optional reply per request, in input order, by asking each
    /// strategy for whatever result it accumulated.
    std::vector<std::optional<ReplyType>> ExtractAllReplies() {
        std::vector<std::optional<ReplyType>> ret;
        ret.reserve(GetRequestsCount());
        for (auto&& strategy : inputs_) {
            ret.emplace_back(strategy.ExtractReply());
        }
        return ret;
    }

    /// @name Reactions on events with WaitAny*
    /// @{
    /// Called on elapsed timeout of WaitAny when next event is Stop some
    /// request: finishes every request and flags the event loop to exit.
    void OnActionStop() {
        for (std::size_t i = 0; i < inputs_.size(); ++i) {
            FinishAllSubrequests(i);
        }
        stop_ = true;
    }

    /// Called on elapsed timeout of WaitAny when next event is Start retry of
    /// request with id equal @param request_index. May decline silently if the
    /// request is finished, the attempt is stale, the attempt budget is spent,
    /// or the strategy returns no request.
    void OnActionStartTry(std::size_t request_index, std::size_t attempt_id, TimePoint now) {
        auto& request_state = request_states_[request_index];
        if (request_state.finished) {
            return;
        }
        auto& attempts_made = request_state.attempts_made;
        // We could have already launched attempt with this number, for example if
        // attempt number 2 failed with retryable code, we will add it to plan with
        // number 3. This way, there are now two planned events, both with number=3
        // - one from retry, one from hedging. We will execute the earliest one and
        // skip the second one.
        if (attempt_id < attempts_made) {
            return;
        }

        if (attempts_made >= settings_.max_attempts) {
            return;
        }
        auto& strategy = inputs_[request_index];
        auto request_opt = strategy.Create(attempts_made);
        if (!request_opt) {
            request_state.finished = true;
            // User do not want to make another request (maybe some retry budget is
            // used)
            return;
        }
        const auto idx = subrequests_.size();
        subrequests_.emplace_back(std::move(request_opt));
        request_state.subrequest_indices.push_back(idx);
        input_by_subrequests_[idx] = request_index;
        attempts_made++;
        // Schedule the hedged follow-up attempt after hedging_delay.
        plan_.emplace(now + settings_.hedging_delay, request_index, attempts_made, Action::kStartTry);
    }

    /// Called on getting error in request with @param request_idx: schedules a
    /// retry after retry_delay unless the request is finished or out of attempts.
    void OnRetriableReply(std::size_t request_idx, std::chrono::milliseconds retry_delay, TimePoint now) {
        const auto& request_state = request_states_[request_idx];
        if (request_state.finished) {
            return;
        }
        if (request_state.attempts_made >= settings_.max_attempts) {
            return;
        }

        plan_.emplace(now + retry_delay, request_idx, request_state.attempts_made, Action::kStartTry);
    }

    /// A final (non-retriable) reply arrived: cancel all sibling attempts.
    void OnNonRetriableReply(std::size_t request_idx) { FinishAllSubrequests(request_idx); }
    /// @}

private:
    /// user provided request strategies bulk
    std::vector<RequestStrategy> inputs_;
    HedgingSettings settings_;

    /// Our plan of what we will do at what time (min-heap: earliest on top)
    std::priority_queue<PlanEntry, std::vector<PlanEntry>, std::greater<>> plan_{};
    /// Every attempt ever launched; slots are never erased, only emptied
    std::vector<SubrequestWrapper<RequestStrategy>> subrequests_{};
    /// Store index of input by subrequest index
    std::unordered_map<std::size_t, std::size_t> input_by_subrequests_{};
    std::vector<RequestState> request_states_{};
    bool stop_{false};
};
297
298} // namespace impl
299
300/// Future of hedged bulk request
301template <typename RequestStrategy>
303 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
304 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
305
306 HedgedRequestBulkFuture(HedgedRequestBulkFuture&&) noexcept = default;
307 ~HedgedRequestBulkFuture() { task_.SyncCancel(); }
308
309 /// @brief Wait for the request finish or for a caller task cancellation.
310 void Wait() { task_.Wait(); }
311
312 /// @copydoc engine::TaskWithResult::Get()
313 std::vector<std::optional<ReplyType>> Get() { return task_.Get(); }
314
315 engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }
316
317private:
318 template <typename TRequestStrategy>
319 friend auto HedgeRequestsBulkAsync(std::vector<TRequestStrategy> inputs, HedgingSettings settings);
320 using Task = engine::TaskWithResult<std::vector<std::optional<ReplyType>>>;
321 HedgedRequestBulkFuture(Task&& task)
322 : task_(std::move(task))
323 {}
324 Task task_;
325};
326
327/// Future of hedged request
328template <typename RequestStrategy>
330 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
331 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
332
333 HedgedRequestFuture(HedgedRequestFuture&&) noexcept = default;
334 ~HedgedRequestFuture() { task_.SyncCancel(); }
335
336 /// @brief Wait for the request finish or for a caller task cancellation.
337 void Wait() { task_.Wait(); }
338
339 /// @copydoc engine::TaskWithResult::Get()
340 std::optional<ReplyType> Get() { return task_.Get(); }
341
342 void IgnoreResult() {}
343
344 engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }
345
346private:
347 template <typename TRequestStrategy>
348 friend auto HedgeRequestAsync(TRequestStrategy input, HedgingSettings settings);
349 using Task = engine::TaskWithResult<std::optional<ReplyType>>;
350 HedgedRequestFuture(Task&& task)
351 : task_(std::move(task))
352 {}
353 Task task_;
354};
355
356/// @brief Synchronously perform bulk hedged requests described by `RequestStrategy` and
357/// return result of type `std::vector<std::optional<ResultType>>`.
358///
359/// Result contains replies for each element in `inputs` or `std::nullopt` in case
360/// of either timeouts or bad replies (`RequestStrategy::ProcessReply(RequestType&&)`
361/// returned `std::nullopt` and `RequestStrategy::ExtractReply()` returned
362/// `std::nullopt`).
363template <typename RequestStrategy>
364auto HedgeRequestsBulk(std::vector<RequestStrategy> inputs, HedgingSettings hedging_settings) {
365 {
366 using Action = impl::Action;
367 using Clock = impl::Clock;
368 auto context = impl::Context(std::move(inputs), std::move(hedging_settings));
369
370 auto& sub_requests = context.GetSubRequests();
371
372 auto wakeup_time = Clock::now();
373 context.Prepare(wakeup_time);
374
375 while (!context.IsStop()) {
376 auto wait_result = engine::WaitAnyUntil(wakeup_time, sub_requests);
377 if (!wait_result.has_value()) {
379 context.OnActionStop();
380 break;
381 }
382 /// timeout - need to process plan
383 auto plan_entry = context.PopPlan();
384 if (!plan_entry.has_value()) {
385 /// Timeout but we don't have planned actions any more
386 break;
387 }
388 const auto [timestamp, request_index, attempt_id, action] = *plan_entry;
389 switch (action) {
390 case Action::kStartTry:
391 context.OnActionStartTry(request_index, attempt_id, timestamp);
392 break;
393 case Action::kStop:
394 context.OnActionStop();
395 break;
396 }
397 auto next_wakeup_time = context.NextEventTime();
398 if (!next_wakeup_time.has_value()) {
399 break;
400 }
401 wakeup_time = *next_wakeup_time;
402 continue;
403 }
404 const auto result_idx = *wait_result;
405 UASSERT(result_idx < sub_requests.size());
406 const auto request_idx = context.GetRequestIdxBySubrequestIdx(result_idx);
407 auto& strategy = context.GetStrategy(request_idx);
408
409 auto& request = sub_requests[result_idx].request;
410 UASSERT_MSG(request, "Finished requests must not be empty");
411 auto reply = strategy.ProcessReply(std::move(*request));
412 if (reply.has_value()) {
413 /// Got reply but it's not OK and user wants to retry over
414 /// some delay
415 context.OnRetriableReply(request_idx, *reply, Clock::now());
416 /// No need to check. we just added one entry
417 wakeup_time = *context.NextEventTime();
418 } else {
419 context.OnNonRetriableReply(request_idx);
420 }
421 }
422 return context.ExtractAllReplies();
423 }
424}
425
426/// @brief Asynchronously perform bulk hedged requests described by `RequestStrategy` and
427/// return future which returns Result of type `std::vector<std::optional<ResultType>>`.
428///
429/// Result contains replies for each
430/// element in `inputs` or `std::nullopt` in case of either timeouts or bad
431/// replies (`RequestStrategy::ProcessReply(RequestType&&)` returned `std::nullopt`
432/// and `RequestStrategy::ExtractReply()` returned `std::nullopt`).
433template <typename RequestStrategy>
434auto HedgeRequestsBulkAsync(std::vector<RequestStrategy> inputs, HedgingSettings settings) {
435 return HedgedRequestBulkFuture<RequestStrategy>(utils::Async(
436 "hedged-bulk-request",
437 [inputs{std::move(inputs)}, settings{std::move(settings)}]() mutable {
438 return HedgeRequestsBulk(std::move(inputs), std::move(settings));
439 }
440 ));
441}
442
443/// Synchronously Perform hedged request described by RequestStrategy and return
444/// result or throw runtime_error. Exception can be thrown in case of timeout or
445/// if request was denied by strategy e.g. ProcessReply always returned
446/// std::nullopt or ExtractReply returned std::nullopt
447template <typename RequestStrategy>
448std::optional<typename RequestTraits<RequestStrategy>::ReplyType> HedgeRequest(
449 RequestStrategy input,
450 HedgingSettings settings
451) {
452 std::vector<RequestStrategy> inputs;
453 inputs.emplace_back(std::move(input));
454 auto bulk_ret = HedgeRequestsBulk(std::move(inputs), std::move(settings));
455 if (bulk_ret.size() != 1) {
456 return std::nullopt;
457 }
458 return bulk_ret[0];
459}
460
/// Create a future which performs a hedged request described by RequestStrategy.
/// The future's Get() yields std::optional<ReplyType>: std::nullopt is returned
/// in case of timeout or if the request was denied by the strategy, e.g.
/// ProcessReply always returned std::nullopt or ExtractReply returned std::nullopt.
465template <typename RequestStrategy>
466auto HedgeRequestAsync(RequestStrategy input, HedgingSettings settings) {
467 return HedgedRequestFuture<RequestStrategy>(utils::Async(
468 "hedged-request",
469 [input{std::move(input)}, settings{std::move(settings)}]() mutable {
470 return HedgeRequest(std::move(input), std::move(settings));
471 }
472 ));
473}
474
475} // namespace utils::hedging
476
477USERVER_NAMESPACE_END