userver: userver/storages/redis/hedged_request.hpp Source File
Loading...
Searching...
No Matches
hedged_request.hpp
Go to the documentation of this file.
1#pragma once
2
/// @file userver/storages/redis/hedged_request.hpp
/// @brief
/// Classes and functions for performing hedged requests.
///
/// Use the MakeHedgedRedisRequest function to perform a hedged Redis request.
///
/// Example (sync):
///   auto result =
///       storages::redis::MakeHedgedRedisRequest<storages::redis::RequestHGet>(
///           redis_client_shared_ptr,
///           &storages::redis::Client::Hget,
///           redis_cc, hedging_settings,
///           key, field);
/// Example (async):
///   auto future =
///       storages::redis::MakeHedgedRedisRequestAsync<storages::redis::RequestHGet>(
///           redis_client_shared_ptr,
///           &storages::redis::Client::Hget,
///           redis_cc, hedging_settings,
///           key, field);
///   auto result = future.Get();
///
25
26#include <optional>
27
28#include <userver/storages/redis/client.hpp>
29#include <userver/storages/redis/command_control.hpp>
30#include <userver/utils/hedged_request.hpp>
31
32USERVER_NAMESPACE_BEGIN
33
34namespace storages::redis {
35namespace impl {
36
/// @brief Strategy object that adapts a Redis request type to the hedging
/// helpers: it creates requests through a user-supplied generator and keeps
/// the reply of the processed request.
///
/// @tparam RedisRequestType request type; it must expose a nested `Reply`
/// type and a `Get()` member callable on an rvalue request.
template <typename RedisRequestType>
struct RedisRequestStrategy {
    using RequestType = RedisRequestType;
    using ReplyType = typename RequestType::Reply;
    /// Generator called with the attempt number; it may return std::nullopt
    /// instead of a request.
    using GenF = std::function<std::optional<RequestType>(int)>;

    /// @param gen_callback generator used by Create() for every attempt
    explicit RedisRequestStrategy(GenF gen_callback) : gen_callback_(std::move(gen_callback)) {}

    RedisRequestStrategy(RedisRequestStrategy&&) noexcept = default;

    /// @{
    /// Methods needed by HedgingStrategy

    /// Asks the generator for the request of attempt number `try_count`.
    std::optional<RequestType> Create(size_t try_count) {
        // GenF takes an int, so spell the narrowing out instead of relying on
        // an implicit size_t -> int conversion.
        return gen_callback_(static_cast<int>(try_count));
    }

    /// Stores the reply obtained from `request` and always returns
    /// std::nullopt.
    // NOTE(review): std::nullopt presumably means "no retry delay requested";
    // confirm against the HedgingStrategy contract.
    std::optional<std::chrono::milliseconds> ProcessReply(RequestType&& request) {
        reply_ = std::move(request).Get();
        return std::nullopt;
    }

    /// Moves the stored reply out; the result is empty if ProcessReply() has
    /// not stored anything.
    std::optional<ReplyType> ExtractReply() { return std::move(reply_); }

    /// Intentionally empty: the request is simply dropped.
    void Finish(RequestType&&) {}
    /// @}

private:
    GenF gen_callback_;
    std::optional<ReplyType> reply_;
};
64
65} // namespace impl
66
67template <typename RedisRequestType>
68using HedgedRedisRequest = utils::hedging::HedgedRequestFuture<impl::RedisRequestStrategy<RedisRequestType>>;
69
70template <
71 typename RedisRequestType,
72 typename... Args,
73 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
74HedgedRedisRequest<RedisRequestType> MakeHedgedRedisRequestAsync(
75 std::shared_ptr<storages::redis::Client> redis,
76 M method,
77 const redis::CommandControl& cc,
78 utils::hedging::HedgingSettings hedging_settings,
79 Args... args
80) {
81 auto gen_request =
82 [redis, method, cc{std::move(cc)}, args_tuple{std::tuple(std::move(args)...)}](int try_count
83 ) mutable -> std::optional<RedisRequestType> {
84 cc.retry_counter = try_count;
85 cc.max_retries = 1; ///< We do retries ourselves
86
87 return std::apply(
88 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
89 args_tuple
90 );
91 };
92 return utils::hedging::HedgeRequestAsync(
93 impl::RedisRequestStrategy<RedisRequestType>(std::move(gen_request)),
94 hedging_settings
95 );
96}
97
98template <typename RedisRequestType>
99using HedgedRedisRequest = utils::hedging::HedgedRequestFuture<impl::RedisRequestStrategy<RedisRequestType>>;
100
101template <
102 typename RedisRequestType,
103 typename... Args,
104 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
105std::optional<typename RedisRequestType::Reply> MakeHedgedRedisRequest(
106 std::shared_ptr<storages::redis::Client> redis,
107 M method,
108 const redis::CommandControl& cc,
109 utils::hedging::HedgingSettings hedging_settings,
110 Args... args
111) {
112 auto gen_request =
113 [redis, method, cc{std::move(cc)}, args_tuple{std::tuple(std::move(args)...)}](int try_count
114 ) mutable -> std::optional<RedisRequestType> {
115 cc.retry_counter = try_count;
116 cc.max_retries = 1; ///< We do retries ourselves
117
118 return std::apply(
119 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
120 args_tuple
121 );
122 };
123 return utils::hedging::HedgeRequest(
124 impl::RedisRequestStrategy<RedisRequestType>(std::move(gen_request)),
125 hedging_settings
126 );
127}
128
129/// Same as MakeHedgedRedisRequest but accepting a vector of argument tuples
130/// instead of a single set of arguments. And return vector of replies
131/// corresponding to input args.
132template <
133 typename RedisRequestType,
134 typename... Args,
135 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
136std::vector<std::optional<typename RedisRequestType::Reply>> MakeBulkHedgedRedisRequest(
137 std::shared_ptr<storages::redis::Client> redis,
138 M method,
139 const redis::CommandControl& cc,
140 utils::hedging::HedgingSettings hedging_settings,
141 std::vector<std::tuple<Args...>> args
142) {
143 std::vector<redis::impl::RedisRequestStrategy<RedisRequestType>> strategies;
144 strategies.reserve(args.size());
145
146 for (auto&& arg : args) {
147 auto gen_request = [redis, method, cc{std::move(cc)}, args_tuple{std::move(arg)}](int try_count) mutable
148 -> std::optional<RedisRequestType> {
149 cc.retry_counter = try_count;
150 cc.max_retries = 1; ///< We do retries ourselves
151
152 return std::apply(
153 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
154 args_tuple
155 );
156 };
157 strategies.emplace_back(std::move(gen_request));
158 }
159
160 return utils::hedging::HedgeRequestsBulk(std::move(strategies), hedging_settings);
161}
162
163/// Same as MakeHedgedRedisRequestAsync but accepting a vector of argument
164/// tuples instead of a single set of arguments. And return vector of replies
165/// corresponding to input args.
166template <
167 typename RedisRequestType,
168 typename... Args,
169 typename M = RedisRequestType (storages::redis::Client::*)(Args..., const redis::CommandControl&)>
170utils::hedging::HedgedRequestBulkFuture<redis::impl::RedisRequestStrategy<RedisRequestType>>
172 std::shared_ptr<storages::redis::Client> redis,
173 M method,
174 const redis::CommandControl& cc,
175 utils::hedging::HedgingSettings hedging_settings,
176 std::vector<std::tuple<Args...>> args
177) {
178 using RequestStrategy = redis::impl::RedisRequestStrategy<RedisRequestType>;
179 std::vector<RequestStrategy> strategies;
180 strategies.reserve(args.size());
181
182 for (auto&& arg : args) {
183 auto gen_request = [redis, method, cc{std::move(cc)}, args_tuple{std::move(arg)}](int try_count) mutable
184 -> std::optional<RedisRequestType> {
185 cc.retry_counter = try_count;
186 cc.max_retries = 1; ///< We do retries ourselves
187
188 return std::apply(
189 [redis, method, cc](auto&&... args) { return (redis.get()->*method)(args..., cc); },
190 args_tuple
191 );
192 };
193 strategies.emplace_back(std::move(gen_request));
194 }
195
196 return utils::hedging::HedgeRequestsBulkAsync<RequestStrategy>(std::move(strategies), hedging_settings);
197}
198
199} // namespace storages::redis
200
201USERVER_NAMESPACE_END