userver: userver/utils/hedged_request.hpp Source File
#pragma once

/// @file userver/utils/hedged_request.hpp
/// @brief
/// Classes and functions for performing hedged requests.
///
/// To perform a hedged request you need to define a RequestStrategy - a class
/// similar to the following ExampleStrategy:
///
/// class ExampleStrategy {
///  public:
///     /// Creates a request future.
///     ///
///     /// Called at least once per hedged request. The first call happens at
///     /// the beginning of the hedged request; further calls happen every
///     /// HedgingSettings::hedging_delay milliseconds while none of the
///     /// previous requests is ready, or after ProcessReply returns a
///     /// non-nullopt result. If ProcessReply returned some interval of time,
///     /// then the additional request is scheduled after that interval.
///     /// @param attempt - increasing number for each try
///     std::optional<RequestType> Create(std::size_t attempt);
///
///     /// ProcessReply is called when some request has finished. The method
///     /// should evaluate the request status and decide whether a new attempt
///     /// is required. If a new attempt is required, the method must return
///     /// the delay for the next attempt; otherwise it must return
///     /// std::nullopt. A successful result is expected to be stored
///     /// internally and to be available via the ExtractReply() method.
///     std::optional<std::chrono::milliseconds> ProcessReply(RequestType&&);
///
///     std::optional<ReplyType> ExtractReply();
///
///     /// Called when the system no longer needs the request.
///     /// For example, if one of the requests has finished successfully,
///     /// the system finishes all related subrequests.
///     /// It is recommended to make this call as fast as possible so that it
///     /// does not block the overall execution. For example, if you hold some
///     /// future internally, call TryCancel() here and wait for the
///     /// cancellation in the destructor, rather than calling TryCancel() and
///     /// waiting immediately.
///     void Finish(RequestType&&);
/// };
///
/// Then call any of these functions:
/// - HedgeRequest
/// - HedgeRequestsBulk
/// - HedgeRequestAsync
/// - HedgeRequestsBulkAsync
///
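/// For illustration only, here is a minimal sketch of such a strategy (it is
/// not part of the library; the "remote call" is emulated with utils::Async,
/// and the names ExampleStrategy and "example-attempt" are invented for this
/// example). Note that RequestType must be awaitable by engine::WaitAny*
/// (i.e. provide TryGetContextAccessor()), e.g. an engine task or future:
///
/// @code
/// class ExampleStrategy {
/// public:
///     using RequestType = engine::TaskWithResult<std::string>;
///
///     // Start one more attempt; returning std::nullopt would stop hedging.
///     std::optional<RequestType> Create(std::size_t attempt) {
///         return utils::Async("example-attempt", [attempt] {
///             return std::string{"reply for attempt "} + std::to_string(attempt);
///         });
///     }
///
///     // Inspect a finished attempt: std::nullopt means "done",
///     // a duration means "please retry after this delay".
///     std::optional<std::chrono::milliseconds> ProcessReply(RequestType&& task) {
///         try {
///             reply_ = task.Get();
///             return std::nullopt;
///         } catch (const std::exception&) {
///             return std::chrono::milliseconds{5};
///         }
///     }
///
///     std::optional<std::string> ExtractReply() { return reply_; }
///
///     // Cancel an attempt that is no longer needed (see the recommendation
///     // above about deferring the wait in a real strategy).
///     void Finish(RequestType&& task) { task.SyncCancel(); }
///
/// private:
///     std::optional<std::string> reply_;
/// };
///
/// // Must be called from a coroutine-enabled task (e.g. inside a handler):
/// std::optional<std::string> reply =
///     utils::hedging::HedgeRequest(ExampleStrategy{}, utils::hedging::HedgingSettings{});
/// @endcode
///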

#include <chrono>
#include <functional>
#include <optional>
#include <queue>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include <userver/engine/task/cancel.hpp>
#include <userver/engine/wait_any.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/async.hpp>
#include <userver/utils/datetime.hpp>

USERVER_NAMESPACE_BEGIN

namespace utils::hedging {

/// Hedging settings
struct HedgingSettings {
    /// Maximum requests to do
    std::size_t max_attempts{3};
    /// Delay between attempts
    std::chrono::milliseconds hedging_delay{7};
    /// Max time to wait for all requests
    std::chrono::milliseconds timeout_all{100};
};
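
// For illustration (arbitrary example values): a hedged request that makes at
// most 5 attempts, starts a hedged attempt after 10ms of waiting and gives up
// entirely after 200ms could be configured as
//
//     utils::hedging::HedgingSettings settings;
//     settings.max_attempts = 5;
//     settings.hedging_delay = std::chrono::milliseconds{10};
//     settings.timeout_all = std::chrono::milliseconds{200};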

/// Deduces the request and reply types from RequestStrategy::Create and
/// RequestStrategy::ExtractReply
template <typename RequestStrategy>
struct RequestTraits {
    using RequestType =
        typename std::invoke_result_t<decltype(&RequestStrategy::Create), RequestStrategy, std::size_t>::value_type;
    using ReplyType =
        typename std::invoke_result_t<decltype(&RequestStrategy::ExtractReply), RequestStrategy>::value_type;
};

namespace impl {

using Clock = utils::datetime::SteadyClock;
using TimePoint = Clock::time_point;

enum class Action { StartTry, Stop };

struct PlanEntry {
public:
    PlanEntry(TimePoint timepoint, std::size_t request_index, std::size_t attempt_id, Action action)
        : timepoint(timepoint), request_index(request_index), attempt_id(attempt_id), action(action) {}

    bool operator<(const PlanEntry& other) const noexcept { return tie() < other.tie(); }
    bool operator>(const PlanEntry& other) const noexcept { return tie() > other.tie(); }
    bool operator==(const PlanEntry& other) const noexcept { return tie() == other.tie(); }
    bool operator<=(const PlanEntry& other) const noexcept { return tie() <= other.tie(); }
    bool operator>=(const PlanEntry& other) const noexcept { return tie() >= other.tie(); }
    bool operator!=(const PlanEntry& other) const noexcept { return tie() != other.tie(); }

    TimePoint timepoint;
    std::size_t request_index{0};
    std::size_t attempt_id{0};
    Action action;

private:
    std::tuple<const TimePoint&, const size_t&, const size_t&, const Action&> tie() const noexcept {
        return std::tie(timepoint, request_index, attempt_id, action);
    }
};

/// This wrapper allows us to cancel subrequests without removing elements
/// from the vector of requests
template <typename RequestStrategy>
struct SubrequestWrapper {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;

    SubrequestWrapper() = default;
    SubrequestWrapper(SubrequestWrapper&&) noexcept = default;
    explicit SubrequestWrapper(std::optional<RequestType>&& request) : request(std::move(request)) {}

    engine::impl::ContextAccessor* TryGetContextAccessor() {
        if (!request) return nullptr;
        return request->TryGetContextAccessor();
    }

    std::optional<RequestType> request;
};

struct RequestState {
    std::vector<std::size_t> subrequest_indices;
    std::size_t attempts_made = 0;
    bool finished = false;
};

template <typename RequestStrategy>
struct Context {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
    using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

    Context(std::vector<RequestStrategy> inputs, HedgingSettings settings)
        : inputs_(std::move(inputs)), settings(std::move(settings)) {
        const std::size_t size = this->inputs_.size();
        request_states_.resize(size);
    }
    Context(Context&&) noexcept = default;

    void Prepare(TimePoint start_time) {
        const auto request_count = GetRequestsCount();
        for (std::size_t request_id = 0; request_id < request_count; ++request_id) {
            plan_.emplace(start_time, request_id, 0, Action::StartTry);
        }
        plan_.emplace(start_time + settings.timeout_all, 0, 0, Action::Stop);
        subrequests_.reserve(settings.max_attempts * request_count);
    }

    std::optional<TimePoint> NextEventTime() const {
        if (plan_.empty()) return std::nullopt;
        return plan_.top().timepoint;
    }
    std::optional<PlanEntry> PopPlan() {
        if (plan_.empty()) return std::nullopt;
        auto ret = plan_.top();
        plan_.pop();
        return ret;
    }
    bool IsStop() const { return stop_; }

    void FinishAllSubrequests(std::size_t request_index) {
        auto& request_state = request_states_[request_index];
        request_state.finished = true;
        const auto& subrequest_indices = request_state.subrequest_indices;
        auto& strategy = inputs_[request_index];
        for (auto i : subrequest_indices) {
            auto& request = subrequests_[i].request;
            if (request) {
                strategy.Finish(std::move(*request));
                request.reset();
            }
        }
    }

    const HedgingSettings& GetSettings() const { return settings; }

    size_t GetRequestsCount() const { return inputs_.size(); }

    size_t GetRequestIdxBySubrequestIdx(size_t subrequest_idx) const {
        return input_by_subrequests_.at(subrequest_idx);
    }

    RequestStrategy& GetStrategy(size_t index) { return inputs_[index]; }

    auto& GetSubRequests() { return subrequests_; }

    std::vector<std::optional<ReplyType>> ExtractAllReplies() {
        std::vector<std::optional<ReplyType>> ret;
        ret.reserve(GetRequestsCount());
        for (auto&& strategy : inputs_) {
            ret.emplace_back(strategy.ExtractReply());
        }
        return ret;
    }

    /// @name Reactions on events from WaitAny*
    /// @{
    /// Called when the WaitAny* deadline elapses and the next planned event is
    /// to stop the whole hedged request
    void OnActionStop() {
        for (std::size_t i = 0; i < inputs_.size(); ++i) FinishAllSubrequests(i);
        stop_ = true;
    }

    /// Called when the WaitAny* deadline elapses and the next planned event is
    /// to start a (re)try of the request with index @param request_index
    void OnActionStartTry(std::size_t request_index, std::size_t attempt_id, TimePoint now) {
        auto& request_state = request_states_[request_index];
        if (request_state.finished) {
            return;
        }
        auto& attempts_made = request_state.attempts_made;
        // We could have already launched an attempt with this number: for
        // example, if attempt number 2 failed with a retryable code, we add it
        // to the plan with number 3. There are now two planned events, both
        // with number=3 - one from the retry, one from hedging. We execute the
        // earliest one and skip the second one.
        if (attempt_id < attempts_made) {
            return;
        }

        if (attempts_made >= settings.max_attempts) {
            return;
        }
        auto& strategy = inputs_[request_index];
        auto request_opt = strategy.Create(attempts_made);
        if (!request_opt) {
            request_state.finished = true;
            // The user does not want to make another request (maybe some retry
            // budget is exhausted)
            return;
        }
        const auto idx = subrequests_.size();
        subrequests_.emplace_back(std::move(request_opt));
        request_state.subrequest_indices.push_back(idx);
        input_by_subrequests_[idx] = request_index;
        attempts_made++;
        plan_.emplace(now + settings.hedging_delay, request_index, attempts_made, Action::StartTry);
    }

    /// Called on getting a retriable error in the request with index
    /// @param request_idx
    void OnRetriableReply(std::size_t request_idx, std::chrono::milliseconds retry_delay, TimePoint now) {
        const auto& request_state = request_states_[request_idx];
        if (request_state.finished) return;
        if (request_state.attempts_made >= settings.max_attempts) return;

        plan_.emplace(now + retry_delay, request_idx, request_state.attempts_made, Action::StartTry);
    }

    void OnNonRetriableReply(std::size_t request_idx) { FinishAllSubrequests(request_idx); }
    /// @}

private:
    /// User-provided request strategies
    std::vector<RequestStrategy> inputs_;
    HedgingSettings settings;

    /// Plan of what we will do and when: a min-heap, so the earliest planned
    /// event is always on top
    std::priority_queue<PlanEntry, std::vector<PlanEntry>, std::greater<>> plan_{};
    std::vector<SubrequestWrapper<RequestStrategy>> subrequests_{};
    /// Maps subrequest index to the index of the input strategy it belongs to
    std::unordered_map<std::size_t, std::size_t> input_by_subrequests_{};
    std::vector<RequestState> request_states_{};
    bool stop_{false};
};

} // namespace impl

/// Future of hedged bulk request
template <typename RequestStrategy>
struct HedgedRequestBulkFuture {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
    using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

    HedgedRequestBulkFuture(HedgedRequestBulkFuture&&) noexcept = default;
    ~HedgedRequestBulkFuture() { task_.SyncCancel(); }

    void Wait() { task_.Wait(); }
    std::vector<std::optional<ReplyType>> Get() { return task_.Get(); }
    engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }

private:
    template <typename RequestStrategy_>
    friend auto HedgeRequestsBulkAsync(std::vector<RequestStrategy_> inputs, HedgingSettings settings);
    using Task = engine::TaskWithResult<std::vector<std::optional<ReplyType>>>;
    HedgedRequestBulkFuture(Task&& task) : task_(std::move(task)) {}
    Task task_;
};

/// Future of hedged request
template <typename RequestStrategy>
struct HedgedRequestFuture {
    using RequestType = typename RequestTraits<RequestStrategy>::RequestType;
    using ReplyType = typename RequestTraits<RequestStrategy>::ReplyType;

    HedgedRequestFuture(HedgedRequestFuture&&) noexcept = default;
    ~HedgedRequestFuture() { task_.SyncCancel(); }

    void Wait() { task_.Wait(); }
    std::optional<ReplyType> Get() { return task_.Get(); }
    void IgnoreResult() {}
    engine::impl::ContextAccessor* TryGetContextAccessor() { return task_.TryGetContextAccessor(); }

private:
    template <typename RequestStrategy_>
    friend auto HedgeRequestAsync(RequestStrategy_ input, HedgingSettings settings);
    using Task = engine::TaskWithResult<std::optional<ReplyType>>;
    HedgedRequestFuture(Task&& task) : task_(std::move(task)) {}
    Task task_;
};

/// Synchronously performs bulk hedged requests described by RequestStrategy
/// and returns the result of type std::vector<std::optional<ReplyType>>. The
/// result contains a reply for each element of @param inputs, or std::nullopt
/// in case of either a timeout or a bad reply
/// (RequestStrategy::ProcessReply(RequestType&&) returned std::nullopt and
/// RequestStrategy::ExtractReply() returned std::nullopt)
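///
/// For illustration (reusing the ExampleStrategy sketch from the file header,
/// which is not part of the library):
/// @code
/// std::vector<ExampleStrategy> strategies;
/// strategies.emplace_back();  // strategy for request 1
/// strategies.emplace_back();  // strategy for request 2
/// auto replies = utils::hedging::HedgeRequestsBulk(std::move(strategies),
///                                                  utils::hedging::HedgingSettings{});
/// // One entry per strategy; replies[i] is std::nullopt if request i timed
/// // out or was denied by its strategy
/// @endcode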
template <typename RequestStrategy>
auto HedgeRequestsBulk(std::vector<RequestStrategy> inputs, HedgingSettings hedging_settings) {
    {
        using Action = impl::Action;
        using Clock = impl::Clock;
        auto context = impl::Context(std::move(inputs), std::move(hedging_settings));

        auto& sub_requests = context.GetSubRequests();

        auto wakeup_time = Clock::now();
        context.Prepare(wakeup_time);

        while (!context.IsStop()) {
            auto wait_result = engine::WaitAnyUntil(wakeup_time, sub_requests);
            if (!wait_result.has_value()) {
                if (engine::current_task::ShouldCancel()) {
                    context.OnActionStop();
                    break;
                }
                // Timeout - need to process the plan
                auto plan_entry = context.PopPlan();
                if (!plan_entry.has_value()) {
                    // Timeout, but there are no planned actions any more
                    break;
                }
                const auto [timestamp, request_index, attempt_id, action] = *plan_entry;
                switch (action) {
                    case Action::StartTry:
                        context.OnActionStartTry(request_index, attempt_id, timestamp);
                        break;
                    case Action::Stop:
                        context.OnActionStop();
                        break;
                }
                auto next_wakeup_time = context.NextEventTime();
                if (!next_wakeup_time.has_value()) {
                    break;
                }
                wakeup_time = *next_wakeup_time;
                continue;
            }
            const auto result_idx = *wait_result;
            UASSERT(result_idx < sub_requests.size());
            const auto request_idx = context.GetRequestIdxBySubrequestIdx(result_idx);
            auto& strategy = context.GetStrategy(request_idx);

            auto& request = sub_requests[result_idx].request;
            UASSERT_MSG(request, "Finished requests must not be empty");
            auto reply = strategy.ProcessReply(std::move(*request));
            if (reply.has_value()) {
                // Got a reply, but it is not OK and the user wants to retry
                // after some delay
                context.OnRetriableReply(request_idx, *reply, Clock::now());
                // No need to check: we have just added one entry
                wakeup_time = *context.NextEventTime();
            } else {
                context.OnNonRetriableReply(request_idx);
            }
        }
        return context.ExtractAllReplies();
    }
}

/// Asynchronously performs bulk hedged requests described by RequestStrategy
/// and returns a future which yields a result of type
/// std::vector<std::optional<ReplyType>>. The result contains a reply for each
/// element of @param inputs, or std::nullopt in case of either a timeout or a
/// bad reply (RequestStrategy::ProcessReply(RequestType&&) returned
/// std::nullopt and RequestStrategy::ExtractReply() returned std::nullopt)
template <typename RequestStrategy>
auto HedgeRequestsBulkAsync(std::vector<RequestStrategy> inputs, HedgingSettings settings) {
    return HedgedRequestBulkFuture<RequestStrategy>(utils::Async(
        "hedged-bulk-request",
        [inputs{std::move(inputs)}, settings{std::move(settings)}]() mutable {
            return HedgeRequestsBulk(std::move(inputs), std::move(settings));
        }
    ));
}

/// Synchronously performs the hedged request described by RequestStrategy and
/// returns the result, or std::nullopt in case of a timeout or if the request
/// was denied by the strategy (e.g. ProcessReply always returned std::nullopt
/// or ExtractReply returned std::nullopt)
template <typename RequestStrategy>
std::optional<typename RequestTraits<RequestStrategy>::ReplyType>
HedgeRequest(RequestStrategy input, HedgingSettings settings) {
    std::vector<RequestStrategy> inputs;
    inputs.emplace_back(std::move(input));
    auto bulk_ret = HedgeRequestsBulk(std::move(inputs), std::move(settings));
    if (bulk_ret.size() != 1) {
        return std::nullopt;
    }
    return bulk_ret[0];
}

/// Creates a future which performs the hedged request described by
/// RequestStrategy and yields the result, or std::nullopt in case of a timeout
/// or if the request was denied by the strategy (e.g. ProcessReply always
/// returned std::nullopt or ExtractReply returned std::nullopt)
template <typename RequestStrategy>
auto HedgeRequestAsync(RequestStrategy input, HedgingSettings settings) {
    return HedgedRequestFuture<RequestStrategy>(utils::Async(
        "hedged-request",
        [input{std::move(input)}, settings{std::move(settings)}]() mutable {
            return HedgeRequest(std::move(input), std::move(settings));
        }
    ));
}
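
// For illustration (again with the ExampleStrategy sketch from the file
// header): the returned future may be stored and awaited later; destroying it
// cancels the underlying task.
//
//     auto future = utils::hedging::HedgeRequestAsync(
//         ExampleStrategy{}, utils::hedging::HedgingSettings{});
//     // ... do other work ...
//     std::optional<std::string> reply = future.Get();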

} // namespace utils::hedging

USERVER_NAMESPACE_END