9#include <grpcpp/client_context.h>
10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/codegen/async_stream.h>
12#include <grpcpp/impl/codegen/async_unary_call.h>
13#include <grpcpp/impl/codegen/status.h>
15#include <userver/dynamic_config/fwd.hpp>
16#include <userver/tracing/in_place_span.hpp>
17#include <userver/tracing/span.hpp>
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_method_invocation.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/impl/async_method_invocation.hpp>
23#include <userver/ugrpc/impl/statistics_scope.hpp>
25USERVER_NAMESPACE_BEGIN
27namespace ugrpc::
client::impl {
33template <
typename Response>
34using RawResponseReader =
35 std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
37template <
typename Response>
38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
40template <
typename Request>
41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
43template <
typename Request,
typename Response>
44using RawReaderWriter =
45 std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
50template <
typename Stub,
typename Request,
typename Response>
51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
52 grpc::ClientContext*,
const Request&, grpc::CompletionQueue*);
54template <
typename Stub,
typename Request,
typename Response>
55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
57 grpc::CompletionQueue*);
59template <
typename Stub,
typename Request,
typename Response>
60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
62 grpc::CompletionQueue*);
64template <
typename Stub,
typename Request,
typename Response>
65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
66 grpc::ClientContext*, grpc::CompletionQueue*);
69struct RpcConfigValues
final {
70 explicit RpcConfigValues(
const dynamic_config::Snapshot& config);
72 bool enforce_task_deadline;
75using ugrpc::
client::impl::FinishAsyncMethodInvocation;
76using ugrpc::impl::AsyncMethodInvocation;
80 explicit RpcData(CallParams&&);
82 RpcData(RpcData&&)
noexcept =
delete;
83 RpcData& operator=(RpcData&&)
noexcept =
delete;
86 const grpc::ClientContext& GetContext()
const noexcept;
88 grpc::ClientContext& GetContext()
noexcept;
90 std::string_view GetCallName()
const noexcept;
92 std::string_view GetClientName()
const noexcept;
94 tracing::Span& GetSpan()
noexcept;
96 grpc::CompletionQueue& GetQueue()
const noexcept;
98 const RpcConfigValues& GetConfigValues()
const noexcept;
100 const Middlewares& GetMiddlewares()
const noexcept;
102 void ResetSpan()
noexcept;
104 ugrpc::impl::RpcStatisticsScope& GetStatsScope()
noexcept;
106 void SetWritesFinished()
noexcept;
108 bool AreWritesFinished()
const noexcept;
110 void SetFinished()
noexcept;
112 bool IsFinished()
const noexcept;
114 bool IsDeadlinePropagated()
const noexcept;
116 void SetDeadlinePropagated()
noexcept;
120 void EmplaceAsyncMethodInvocation();
124 void EmplaceFinishAsyncMethodInvocation();
128 AsyncMethodInvocation& GetAsyncMethodInvocation()
noexcept;
132 FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation()
noexcept;
137 bool HoldsAsyncMethodInvocationDebug()
noexcept;
138 bool HoldsFinishAsyncMethodInvocationDebug()
noexcept;
140 grpc::Status& GetStatus()
noexcept;
// Scope guard tied to an RpcData instance: built from an RpcData reference,
// neither copyable nor movable, with a user-provided destructor.
// NOTE(review): what the destructor does when the guard has not been disarmed
// is defined out of line - presumably it clears the invocation held by the
// RpcData; confirm.
// NOTE(review): this text looks truncated - the access specifier, the data
// members (Disarm() writes `disarm_`, which is not declared in view) and the
// closing `};` are not visible here. The leading numbers fused onto the code
// lines are residue, not code.
142 class AsyncMethodInvocationGuard {
// Binds the guard to `data`. Not `explicit`, so it is also a converting
// constructor.
144 AsyncMethodInvocationGuard(RpcData& data)
noexcept;
// Copying and moving are disabled.
145 AsyncMethodInvocationGuard(
const AsyncMethodInvocationGuard&) =
delete;
146 AsyncMethodInvocationGuard(AsyncMethodInvocationGuard&&) =
delete;
147 ~AsyncMethodInvocationGuard()
noexcept;
// Sets the disarm flag; the destructor presumably checks it (confirm).
149 void Disarm()
noexcept { disarm_ =
true; }
// Data members of the call state.
// NOTE(review): the leading numbers fused onto the code lines are residue,
// not code.

// The client context is held through a unique_ptr (heap-allocated, owned).
157 std::unique_ptr<grpc::ClientContext> context_;
// The client name is stored by value; the call name is only a view, so the
// referenced characters must outlive this object.
158 std::string client_name_;
159 std::string_view call_name_;
// State flags, all initially false.
160 bool writes_finished_{
false};
161 bool is_finished_{
false};
162 bool is_deadline_propagated_{
false};
// Optional in-place span; empty until engaged.
164 std::optional<
tracing::InPlaceSpan> span_;
165 ugrpc::impl::RpcStatisticsScope stats_scope_;
// Reference members: the queue and the middlewares are not owned here.
166 grpc::CompletionQueue& queue_;
167 RpcConfigValues config_values_;
168 const Middlewares& mws_;
// Holds either nothing (std::monostate), an AsyncMethodInvocation or a
// FinishAsyncMethodInvocation.
// NOTE(review): the member name and terminating `;` of this variant
// declaration are not visible here (the text appears truncated between the
// type and the next member) - restore them from the original header.
178 std::variant<std::monostate, AsyncMethodInvocation,
179 FinishAsyncMethodInvocation>
181 grpc::Status status_;
// Movable handle built from an RpcData reference; exposes a readiness query
// and access to the associated RpcData through a pointer.
// NOTE(review): this text looks truncated - the access specifier, any data
// members and the closing `};` are not visible here. The leading numbers
// fused onto the code lines are residue, not code.
184class FutureImpl
final {
186 explicit FutureImpl(RpcData& data)
noexcept;
// NOTE(review): the class is `final` and no base class is visible, so the
// `virtual` on this destructor has no visible purpose - confirm whether it
// can be dropped.
188 virtual ~FutureImpl()
noexcept =
default;
// Move operations are declared here and defined out of line; no copy
// operations are declared in view.
190 FutureImpl(FutureImpl&&)
noexcept;
191 FutureImpl& operator=(FutureImpl&&)
noexcept;
// Readiness query; [[nodiscard]] because ignoring the result is pointless.
196 [[nodiscard]]
bool IsReady()
const noexcept;
// Returns a pointer rather than a reference.
// NOTE(review): presumably null after ClearData() or a move - confirm.
198 RpcData* GetData()
noexcept;
199 void ClearData()
noexcept;
205void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
206 std::string_view stage);
208template <
typename GrpcStream>
209void StartCall(GrpcStream& stream, RpcData& data) {
210 AsyncMethodInvocation start_call;
211 stream.StartCall(start_call.GetTag());
212 CheckOk(data, Wait(start_call, data.GetContext()),
"StartCall");
215void PrepareFinish(RpcData& data);
217void ProcessFinishResult(RpcData& data,
218 AsyncMethodInvocation::WaitStatus wait_status,
219 grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
220 bool throw_on_error);
222template <
typename GrpcStream>
223void Finish(GrpcStream& stream, RpcData& data,
bool throw_on_error) {
226 FinishAsyncMethodInvocation finish(data);
227 auto& status = finish.GetStatus();
228 stream.Finish(&status, finish.GetTag());
230 const auto wait_status = Wait(finish, data.GetContext());
231 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
232 data.GetStatsScope().OnCancelled();
233 if (throw_on_error)
throw RpcCancelledError(data.GetCallName(),
"Finish");
235 ProcessFinishResult(data, wait_status, std::move(status),
236 std::move(finish.GetParsedGStatus()), throw_on_error);
239void PrepareRead(RpcData& data);
241template <
typename GrpcStream,
typename Response>
242[[nodiscard]]
bool Read(GrpcStream& stream, Response& response, RpcData& data) {
244 AsyncMethodInvocation read;
245 stream.Read(&response, read.GetTag());
246 const auto wait_status = Wait(read, data.GetContext());
247 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
248 data.GetStatsScope().OnCancelled();
250 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
253template <
typename GrpcStream,
typename Response>
254void ReadAsync(GrpcStream& stream, Response& response, RpcData& data)
noexcept {
256 data.EmplaceAsyncMethodInvocation();
257 auto& read = data.GetAsyncMethodInvocation();
258 stream.Read(&response, read.GetTag());
261void PrepareWrite(RpcData& data);
263template <
typename GrpcStream,
typename Request>
264bool Write(GrpcStream& stream,
const Request& request,
265 grpc::WriteOptions options, RpcData& data) {
267 AsyncMethodInvocation write;
268 stream.Write(request, options, write.GetTag());
269 const auto result = Wait(write, data.GetContext());
270 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
271 data.GetStatsScope().OnCancelled();
273 if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
274 data.SetWritesFinished();
276 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
279void PrepareWriteAndCheck(RpcData& data);
281template <
typename GrpcStream,
typename Request>
282void WriteAndCheck(GrpcStream& stream,
const Request& request,
283 grpc::WriteOptions options, RpcData& data) {
284 PrepareWriteAndCheck(data);
285 AsyncMethodInvocation write;
286 stream.Write(request, options, write.GetTag());
287 CheckOk(data, Wait(write, data.GetContext()),
"WriteAndCheck");
// Signals on `stream` that no more messages will be written and waits for
// that operation to finish. Returns true only if the wait finished with
// WaitStatus::kOk.
// NOTE(review): the closing braces of the `if` and of the function are not
// visible in this text, and the leading numbers fused onto the code lines are
// residue, not code.
// NOTE(review): unlike WriteAndCheck, no Prepare* call is visible at the top
// of this function - confirm whether PrepareWrite(data) belongs here.
290template <
typename GrpcStream>
291bool WritesDone(GrpcStream& stream, RpcData& data) {
// Writes are marked finished before the operation is issued, regardless of
// how it ends.
293 data.SetWritesFinished();
294 AsyncMethodInvocation writes_done;
295 stream.WritesDone(writes_done.GetTag());
296 const auto wait_status = Wait(writes_done, data.GetContext());
// A cancelled wait is recorded in the call's statistics scope.
297 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
298 data.GetStatsScope().OnCancelled();
300 return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;