9#include <grpcpp/client_context.h>
10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/codegen/async_stream.h>
12#include <grpcpp/impl/codegen/async_unary_call.h>
13#include <grpcpp/impl/codegen/status.h>
15#include <userver/dynamic_config/fwd.hpp>
16#include <userver/tracing/in_place_span.hpp>
17#include <userver/tracing/span.hpp>
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/impl/async_method_invocation.hpp>
23#include <userver/ugrpc/impl/statistics_scope.hpp>
25USERVER_NAMESPACE_BEGIN
27namespace ugrpc::
client::impl {
33template <
typename Response>
34using RawResponseReader =
35 std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
37template <
typename Response>
38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
40template <
typename Request>
41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
43template <
typename Request,
typename Response>
44using RawReaderWriter =
45 std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
50template <
typename Stub,
typename Request,
typename Response>
51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
52 grpc::ClientContext*,
const Request&, grpc::CompletionQueue*);
54template <
typename Stub,
typename Request,
typename Response>
55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
57 grpc::CompletionQueue*);
59template <
typename Stub,
typename Request,
typename Response>
60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
62 grpc::CompletionQueue*);
64template <
typename Stub,
typename Request,
typename Response>
65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
66 grpc::ClientContext*, grpc::CompletionQueue*);
69struct RpcConfigValues
final {
70 explicit RpcConfigValues(
const dynamic_config::Snapshot& config);
72 bool enforce_task_deadline;
75using ugrpc::
client::impl::FinishAsyncMethodInvocation;
76using ugrpc::impl::AsyncMethodInvocation;
// Member function declarations of RpcData. All are declarations only; the
// definitions are not visible in this excerpt.

// Constructs the call state from CallParams, taken by rvalue reference.
80 explicit RpcData(CallParams&&);
// Move construction and move assignment are explicitly deleted.
82 RpcData(RpcData&&)
noexcept =
delete;
83 RpcData& operator=(RpcData&&)
noexcept =
delete;
// Const and non-const access to the grpc::ClientContext.
86 const grpc::ClientContext& GetContext()
const noexcept;
88 grpc::ClientContext& GetContext()
noexcept;
// Call name and client name, both returned as non-owning string views.
90 std::string_view GetCallName()
const noexcept;
92 std::string_view GetClientName()
const noexcept;
// Mutable access to the tracing span.
94 tracing::Span& GetSpan()
noexcept;
// Completion queue; returned by non-const reference from a const accessor.
96 grpc::CompletionQueue& GetQueue()
const noexcept;
// Read-only access to the config values and to the middlewares.
98 const RpcConfigValues& GetConfigValues()
const noexcept;
100 const Middlewares& GetMiddlewares()
const noexcept;
// Presumably destroys the stored span - confirm against the definition.
102 void ResetSpan()
noexcept;
// Mutable access to the statistics scope.
104 ugrpc::impl::RpcStatisticsScope& GetStatsScope()
noexcept;
// Setter/getter pairs for three boolean states: writes finished, call
// finished, deadline propagated. Presumably backed by the bool data members
// with matching names - confirm against the definitions.
106 void SetWritesFinished()
noexcept;
108 bool AreWritesFinished()
const noexcept;
110 void SetFinished()
noexcept;
112 bool IsFinished()
const noexcept;
114 bool IsDeadlinePropagated()
const noexcept;
116 void SetDeadlinePropagated()
noexcept;
// Emplace*/Get* pairs for the two invocation kinds. Note that the Emplace*
// functions are not declared noexcept, unlike the getters.
// NOTE(review): presumably these select and access alternatives of the
// std::variant data member - confirm against the definitions.
118 void EmplaceAsyncMethodInvocation();
120 void EmplaceFinishAsyncMethodInvocation();
122 AsyncMethodInvocation& GetAsyncMethodInvocation()
noexcept;
124 FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation()
noexcept;
// Mutable access to the grpc::Status.
126 grpc::Status& GetStatus()
noexcept;
// Nested guard type constructed from an RpcData reference. Only the
// constructor and destructor declarations are visible in this excerpt; what
// the destructor does with the RpcData is defined elsewhere.
// NOTE(review): the single-argument constructor is not `explicit`, so an
// RpcData& converts implicitly to a guard - confirm that this is intended.
128 class AsyncMethodInvocationGuard {
130 AsyncMethodInvocationGuard(RpcData& data)
noexcept;
131 ~AsyncMethodInvocationGuard()
noexcept;
// Data members of RpcData.

// The client context is owned through a unique_ptr.
138 std::unique_ptr<grpc::ClientContext> context_;
// client_name_ owns its characters; call_name_ is a non-owning view, so the
// storage it refers to must outlive this object (the owner is not visible
// in this excerpt).
139 std::string client_name_;
140 std::string_view call_name_;
// State flags, all initialised to false.
141 bool writes_finished_{
false};
142 bool is_finished_{
false};
143 bool is_deadline_propagated_{
false};
// Optional in-place span; empty until something emplaces it.
145 std::optional<
tracing::InPlaceSpan> span_;
146 ugrpc::impl::RpcStatisticsScope stats_scope_;
// Held by reference: the completion queue is not owned by this object.
147 grpc::CompletionQueue& queue_;
148 RpcConfigValues config_values_;
// Held by const reference: the middlewares are not owned by this object.
149 const Middlewares& mws_;
// Holds either nothing (std::monostate), an AsyncMethodInvocation, or a
// FinishAsyncMethodInvocation.
// NOTE(review): no member name is visible after this variant type in this
// excerpt; the declarator appears to be missing - confirm against the full
// file.
151 std::variant<std::monostate, AsyncMethodInvocation,
152 FinishAsyncMethodInvocation>
154 grpc::Status status_;
// Movable handle constructed from an RpcData reference. It exposes a
// readiness query and access to an RpcData pointer that can be cleared.
// Member definitions and any data members are not visible in this excerpt.
157class FutureImpl
final {
159 explicit FutureImpl(RpcData& data)
noexcept;
// NOTE(review): the class is declared `final` and no base class is visible,
// so `virtual` on this destructor looks unnecessary - confirm before
// removing.
161 virtual ~FutureImpl()
noexcept =
default;
// Move operations are user-declared (defined elsewhere).
163 FutureImpl(FutureImpl&&)
noexcept;
164 FutureImpl& operator=(FutureImpl&&)
noexcept;
// Readiness query; the result must not be discarded.
169 [[nodiscard]]
bool IsReady()
const noexcept;
// Returns a pointer to RpcData. ClearData() presumably resets that pointer -
// confirm against the definition.
171 RpcData* GetData()
noexcept;
172 void ClearData()
noexcept;
178void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
179 std::string_view stage);
181template <
typename GrpcStream>
182void StartCall(GrpcStream& stream, RpcData& data) {
183 AsyncMethodInvocation start_call;
184 stream.StartCall(start_call.GetTag());
185 CheckOk(data, Wait(start_call, data.GetContext()),
"StartCall");
188void PrepareFinish(RpcData& data);
190void ProcessFinishResult(RpcData& data,
191 AsyncMethodInvocation::WaitStatus wait_status,
192 grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
193 bool throw_on_error);
195template <
typename GrpcStream>
196void Finish(GrpcStream& stream, RpcData& data,
bool throw_on_error) {
199 FinishAsyncMethodInvocation finish(data);
200 auto& status = finish.GetStatus();
201 stream.Finish(&status, finish.GetTag());
203 const auto wait_status = Wait(finish, data.GetContext());
204 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
205 data.GetStatsScope().OnCancelled();
206 if (throw_on_error)
throw RpcCancelledError(data.GetCallName(),
"Finish");
208 ProcessFinishResult(data, wait_status, std::move(status),
209 std::move(finish.GetParsedGStatus()), throw_on_error);
212void PrepareRead(RpcData& data);
214template <
typename GrpcStream,
typename Response>
215[[nodiscard]]
bool Read(GrpcStream& stream, Response& response, RpcData& data) {
217 AsyncMethodInvocation read;
218 stream.Read(&response, read.GetTag());
219 const auto wait_status = Wait(read, data.GetContext());
220 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
221 data.GetStatsScope().OnCancelled();
223 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
226template <
typename GrpcStream,
typename Response>
227void ReadAsync(GrpcStream& stream, Response& response, RpcData& data)
noexcept {
229 data.EmplaceAsyncMethodInvocation();
230 auto& read = data.GetAsyncMethodInvocation();
231 stream.Read(&response, read.GetTag());
234void PrepareWrite(RpcData& data);
236template <
typename GrpcStream,
typename Request>
237bool Write(GrpcStream& stream,
const Request& request,
238 grpc::WriteOptions options, RpcData& data) {
240 AsyncMethodInvocation write;
241 stream.Write(request, options, write.GetTag());
242 const auto result = Wait(write, data.GetContext());
243 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
244 data.GetStatsScope().OnCancelled();
246 if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
247 data.SetWritesFinished();
249 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
252void PrepareWriteAndCheck(RpcData& data);
254template <
typename GrpcStream,
typename Request>
255void WriteAndCheck(GrpcStream& stream,
const Request& request,
256 grpc::WriteOptions options, RpcData& data) {
257 PrepareWriteAndCheck(data);
258 AsyncMethodInvocation write;
259 stream.Write(request, options, write.GetTag());
260 CheckOk(data, Wait(write, data.GetContext()),
"WriteAndCheck");
// Marks writes as finished on `data`, issues `WritesDone` on the stream and
// waits for it. A cancelled wait is recorded via the statistics scope.
// Returns true only when the wait finished with WaitStatus::kOk.
// NOTE(review): PrepareWrite(data) is declared in this header but is not
// invoked here; confirm whether it should run before SetWritesFinished().
263template <
typename GrpcStream>
264bool WritesDone(GrpcStream& stream, RpcData& data) {
// The flag is set before the operation is issued, regardless of its outcome.
266 data.SetWritesFinished();
267 AsyncMethodInvocation writes_done;
268 stream.WritesDone(writes_done.GetTag());
269 const auto wait_status = Wait(writes_done, data.GetContext());
270 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
271 data.GetStatsScope().OnCancelled();
273 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;