userver: userver/ugrpc/client/impl/async_methods.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
async_methods.hpp
1#pragma once
2
#include <atomic>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <variant>
8
9#include <grpcpp/client_context.h>
10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/codegen/async_stream.h>
12#include <grpcpp/impl/codegen/async_unary_call.h>
13#include <grpcpp/impl/codegen/status.h>
14
15#include <userver/dynamic_config/fwd.hpp>
16#include <userver/tracing/in_place_span.hpp>
17#include <userver/tracing/span.hpp>
18
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/impl/async_method_invocation.hpp>
23#include <userver/ugrpc/impl/statistics_scope.hpp>
24
25USERVER_NAMESPACE_BEGIN
26
27namespace ugrpc::client::impl {
28
29/// @{
30/// @brief Helper type aliases for low-level asynchronous gRPC streams
31/// @see <grpcpp/impl/codegen/async_unary_call_impl.h>
32/// @see <grpcpp/impl/codegen/async_stream_impl.h>
33template <typename Response>
34using RawResponseReader =
35 std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
36
37template <typename Response>
38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
39
40template <typename Request>
41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
42
43template <typename Request, typename Response>
44using RawReaderWriter =
45 std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
46/// @}
47
48/// @{
49/// @brief Helper type aliases for stub member function pointers
50template <typename Stub, typename Request, typename Response>
51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
52 grpc::ClientContext*, const Request&, grpc::CompletionQueue*);
53
54template <typename Stub, typename Request, typename Response>
55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
56 const Request&,
57 grpc::CompletionQueue*);
58
59template <typename Stub, typename Request, typename Response>
60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
61 Response*,
62 grpc::CompletionQueue*);
63
64template <typename Stub, typename Request, typename Response>
65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
66 grpc::ClientContext*, grpc::CompletionQueue*);
67/// @}
68
69struct RpcConfigValues final {
70 explicit RpcConfigValues(const dynamic_config::Snapshot& config);
71
72 bool enforce_task_deadline;
73};
74
75using ugrpc::client::impl::FinishAsyncMethodInvocation;
76using ugrpc::impl::AsyncMethodInvocation;
77
78class RpcData final {
79 public:
80 explicit RpcData(CallParams&&);
81
82 RpcData(RpcData&&) noexcept = delete;
83 RpcData& operator=(RpcData&&) noexcept = delete;
84 ~RpcData();
85
86 const grpc::ClientContext& GetContext() const noexcept;
87
88 grpc::ClientContext& GetContext() noexcept;
89
90 std::string_view GetCallName() const noexcept;
91
92 std::string_view GetClientName() const noexcept;
93
94 tracing::Span& GetSpan() noexcept;
95
96 grpc::CompletionQueue& GetQueue() const noexcept;
97
98 const RpcConfigValues& GetConfigValues() const noexcept;
99
100 const Middlewares& GetMiddlewares() const noexcept;
101
102 void ResetSpan() noexcept;
103
104 ugrpc::impl::RpcStatisticsScope& GetStatsScope() noexcept;
105
106 void SetWritesFinished() noexcept;
107
108 bool AreWritesFinished() const noexcept;
109
110 void SetFinished() noexcept;
111
112 bool IsFinished() const noexcept;
113
114 bool IsDeadlinePropagated() const noexcept;
115
116 void SetDeadlinePropagated() noexcept;
117
118 void EmplaceAsyncMethodInvocation();
119
120 void EmplaceFinishAsyncMethodInvocation();
121
122 AsyncMethodInvocation& GetAsyncMethodInvocation() noexcept;
123
124 FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation() noexcept;
125
126 grpc::Status& GetStatus() noexcept;
127
128 class AsyncMethodInvocationGuard {
129 public:
130 AsyncMethodInvocationGuard(RpcData& data) noexcept;
131 ~AsyncMethodInvocationGuard() noexcept;
132
133 private:
134 RpcData& data_;
135 };
136
137 private:
138 std::unique_ptr<grpc::ClientContext> context_;
139 std::string client_name_;
140 std::string_view call_name_;
141 bool writes_finished_{false};
142 bool is_finished_{false};
143 bool is_deadline_propagated_{false};
144
145 std::optional<tracing::InPlaceSpan> span_;
146 ugrpc::impl::RpcStatisticsScope stats_scope_;
147 grpc::CompletionQueue& queue_;
148 RpcConfigValues config_values_;
149 const Middlewares& mws_;
150
151 std::variant<std::monostate, AsyncMethodInvocation,
152 FinishAsyncMethodInvocation>
153 invocation_;
154 grpc::Status status_;
155};
156
157class FutureImpl final {
158 public:
159 explicit FutureImpl(RpcData& data) noexcept;
160
161 virtual ~FutureImpl() noexcept = default;
162
163 FutureImpl(FutureImpl&&) noexcept;
164 FutureImpl& operator=(FutureImpl&&) noexcept;
165
166 /// @brief Checks if the asynchronous call has completed
167 /// Note, that once user gets result, IsReady should not be called
168 /// @return true if result ready
169 [[nodiscard]] bool IsReady() const noexcept;
170
171 RpcData* GetData() noexcept;
172 void ClearData() noexcept;
173
174 private:
175 RpcData* data_;
176};
177
178void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
179 std::string_view stage);
180
181template <typename GrpcStream>
182void StartCall(GrpcStream& stream, RpcData& data) {
183 AsyncMethodInvocation start_call;
184 stream.StartCall(start_call.GetTag());
185 CheckOk(data, Wait(start_call, data.GetContext()), "StartCall");
186}
187
188void PrepareFinish(RpcData& data);
189
190void ProcessFinishResult(RpcData& data,
191 AsyncMethodInvocation::WaitStatus wait_status,
192 grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
193 bool throw_on_error);
194
195template <typename GrpcStream>
196void Finish(GrpcStream& stream, RpcData& data, bool throw_on_error) {
197 PrepareFinish(data);
198
199 FinishAsyncMethodInvocation finish(data);
200 auto& status = finish.GetStatus();
201 stream.Finish(&status, finish.GetTag());
202
203 const auto wait_status = Wait(finish, data.GetContext());
204 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
205 data.GetStatsScope().OnCancelled();
206 if (throw_on_error) throw RpcCancelledError(data.GetCallName(), "Finish");
207 }
208 ProcessFinishResult(data, wait_status, std::move(status),
209 std::move(finish.GetParsedGStatus()), throw_on_error);
210}
211
212void PrepareRead(RpcData& data);
213
214template <typename GrpcStream, typename Response>
215[[nodiscard]] bool Read(GrpcStream& stream, Response& response, RpcData& data) {
216 PrepareRead(data);
217 AsyncMethodInvocation read;
218 stream.Read(&response, read.GetTag());
219 const auto wait_status = Wait(read, data.GetContext());
220 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
221 data.GetStatsScope().OnCancelled();
222 }
223 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
224}
225
226template <typename GrpcStream, typename Response>
227void ReadAsync(GrpcStream& stream, Response& response, RpcData& data) noexcept {
228 PrepareRead(data);
229 data.EmplaceAsyncMethodInvocation();
230 auto& read = data.GetAsyncMethodInvocation();
231 stream.Read(&response, read.GetTag());
232}
233
234void PrepareWrite(RpcData& data);
235
236template <typename GrpcStream, typename Request>
237bool Write(GrpcStream& stream, const Request& request,
238 grpc::WriteOptions options, RpcData& data) {
239 PrepareWrite(data);
240 AsyncMethodInvocation write;
241 stream.Write(request, options, write.GetTag());
242 const auto result = Wait(write, data.GetContext());
243 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
244 data.GetStatsScope().OnCancelled();
245 }
246 if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
247 data.SetWritesFinished();
248 }
249 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
250}
251
252void PrepareWriteAndCheck(RpcData& data);
253
254template <typename GrpcStream, typename Request>
255void WriteAndCheck(GrpcStream& stream, const Request& request,
256 grpc::WriteOptions options, RpcData& data) {
257 PrepareWriteAndCheck(data);
258 AsyncMethodInvocation write;
259 stream.Write(request, options, write.GetTag());
260 CheckOk(data, Wait(write, data.GetContext()), "WriteAndCheck");
261}
262
263template <typename GrpcStream>
264bool WritesDone(GrpcStream& stream, RpcData& data) {
265 PrepareWrite(data);
266 data.SetWritesFinished();
267 AsyncMethodInvocation writes_done;
268 stream.WritesDone(writes_done.GetTag());
269 const auto wait_status = Wait(writes_done, data.GetContext());
270 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
271 data.GetStatsScope().OnCancelled();
272 }
273 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
274}
275
276} // namespace ugrpc::client::impl
277
278USERVER_NAMESPACE_END