userver: userver/ugrpc/client/impl/async_methods.hpp Source File
Loading...
Searching...
No Matches
async_methods.hpp
1#pragma once
2
3#include <atomic>
4#include <memory>
5#include <optional>
6#include <string_view>
7#include <utility>
8
9#include <grpcpp/client_context.h>
10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/codegen/async_stream.h>
12#include <grpcpp/impl/codegen/async_unary_call.h>
13#include <grpcpp/impl/codegen/status.h>
14
15#include <userver/dynamic_config/fwd.hpp>
16#include <userver/tracing/in_place_span.hpp>
17#include <userver/tracing/span.hpp>
18
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/impl/async_method_invocation.hpp>
23#include <userver/ugrpc/impl/statistics_scope.hpp>
24
25USERVER_NAMESPACE_BEGIN
26
27namespace ugrpc::client::impl {
28
29/// @{
30/// @brief Helper type aliases for low-level asynchronous gRPC streams
31/// @see <grpcpp/impl/codegen/async_unary_call_impl.h>
32/// @see <grpcpp/impl/codegen/async_stream_impl.h>
33template <typename Response>
34using RawResponseReader =
35 std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
36
37template <typename Response>
38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
39
40template <typename Request>
41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
42
43template <typename Request, typename Response>
44using RawReaderWriter =
45 std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
46/// @}
47
48/// @{
49/// @brief Helper type aliases for stub member function pointers
50template <typename Stub, typename Request, typename Response>
51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
52 grpc::ClientContext*, const Request&, grpc::CompletionQueue*);
53
54template <typename Stub, typename Request, typename Response>
55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
56 const Request&,
57 grpc::CompletionQueue*);
58
59template <typename Stub, typename Request, typename Response>
60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
61 Response*,
62 grpc::CompletionQueue*);
63
64template <typename Stub, typename Request, typename Response>
65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
66 grpc::ClientContext*, grpc::CompletionQueue*);
67/// @}
68
69struct RpcConfigValues final {
70 explicit RpcConfigValues(const dynamic_config::Snapshot& config);
71
72 bool enforce_task_deadline;
73};
74
75using ugrpc::client::impl::FinishAsyncMethodInvocation;
76using ugrpc::impl::AsyncMethodInvocation;
77
78class RpcData final {
79 public:
80 explicit RpcData(CallParams&&);
81
82 RpcData(RpcData&&) noexcept = delete;
83 RpcData& operator=(RpcData&&) noexcept = delete;
84 ~RpcData();
85
86 const grpc::ClientContext& GetContext() const noexcept;
87
88 grpc::ClientContext& GetContext() noexcept;
89
90 std::string_view GetCallName() const noexcept;
91
92 std::string_view GetClientName() const noexcept;
93
94 tracing::Span& GetSpan() noexcept;
95
96 grpc::CompletionQueue& GetQueue() const noexcept;
97
98 const RpcConfigValues& GetConfigValues() const noexcept;
99
100 const Middlewares& GetMiddlewares() const noexcept;
101
102 void ResetSpan() noexcept;
103
104 ugrpc::impl::RpcStatisticsScope& GetStatsScope() noexcept;
105
106 void SetWritesFinished() noexcept;
107
108 bool AreWritesFinished() const noexcept;
109
110 void SetFinished() noexcept;
111
112 bool IsFinished() const noexcept;
113
114 bool IsDeadlinePropagated() const noexcept;
115
116 void SetDeadlinePropagated() noexcept;
117
118 void EmplaceAsyncMethodInvocation();
119
120 void EmplaceFinishAsyncMethodInvocation();
121
122 AsyncMethodInvocation& GetAsyncMethodInvocation() noexcept;
123
124 FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation() noexcept;
125
126 grpc::Status& GetStatus() noexcept;
127
128 class AsyncMethodInvocationGuard {
129 public:
130 AsyncMethodInvocationGuard(RpcData& data) noexcept;
131 ~AsyncMethodInvocationGuard() noexcept;
132
133 private:
134 RpcData& data_;
135 };
136
137 private:
138 std::unique_ptr<grpc::ClientContext> context_;
139 std::string client_name_;
140 std::string_view call_name_;
141 bool writes_finished_{false};
142 bool is_finished_{false};
143 bool is_deadline_propagated_{false};
144
145 std::optional<tracing::InPlaceSpan> span_;
146 ugrpc::impl::RpcStatisticsScope stats_scope_;
147 grpc::CompletionQueue& queue_;
148 RpcConfigValues config_values_;
149 const Middlewares& mws_;
150
151 std::variant<std::monostate, AsyncMethodInvocation,
152 FinishAsyncMethodInvocation>
153 invocation_;
154 grpc::Status status_;
155};
156
157class FutureImpl final {
158 public:
159 explicit FutureImpl(RpcData& data) noexcept;
160
161 virtual ~FutureImpl() noexcept = default;
162
163 FutureImpl(FutureImpl&&) noexcept;
164 FutureImpl& operator=(FutureImpl&&) noexcept;
165
166 /// @brief Checks if the asynchronous call has completed
167 /// Note, that once user gets result, IsReady should not be called
168 /// @return true if result ready
169 [[nodiscard]] bool IsReady() const noexcept;
170
171 RpcData* GetData() noexcept;
172 void ClearData() noexcept;
173
174 private:
175 RpcData* data_;
176};
177
178void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
179 std::string_view stage);
180
181template <typename GrpcStream>
182void StartCall(GrpcStream& stream, RpcData& data) {
183 AsyncMethodInvocation start_call;
184 stream.StartCall(start_call.GetTag());
185 CheckOk(data, Wait(start_call, data.GetContext()), "StartCall");
186}
187
188void PrepareFinish(RpcData& data);
189
190void ProcessFinishResult(RpcData& data,
191 AsyncMethodInvocation::WaitStatus wait_status,
192 grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
193 bool throw_on_error);
194
195template <typename GrpcStream>
196void Finish(GrpcStream& stream, RpcData& data, bool throw_on_error) {
197 PrepareFinish(data);
198
199 FinishAsyncMethodInvocation finish(data);
200 auto& status = finish.GetStatus();
201 stream.Finish(&status, finish.GetTag());
202
203 const auto wait_status = Wait(finish, data.GetContext());
204 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
205 data.GetStatsScope().OnCancelled();
206 if (throw_on_error) throw RpcCancelledError(data.GetCallName(), "Finish");
207 }
208 ProcessFinishResult(data, wait_status, std::move(status),
209 std::move(finish.GetParsedGStatus()), throw_on_error);
210}
211
212void PrepareRead(RpcData& data);
213
214template <typename GrpcStream, typename Response>
215[[nodiscard]] bool Read(GrpcStream& stream, Response& response, RpcData& data) {
216 PrepareRead(data);
217 AsyncMethodInvocation read;
218 stream.Read(&response, read.GetTag());
219 const auto wait_status = Wait(read, data.GetContext());
220 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
221 data.GetStatsScope().OnCancelled();
222 }
223 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
224}
225
226template <typename GrpcStream, typename Response>
227void ReadAsync(GrpcStream& stream, Response& response, RpcData& data) noexcept {
228 PrepareRead(data);
229 data.EmplaceAsyncMethodInvocation();
230 auto& read = data.GetAsyncMethodInvocation();
231 stream.Read(&response, read.GetTag());
232}
233
234void PrepareWrite(RpcData& data);
235
236template <typename GrpcStream, typename Request>
237bool Write(GrpcStream& stream, const Request& request,
238 grpc::WriteOptions options, RpcData& data) {
239 PrepareWrite(data);
240 AsyncMethodInvocation write;
241 stream.Write(request, options, write.GetTag());
242 const auto result = Wait(write, data.GetContext());
243 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
244 data.GetStatsScope().OnCancelled();
245 }
246 if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
247 data.SetWritesFinished();
248 }
249 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
250}
251
252void PrepareWriteAndCheck(RpcData& data);
253
254template <typename GrpcStream, typename Request>
255void WriteAndCheck(GrpcStream& stream, const Request& request,
256 grpc::WriteOptions options, RpcData& data) {
257 PrepareWriteAndCheck(data);
258 AsyncMethodInvocation write;
259 stream.Write(request, options, write.GetTag());
260 CheckOk(data, Wait(write, data.GetContext()), "WriteAndCheck");
261}
262
263template <typename GrpcStream>
264bool WritesDone(GrpcStream& stream, RpcData& data) {
265 PrepareWrite(data);
266 data.SetWritesFinished();
267 AsyncMethodInvocation writes_done;
268 stream.WritesDone(writes_done.GetTag());
269 const auto wait_status = Wait(writes_done, data.GetContext());
270 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
271 data.GetStatsScope().OnCancelled();
272 }
273 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
274}
275
276} // namespace ugrpc::client::impl
277
278USERVER_NAMESPACE_END