userver: userver/ugrpc/client/impl/async_methods.hpp Source File
Loading...
Searching...
No Matches
async_methods.hpp
1#pragma once
2
3#include <atomic>
4#include <memory>
5#include <optional>
6#include <string_view>
7#include <utility>
8
9#include <grpcpp/client_context.h>
10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/codegen/async_stream.h>
12#include <grpcpp/impl/codegen/async_unary_call.h>
13#include <grpcpp/impl/codegen/status.h>
14
15#include <userver/dynamic_config/fwd.hpp>
16#include <userver/tracing/in_place_span.hpp>
17#include <userver/tracing/span.hpp>
18
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/impl/async_method_invocation.hpp>
23#include <userver/ugrpc/impl/statistics_scope.hpp>
24
25USERVER_NAMESPACE_BEGIN
26
27namespace ugrpc::client::impl {
28
29/// @{
30/// @brief Helper type aliases for low-level asynchronous gRPC streams
31/// @see <grpcpp/impl/codegen/async_unary_call_impl.h>
32/// @see <grpcpp/impl/codegen/async_stream_impl.h>
33template <typename Response>
34using RawResponseReader =
35 std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
36
37template <typename Response>
38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
39
40template <typename Request>
41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
42
43template <typename Request, typename Response>
44using RawReaderWriter =
45 std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
46/// @}
47
48/// @{
49/// @brief Helper type aliases for stub member function pointers
50template <typename Stub, typename Request, typename Response>
51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
52 grpc::ClientContext*, const Request&, grpc::CompletionQueue*);
53
54template <typename Stub, typename Request, typename Response>
55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
56 const Request&,
57 grpc::CompletionQueue*);
58
59template <typename Stub, typename Request, typename Response>
60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
61 Response*,
62 grpc::CompletionQueue*);
63
64template <typename Stub, typename Request, typename Response>
65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
66 grpc::ClientContext*, grpc::CompletionQueue*);
67/// @}
68
69struct RpcConfigValues final {
70 explicit RpcConfigValues(const dynamic_config::Snapshot& config);
71
72 bool enforce_task_deadline;
73};
74
75using ugrpc::client::impl::FinishAsyncMethodInvocation;
76using ugrpc::impl::AsyncMethodInvocation;
77
78class RpcData final {
79 public:
80 explicit RpcData(CallParams&&);
81
82 RpcData(RpcData&&) noexcept = delete;
83 RpcData& operator=(RpcData&&) noexcept = delete;
84 ~RpcData();
85
86 const grpc::ClientContext& GetContext() const noexcept;
87
88 grpc::ClientContext& GetContext() noexcept;
89
90 std::string_view GetCallName() const noexcept;
91
92 std::string_view GetClientName() const noexcept;
93
94 tracing::Span& GetSpan() noexcept;
95
96 grpc::CompletionQueue& GetQueue() const noexcept;
97
98 const RpcConfigValues& GetConfigValues() const noexcept;
99
100 const Middlewares& GetMiddlewares() const noexcept;
101
102 void ResetSpan() noexcept;
103
104 ugrpc::impl::RpcStatisticsScope& GetStatsScope() noexcept;
105
106 void SetWritesFinished() noexcept;
107
108 bool AreWritesFinished() const noexcept;
109
110 void SetFinished() noexcept;
111
112 bool IsFinished() const noexcept;
113
114 bool IsDeadlinePropagated() const noexcept;
115
116 void SetDeadlinePropagated() noexcept;
117
118 // please read comments for 'invocation_' private member on why
119 // we use two different invocation types
120 void EmplaceAsyncMethodInvocation();
121
122 // please read comments for 'invocation_' private member on why
123 // we use two different invocation types
124 void EmplaceFinishAsyncMethodInvocation();
125
126 // please read comments for 'invocation_' private member on why
127 // we use two different invocation types
128 AsyncMethodInvocation& GetAsyncMethodInvocation() noexcept;
129
130 // please read comments for 'invocation_' private member on why
131 // we use two different invocation types
132 FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation() noexcept;
133
134 // This are for asserts and invariants. Do NOT branch actual code
135 // based on these two functions. Branching based on these two functions
136 // is considered UB, no diagnostics required.
137 bool HoldsAsyncMethodInvocationDebug() noexcept;
138 bool HoldsFinishAsyncMethodInvocationDebug() noexcept;
139
140 grpc::Status& GetStatus() noexcept;
141
142 class AsyncMethodInvocationGuard {
143 public:
144 AsyncMethodInvocationGuard(RpcData& data) noexcept;
145 AsyncMethodInvocationGuard(const AsyncMethodInvocationGuard&) = delete;
146 AsyncMethodInvocationGuard(AsyncMethodInvocationGuard&&) = delete;
147 ~AsyncMethodInvocationGuard() noexcept;
148
149 void Disarm() noexcept { disarm_ = true; }
150
151 private:
152 RpcData& data_;
153 bool disarm_{false};
154 };
155
156 private:
157 std::unique_ptr<grpc::ClientContext> context_;
158 std::string client_name_;
159 std::string_view call_name_;
160 bool writes_finished_{false};
161 bool is_finished_{false};
162 bool is_deadline_propagated_{false};
163
164 std::optional<tracing::InPlaceSpan> span_;
165 ugrpc::impl::RpcStatisticsScope stats_scope_;
166 grpc::CompletionQueue& queue_;
167 RpcConfigValues config_values_;
168 const Middlewares& mws_;
169
170 // This data is common for all types of grpc calls - unary and streaming
171 // However, in unary call the call is finished as soon as grpc core
172 // gives us back a response - so for unary call we use
173 // FinishAsyncMethodInvocation that will correctly close all our
174 // tracing::Span objects and account everything in statistics.
175 // In stream response, we use AsyncMethodInvocation for every intermediate
176 // Read* call, because we don't need to close span and/or account stats
177 // when finishing Read* call.
178 std::variant<std::monostate, AsyncMethodInvocation,
179 FinishAsyncMethodInvocation>
180 invocation_;
181 grpc::Status status_;
182};
183
184class FutureImpl final {
185 public:
186 explicit FutureImpl(RpcData& data) noexcept;
187
188 virtual ~FutureImpl() noexcept = default;
189
190 FutureImpl(FutureImpl&&) noexcept;
191 FutureImpl& operator=(FutureImpl&&) noexcept;
192
193 /// @brief Checks if the asynchronous call has completed
194 /// Note, that once user gets result, IsReady should not be called
195 /// @return true if result ready
196 [[nodiscard]] bool IsReady() const noexcept;
197
198 RpcData* GetData() noexcept;
199 void ClearData() noexcept;
200
201 private:
202 RpcData* data_;
203};
204
205void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
206 std::string_view stage);
207
208template <typename GrpcStream>
209void StartCall(GrpcStream& stream, RpcData& data) {
210 AsyncMethodInvocation start_call;
211 stream.StartCall(start_call.GetTag());
212 CheckOk(data, Wait(start_call, data.GetContext()), "StartCall");
213}
214
215void PrepareFinish(RpcData& data);
216
217void ProcessFinishResult(RpcData& data,
218 AsyncMethodInvocation::WaitStatus wait_status,
219 grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
220 bool throw_on_error);
221
222template <typename GrpcStream>
223void Finish(GrpcStream& stream, RpcData& data, bool throw_on_error) {
224 PrepareFinish(data);
225
226 FinishAsyncMethodInvocation finish(data);
227 auto& status = finish.GetStatus();
228 stream.Finish(&status, finish.GetTag());
229
230 const auto wait_status = Wait(finish, data.GetContext());
231 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
232 data.GetStatsScope().OnCancelled();
233 if (throw_on_error) throw RpcCancelledError(data.GetCallName(), "Finish");
234 }
235 ProcessFinishResult(data, wait_status, std::move(status),
236 std::move(finish.GetParsedGStatus()), throw_on_error);
237}
238
239void PrepareRead(RpcData& data);
240
241template <typename GrpcStream, typename Response>
242[[nodiscard]] bool Read(GrpcStream& stream, Response& response, RpcData& data) {
243 PrepareRead(data);
244 AsyncMethodInvocation read;
245 stream.Read(&response, read.GetTag());
246 const auto wait_status = Wait(read, data.GetContext());
247 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
248 data.GetStatsScope().OnCancelled();
249 }
250 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
251}
252
253template <typename GrpcStream, typename Response>
254void ReadAsync(GrpcStream& stream, Response& response, RpcData& data) noexcept {
255 PrepareRead(data);
256 data.EmplaceAsyncMethodInvocation();
257 auto& read = data.GetAsyncMethodInvocation();
258 stream.Read(&response, read.GetTag());
259}
260
261void PrepareWrite(RpcData& data);
262
263template <typename GrpcStream, typename Request>
264bool Write(GrpcStream& stream, const Request& request,
265 grpc::WriteOptions options, RpcData& data) {
266 PrepareWrite(data);
267 AsyncMethodInvocation write;
268 stream.Write(request, options, write.GetTag());
269 const auto result = Wait(write, data.GetContext());
270 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
271 data.GetStatsScope().OnCancelled();
272 }
273 if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
274 data.SetWritesFinished();
275 }
276 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
277}
278
279void PrepareWriteAndCheck(RpcData& data);
280
281template <typename GrpcStream, typename Request>
282void WriteAndCheck(GrpcStream& stream, const Request& request,
283 grpc::WriteOptions options, RpcData& data) {
284 PrepareWriteAndCheck(data);
285 AsyncMethodInvocation write;
286 stream.Write(request, options, write.GetTag());
287 CheckOk(data, Wait(write, data.GetContext()), "WriteAndCheck");
288}
289
290template <typename GrpcStream>
291bool WritesDone(GrpcStream& stream, RpcData& data) {
292 PrepareWrite(data);
293 data.SetWritesFinished();
294 AsyncMethodInvocation writes_done;
295 stream.WritesDone(writes_done.GetTag());
296 const auto wait_status = Wait(writes_done, data.GetContext());
297 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
298 data.GetStatsScope().OnCancelled();
299 }
300 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
301}
302
303} // namespace ugrpc::client::impl
304
305USERVER_NAMESPACE_END