userver: userver/storages/redis/hedged_request.hpp Source File
Loading...
Searching...
No Matches
hedged_request.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/storages/redis/hedged_request.hpp
4/// @brief
5/// Classes and functions for performing hedged requests.
6///
7/// Use MakeHedgedRedisRequest method to perform hedged redis request.
8///
9/// Example(Sync):
10/// auto result =
11/// storages::redis::MakeHedgedRedisRequest<storages::redis::RequestHGet>(
12/// redis_client_shared_ptr,
13/// &storages::redis::Client::Hget,
14/// redis_cc, hedging_settings,
15/// key, field);
16/// Example(Async):
17/// auto future =
18/// storages::redis::MakeHedgedRedisRequestAsync<storages::redis::RequestHGet>(
19/// redis_client_shared_ptr,
20/// &storages::redis::Client::Hget,
21/// redis_cc, hedging_settings,
22/// key, field);
23/// auto result = future.Get();
24///
25
26#include <optional>
27
28#include <userver/storages/redis/client.hpp>
29#include <userver/storages/redis/command_control.hpp>
30#include <userver/utils/hedged_request.hpp>
31
32USERVER_NAMESPACE_BEGIN
33
34namespace storages::redis {
35namespace impl {
36
/// @brief Hedging strategy that produces redis requests through a
/// user-supplied generator callback and remembers the reply of the last
/// processed request.
///
/// @tparam RedisRequestType request type; it must expose a nested `Reply`
/// type and a `Get()` member whose result can be stored as `Reply`.
template <typename RedisRequestType>
struct RedisRequestStrategy {
public:
    using RequestType = RedisRequestType;
    using ReplyType = typename RequestType::Reply;
    /// Generator callback: receives the attempt number and returns a new
    /// request, or std::nullopt when no request could be produced.
    using GenF = std::function<std::optional<RequestType>(int)>;

    explicit RedisRequestStrategy(GenF gen_callback) : gen_callback_(std::move(gen_callback)) {}

    RedisRequestStrategy(RedisRequestStrategy&& other) noexcept = default;

    /// @{
    /// Methods needed by HedgingStrategy

    /// Asks the generator callback for the request of attempt @a try_count.
    std::optional<RequestType> Create(size_t try_count) {
        // GenF takes an int; make the size_t -> int conversion explicit
        // instead of relying on a silent implicit narrowing.
        return gen_callback_(static_cast<int>(try_count));
    }

    /// Waits for @a request via `Get()` and stores its reply.
    /// Always returns std::nullopt.
    std::optional<std::chrono::milliseconds> ProcessReply(RequestType&& request) {
        reply_ = std::move(request).Get();
        return std::nullopt;
    }

    /// Returns the stored reply (moved out), or std::nullopt if no reply
    /// has been stored yet.
    std::optional<ReplyType> ExtractReply() { return std::move(reply_); }

    /// Nothing to clean up for a request that is no longer needed.
    void Finish(RequestType&&) {}
    /// @}

private:
    GenF gen_callback_;
    std::optional<ReplyType> reply_;
};
65
66} // namespace impl
67
68template <typename RedisRequestType>
69using HedgedRedisRequest = utils::hedging::HedgedRequestFuture<impl::RedisRequestStrategy<RedisRequestType>>;
70
71template <
72 typename RedisRequestType,
73 typename... Args,
74 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
75HedgedRedisRequest<RedisRequestType> MakeHedgedRedisRequestAsync(
76 std::shared_ptr<storages::redis::Client> redis,
77 M method,
78 const redis::CommandControl& cc,
79 utils::hedging::HedgingSettings hedging_settings,
80 Args... args
81) {
82 auto gen_request =
83 [redis, method, cc{std::move(cc)}, args_tuple{std::tuple(std::move(args)...)}](int try_count
84 ) mutable -> std::optional<RedisRequestType> {
85 cc.retry_counter = try_count;
86 cc.max_retries = 1; ///< We do retries ourselves
87
88 return std::apply(
89 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
90 args_tuple
91 );
92 };
93 return utils::hedging::HedgeRequestAsync(
94 impl::RedisRequestStrategy<RedisRequestType>(std::move(gen_request)),
95 hedging_settings
96 );
97}
98
99template <typename RedisRequestType>
100using HedgedRedisRequest = utils::hedging::HedgedRequestFuture<impl::RedisRequestStrategy<RedisRequestType>>;
101
102template <
103 typename RedisRequestType,
104 typename... Args,
105 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
106std::optional<typename RedisRequestType::Reply> MakeHedgedRedisRequest(
107 std::shared_ptr<storages::redis::Client> redis,
108 M method,
109 const redis::CommandControl& cc,
110 utils::hedging::HedgingSettings hedging_settings,
111 Args... args
112) {
113 auto gen_request =
114 [redis, method, cc{std::move(cc)}, args_tuple{std::tuple(std::move(args)...)}](int try_count
115 ) mutable -> std::optional<RedisRequestType> {
116 cc.retry_counter = try_count;
117 cc.max_retries = 1; ///< We do retries ourselves
118
119 return std::apply(
120 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
121 args_tuple
122 );
123 };
124 return utils::hedging::HedgeRequest(
125 impl::RedisRequestStrategy<RedisRequestType>(std::move(gen_request)),
126 hedging_settings
127 );
128}
129
130/// Same as MakeHedgedRedisRequest but accepting a vector of argument tuples
131/// instead of a single set of arguments. And return vector of replies
132/// corresponding to input args.
133template <
134 typename RedisRequestType,
135 typename... Args,
136 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
137std::vector<std::optional<typename RedisRequestType::Reply>> MakeBulkHedgedRedisRequest(
138 std::shared_ptr<storages::redis::Client> redis,
139 M method,
140 const redis::CommandControl& cc,
141 utils::hedging::HedgingSettings hedging_settings,
142 std::vector<std::tuple<Args...>> args
143) {
144 std::vector<redis::impl::RedisRequestStrategy<RedisRequestType>> strategies;
145 strategies.reserve(args.size());
146
147 for (auto&& arg : args) {
148 auto gen_request = [redis, method, cc{std::move(cc)}, args_tuple{std::move(arg)}](int try_count) mutable
149 -> std::optional<RedisRequestType> {
150 cc.retry_counter = try_count;
151 cc.max_retries = 1; ///< We do retries ourselves
152
153 return std::apply(
154 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
155 args_tuple
156 );
157 };
158 strategies.emplace_back(std::move(gen_request));
159 }
160
161 return utils::hedging::HedgeRequestsBulk(std::move(strategies), hedging_settings);
162}
163
164/// Same as MakeHedgedRedisRequestAsync but accepting a vector of argument
165/// tuples instead of a single set of arguments. And return vector of replies
166/// corresponding to input args.
167template <
168 typename RedisRequestType,
169 typename... Args,
170 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
171utils::hedging::HedgedRequestBulkFuture<redis::impl::RedisRequestStrategy<RedisRequestType>>
173 std::shared_ptr<storages::redis::Client> redis,
174 M method,
175 const redis::CommandControl& cc,
176 utils::hedging::HedgingSettings hedging_settings,
177 std::vector<std::tuple<Args...>> args
178) {
179 using RequestStrategy = redis::impl::RedisRequestStrategy<RedisRequestType>;
180 std::vector<RequestStrategy> strategies;
181 strategies.reserve(args.size());
182
183 for (auto&& arg : args) {
184 auto gen_request = [redis, method, cc{std::move(cc)}, args_tuple{std::move(arg)}](int try_count) mutable
185 -> std::optional<RedisRequestType> {
186 cc.retry_counter = try_count;
187 cc.max_retries = 1; ///< We do retries ourselves
188
189 return std::apply(
190 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
191 args_tuple
192 );
193 };
194 strategies.emplace_back(std::move(gen_request));
195 }
196
197 return utils::hedging::HedgeRequestsBulkAsync<RequestStrategy>(std::move(strategies), hedging_settings);
198}
199
200} // namespace storages::redis
201
202USERVER_NAMESPACE_END