userver: userver/utils/hedged_request.hpp Source File
Loading...
Searching...
No Matches
hedged_request.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/utils/hedged_request.hpp
4/// @brief
5/// Classes and functions for performing hedged requests.
6///
7/// To perform a hedged request you need to define a RequestStrategy - a class
8/// similar to the following ExampleStrategy:
9///
10/// class ExampleStrategy {
11/// public:
12/// /// Create request future.
13///
14/// /// Called at least once per hedged request. First time at the beginning
15/// /// of a hedged request and then every HedgingSettings::hedging_delay
16/// /// milliseconds if none of the previous requests are ready or
17/// /// ProcessReply returned non-nullopt result. If ProcessReply returned
18/// /// some interval of time, then additional request will be scheduled at
19/// /// this interval of time.
20/// /// @param attempt - increasing number for each try
21/// std::optional<RequestType> Create(std::size_t attempt);
22///
23/// /// ProcessReply is called when some request has finished. Method should
24/// /// evaluate request status and decide whether new attempt is required.
25/// /// If new attempt is required, then method must return delay for next
26/// /// attempt. If no other attempt is required, then method must return
27/// /// std::nullopt. It is expected that successful result will be stored
28/// /// somewhere internally and will be available with ExtractReply()
29/// /// method.
30/// std::optional<std::chrono::milliseconds> ProcessReply(RequestType&&);
31///
32/// std::optional<ReplyType> ExtractReply();
33///
34/// /// Method is called when the system no longer needs the request.
35/// /// For example, if one of the requests has been finished successfully,
36/// /// the system will finish all related subrequests.
37/// /// It is recommended to make this call as fast as possible in order
38/// /// not to block overall execution. For example, if you internally have
39/// /// some future, it is recommended to call TryCancel() here and wait
40/// /// for a cancellation in destructor, rather than call TryCancel() and
41/// /// immediately wait.
42/// void Finish(RequestType&&);
43/// };
44///
45/// Then call any of these functions:
46/// - HedgeRequest
47/// - HedgeRequestsBulk
48/// - HedgeRequestAsync
49/// - HedgeRequestsBulkAsync
50///
51
#include <chrono>
#include <functional>
#include <optional>
#include <queue>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include <userver/engine/task/cancel.hpp>
#include <userver/engine/wait_any.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/async.hpp>
#include <userver/utils/datetime.hpp>
64
65USERVER_NAMESPACE_BEGIN
66
67namespace utils::hedging {
68
/// Hedging settings for a single hedged request/bulk of hedged requests.
struct HedgingSettings {
  /// Maximum number of attempts to make per request
  std::size_t max_attempts{3};
  /// Delay between attempts: a new attempt is scheduled this long after the
  /// previous one if no reply has arrived yet
  std::chrono::milliseconds hedging_delay{7};
  /// Max time to wait for all requests; after it elapses every still-running
  /// subrequest is finished and the collected replies are returned
  std::chrono::milliseconds timeout_all{100};
};
78
/// Deduces the request and reply types of a RequestStrategy.
///
/// RequestStrategy::Create returns std::optional<RequestType> and
/// RequestStrategy::ExtractReply returns std::optional<ReplyType>; the
/// ::value_type strips the optional wrapper so RequestType/ReplyType name the
/// payload types themselves.
template <typename RequestStrategy>
struct RequestTraits {
  using RequestType =
      typename std::invoke_result_t<decltype(&RequestStrategy::Create),
                                    RequestStrategy, std::size_t>::value_type;
  using ReplyType =
      typename std::invoke_result_t<decltype(&RequestStrategy::ExtractReply),
                                    RequestStrategy>::value_type;
};
88
namespace impl {

// Monotonic clock used for all plan scheduling (never jumps backwards).
using Clock = utils::datetime::SteadyClock;
using TimePoint = Clock::time_point;

// Kind of a planned event: start one more attempt of a request, or stop the
// whole hedged operation (the all-requests timeout has been reached).
enum class Action { StartTry, Stop };
95
96struct PlanEntry {
97 public:
98 PlanEntry(TimePoint timepoint, std::size_t request_index,
99 std::size_t attempt_id, Action action)
100 : timepoint(timepoint),
101 request_index(request_index),
102 attempt_id(attempt_id),
103 action(action) {}
104
105 bool operator<(const PlanEntry& other) const noexcept {
106 return tie() < other.tie();
107 }
108 bool operator>(const PlanEntry& other) const noexcept {
109 return tie() > other.tie();
110 }
111 bool operator==(const PlanEntry& other) const noexcept {
112 return tie() == other.tie();
113 }
114 bool operator<=(const PlanEntry& other) const noexcept {
115 return tie() <= other.tie();
116 }
117 bool operator>=(const PlanEntry& other) const noexcept {
118 return tie() >= other.tie();
119 }
120 bool operator!=(const PlanEntry& other) const noexcept {
121 return tie() != other.tie();
122 }
123
124 TimePoint timepoint;
125 std::size_t request_index{0};
126 std::size_t attempt_id{0};
127 Action action;
128
129 private:
130 std::tuple<const TimePoint&, const size_t&, const size_t&, const Action&>
131 tie() const noexcept {
132 return std::tie(timepoint, request_index, attempt_id, action);
133 }
134};
135
136/// This wrapper allows us to cancel subrequests without removing elements
137/// from vector of requests
138template <typename RequestStrategy>
139struct SubrequestWrapper {
140 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
141
142 SubrequestWrapper() = default;
143 SubrequestWrapper(SubrequestWrapper&&) noexcept = default;
144 explicit SubrequestWrapper(std::optional<RequestType>&& request)
145 : request(std::move(request)) {}
146
147 engine::impl::ContextAccessor* TryGetContextAccessor() {
148 if (!request) return nullptr;
149 return request->TryGetContextAccessor();
150 }
151
152 std::optional<RequestType> request;
153};
154
/// Per-request bookkeeping: which entries of the subrequest vector belong to
/// this request, how many attempts were launched, and whether the request is
/// done (no further attempts should be planned).
struct RequestState {
  std::vector<std::size_t> subrequest_indices{};
  std::size_t attempts_made{0};
  bool finished{false};
};
160
/// Shared state of one bulk hedged operation: the user-provided strategies,
/// the time-ordered plan of future actions, every subrequest launched so far
/// and per-request bookkeeping.
template <typename RequestStrategy>
struct Context {
  using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
  using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

  Context(std::vector<RequestStrategy> inputs, HedgingSettings settings)
      : inputs_(std::move(inputs)), settings(std::move(settings)) {
    const std::size_t size = this->inputs_.size();
    request_states_.resize(size);
  }
  Context(Context&&) noexcept = default;

  /// Plan the first attempt of every request at start_time plus a single
  /// global Stop event once settings.timeout_all elapses.
  void Prepare(TimePoint start_time) {
    const auto request_count = GetRequestsCount();
    for (std::size_t request_id = 0; request_id < request_count; ++request_id) {
      plan_.emplace(start_time, request_id, 0, Action::StartTry);
    }
    plan_.emplace(start_time + settings.timeout_all, 0, 0, Action::Stop);
    // Upper bound on subrequests: each request makes at most max_attempts.
    subrequests_.reserve(settings.max_attempts * request_count);
  }

  /// Time of the nearest planned event, or std::nullopt if the plan is empty.
  std::optional<TimePoint> NextEventTime() const {
    if (plan_.empty()) return std::nullopt;
    return plan_.top().timepoint;
  }
  /// Pop and return the nearest planned event, or std::nullopt if none left.
  std::optional<PlanEntry> PopPlan() {
    if (plan_.empty()) return std::nullopt;
    auto ret = plan_.top();
    plan_.pop();
    return ret;
  }
  bool IsStop() const { return stop_; }

  /// Mark the request finished and hand every still-pending subrequest of it
  /// back to its strategy via Finish(); emptied slots stop being waited on.
  void FinishAllSubrequests(std::size_t request_index) {
    auto& request_state = request_states_[request_index];
    request_state.finished = true;
    const auto& subrequest_indices = request_state.subrequest_indices;
    auto& strategy = inputs_[request_index];
    for (auto i : subrequest_indices) {
      auto& request = subrequests_[i].request;
      if (request) {
        strategy.Finish(std::move(*request));
        request.reset();
      }
    }
  }

  const HedgingSettings& GetSettings() const { return settings; }

  size_t GetRequestsCount() const { return inputs_.size(); }

  /// Map a subrequest's index back to the request (strategy) that spawned it.
  size_t GetRequestIdxBySubrequestIdx(size_t subrequest_idx) const {
    return input_by_subrequests_.at(subrequest_idx);
  }

  RequestStrategy& GetStrategy(size_t index) { return inputs_[index]; }

  auto& GetSubRequests() { return subrequests_; }

  /// Collect the final reply (or std::nullopt) from every strategy.
  std::vector<std::optional<ReplyType>> ExtractAllReplies() {
    std::vector<std::optional<ReplyType>> ret;
    ret.reserve(GetRequestsCount());
    for (auto&& strategy : inputs_) {
      ret.emplace_back(strategy.ExtractReply());
    }
    return ret;
  }

  /// @name Reactions on events with WaitAny*
  /// @{
  /// Called on elapsed timeout of WaitAny when next event is Stop some
  /// request
  void OnActionStop() {
    for (std::size_t i = 0; i < inputs_.size(); ++i) FinishAllSubrequests(i);
    stop_ = true;
  }

  /// Called on elapsed timeout of WaitAny when next event is Start retry of
  /// request with id equal @param request_index
  void OnActionStartTry(std::size_t request_index, std::size_t attempt_id,
                        TimePoint now) {
    auto& request_state = request_states_[request_index];
    if (request_state.finished) {
      return;
    }
    auto& attempts_made = request_state.attempts_made;
    // We could have already launched attempt with this number, for example if
    // attempt number 2 failed with retryable code, we will add it to plan with
    // number 3. This way, there are now two planned events, both with number=3
    // - one from retry, one from hedging. We will execute the earliest one and
    // skip the second one.
    if (attempt_id < attempts_made) {
      return;
    }

    if (attempts_made >= settings.max_attempts) {
      return;
    }
    auto& strategy = inputs_[request_index];
    auto request_opt = strategy.Create(attempts_made);
    if (!request_opt) {
      request_state.finished = true;
      // User do not want to make another request (maybe some retry budget is
      // used)
      return;
    }
    const auto idx = subrequests_.size();
    subrequests_.emplace_back(std::move(request_opt));
    request_state.subrequest_indices.push_back(idx);
    input_by_subrequests_[idx] = request_index;
    attempts_made++;
    // Hedge: plan the next attempt in case this one is still silent after
    // hedging_delay.
    plan_.emplace(now + settings.hedging_delay, request_index, attempts_made,
                  Action::StartTry);
  }

  /// Called on getting error in request with @param request_idx
  void OnRetriableReply(std::size_t request_idx,
                        std::chrono::milliseconds retry_delay, TimePoint now) {
    const auto& request_state = request_states_[request_idx];
    if (request_state.finished) return;
    if (request_state.attempts_made >= settings.max_attempts) return;

    plan_.emplace(now + retry_delay, request_idx, request_state.attempts_made,
                  Action::StartTry);
  }

  /// Called on a successful (or non-retryable) reply: cancel all of the
  /// request's remaining subrequests.
  void OnNonRetriableReply(std::size_t request_idx) {
    FinishAllSubrequests(request_idx);
  }
  /// @}

 private:
  /// user provided request strategies bulk
  std::vector<RequestStrategy> inputs_;
  HedgingSettings settings;

  /// Our plan of what we will do at what time; std::greater makes the
  /// priority queue pop the earliest PlanEntry first (min-heap).
  std::priority_queue<PlanEntry, std::vector<PlanEntry>, std::greater<>>
      plan_{};
  std::vector<SubrequestWrapper<RequestStrategy>> subrequests_{};
  /// Store index of input by subrequest index
  std::unordered_map<std::size_t, std::size_t> input_by_subrequests_{};
  std::vector<RequestState> request_states_{};
  bool stop_{false};
};
306
307} // namespace impl
308
309/// Future of hedged bulk request
310template <typename RequestStrategy>
312 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
313 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
314
315 HedgedRequestBulkFuture(HedgedRequestBulkFuture&&) noexcept = default;
316 ~HedgedRequestBulkFuture() { task_.SyncCancel(); }
317
318 void Wait() { task_.Wait(); }
319 std::vector<std::optional<ReplyType>> Get() { return task_.Get(); }
320 engine::impl::ContextAccessor* TryGetContextAccessor() {
321 return task_.TryGetContextAccessor();
322 }
323
324 private:
325 template <typename RequestStrategy_>
326 friend auto HedgeRequestsBulkAsync(std::vector<RequestStrategy_> inputs,
327 HedgingSettings settings);
329 HedgedRequestBulkFuture(Task&& task) : task_(std::move(task)) {}
330 Task task_;
331};
332
333/// Future of hedged request
334template <typename RequestStrategy>
336 using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
337 using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;
338
339 HedgedRequestFuture(HedgedRequestFuture&&) noexcept = default;
340 ~HedgedRequestFuture() { task_.SyncCancel(); }
341
342 void Wait() { task_.Wait(); }
343 std::optional<ReplyType> Get() { return task_.Get(); }
344 void IgnoreResult() {}
345 engine::impl::ContextAccessor* TryGetContextAccessor() {
346 return task_.TryGetContextAccessor();
347 }
348
349 private:
350 template <typename RequestStrategy_>
351 friend auto HedgeRequestAsync(RequestStrategy_ input,
352 HedgingSettings settings);
353 using Task = engine::TaskWithResult<std::optional<ReplyType>>;
354 HedgedRequestFuture(Task&& task) : task_(std::move(task)) {}
355 Task task_;
356};
357
/// Synchronously perform bulk hedged requests described by RequestStrategy and
/// return result of type std::vector<std::optional<ResultType>>. Result
/// contains replies for each element in @param inputs or std::nullopt in case
/// either timeouts or bad replies (RequestStrategy::ProcessReply(RequestType&&)
/// returned std::nullopt and RequestStrategy::ExtractReply() returned
/// std::nullopt)
template <typename RequestStrategy>
auto HedgeRequestsBulk(std::vector<RequestStrategy> inputs,
                       HedgingSettings hedging_settings) {
  {
    using Action = impl::Action;
    using Clock = impl::Clock;
    auto context =
        impl::Context(std::move(inputs), std::move(hedging_settings));

    auto& sub_requests = context.GetSubRequests();

    auto wakeup_time = Clock::now();
    context.Prepare(wakeup_time);

    // Event loop: wait for either a subrequest to complete or the next
    // planned action's deadline, whichever comes first.
    while (!context.IsStop()) {
      auto wait_result = engine::WaitAnyUntil(wakeup_time, sub_requests);
      if (!wait_result.has_value()) {
        // Woken by deadline (or cancellation), not by a completed subrequest.
        if (engine::current_task::ShouldCancel()) {
          context.OnActionStop();
          break;
        }
        // Timeout - need to process the plan.
        auto plan_entry = context.PopPlan();
        if (!plan_entry.has_value()) {
          // Timeout but we don't have planned actions any more.
          break;
        }
        const auto [timestamp, request_index, attempt_id, action] = *plan_entry;
        switch (action) {
          case Action::StartTry:
            context.OnActionStartTry(request_index, attempt_id, timestamp);
            break;
          case Action::Stop:
            context.OnActionStop();
            break;
        }
        auto next_wakeup_time = context.NextEventTime();
        if (!next_wakeup_time.has_value()) {
          break;
        }
        wakeup_time = *next_wakeup_time;
        continue;
      }
      // Some subrequest completed; find which request it belongs to.
      const auto result_idx = *wait_result;
      UASSERT(result_idx < sub_requests.size());
      const auto request_idx = context.GetRequestIdxBySubrequestIdx(result_idx);
      auto& strategy = context.GetStrategy(request_idx);

      auto& request = sub_requests[result_idx].request;
      UASSERT_MSG(request, "Finished requests must not be empty");
      auto reply = strategy.ProcessReply(std::move(*request));
      if (reply.has_value()) {
        // Got reply but it's not OK and user wants to retry over
        // some delay.
        context.OnRetriableReply(request_idx, *reply, Clock::now());
        // No need to check: we just added one entry.
        wakeup_time = *context.NextEventTime();
      } else {
        // Success (or a non-retryable failure): cancel sibling subrequests.
        context.OnNonRetriableReply(request_idx);
      }
    }
    return context.ExtractAllReplies();
  }
}
428
429/// Asynchronously perform bulk hedged requests described by RequestStrategy and
430/// return future which returns Result of type
431/// std::vector<std::optional<ResultType>>. Result contains replies for each
432/// element in @param inputs or std::nullopt in case either timeouts or bad
433/// replies (RequestStrategy::ProcessReply(RequestType&&) returned std::nullopt
434/// and RequestStrategy::ExtractReply() returned std::nullopt)
435template <typename RequestStrategy>
436auto HedgeRequestsBulkAsync(std::vector<RequestStrategy> inputs,
437 HedgingSettings settings) {
438 return HedgedRequestBulkFuture<RequestStrategy>(utils::Async(
439 "hedged-bulk-request",
440 [inputs{std::move(inputs)}, settings{std::move(settings)}]() mutable {
441 return HedgeRequestsBulk(std::move(inputs), std::move(settings));
442 }));
443}
444
445/// Synchronously Perform hedged request described by RequestStrategy and return
446/// result or throw runtime_error. Exception can be thrown in case of timeout or
447/// if request was denied by strategy e.g. ProcessReply always returned
448/// std::nullopt or ExtractReply returned std::nullopt
449template <typename RequestStrategy>
450std::optional<typename RequestTraits<RequestStrategy>::ReplyType> HedgeRequest(
451 RequestStrategy input, HedgingSettings settings) {
452 std::vector<RequestStrategy> inputs;
453 inputs.emplace_back(std::move(input));
454 auto bulk_ret = HedgeRequestsBulk(std::move(inputs), std::move(settings));
455 if (bulk_ret.size() != 1) {
456 return std::nullopt;
457 }
458 return bulk_ret[0];
459}
460
461/// Create future which perform hedged request described by RequestStrategy and
462/// return result or throw runtime_error. Exception can be thrown in case of
463/// timeout or if request was denied by strategy e.g. ProcessReply always
464/// returned std::nullopt or ExtractReply returned std::nullopt
465template <typename RequestStrategy>
466auto HedgeRequestAsync(RequestStrategy input, HedgingSettings settings) {
467 return HedgedRequestFuture<RequestStrategy>(utils::Async(
468 "hedged-request",
469 [input{std::move(input)}, settings{std::move(settings)}]() mutable {
470 return HedgeRequest(std::move(input), std::move(settings));
471 }));
472}
473
474} // namespace utils::hedging
475
476USERVER_NAMESPACE_END