9#include <grpcpp/client_context.h> 
   10#include <grpcpp/completion_queue.h> 
   11#include <grpcpp/impl/codegen/async_stream.h> 
   12#include <grpcpp/impl/codegen/async_unary_call.h> 
   13#include <grpcpp/impl/codegen/status.h> 
   15#include <userver/dynamic_config/fwd.hpp> 
   16#include <userver/tracing/in_place_span.hpp> 
   17#include <userver/tracing/span.hpp> 
   19#include <userver/ugrpc/client/exceptions.hpp> 
   20#include <userver/ugrpc/client/impl/async_method_invocation.hpp> 
   21#include <userver/ugrpc/client/impl/call_params.hpp> 
   22#include <userver/ugrpc/impl/async_method_invocation.hpp> 
   23#include <userver/ugrpc/impl/statistics_scope.hpp> 
   25USERVER_NAMESPACE_BEGIN
 
   27namespace ugrpc::
client::impl {
 
   33template <
typename Response>
 
   34using RawResponseReader =
 
   35    std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
 
   37template <
typename Response>
 
   38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
 
   40template <
typename Request>
 
   41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
 
   43template <
typename Request, 
typename Response>
 
   44using RawReaderWriter =
 
   45    std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
 
   50template <
typename Stub, 
typename Request, 
typename Response>
 
   51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
 
   52    grpc::ClientContext*, 
const Request&, grpc::CompletionQueue*);
 
   54template <
typename Stub, 
typename Request, 
typename Response>
 
   55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
 
   57                                                        grpc::CompletionQueue*);
 
   59template <
typename Stub, 
typename Request, 
typename Response>
 
   60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
 
   62                                                       grpc::CompletionQueue*);
 
   64template <
typename Stub, 
typename Request, 
typename Response>
 
   65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
 
   66    grpc::ClientContext*, grpc::CompletionQueue*);
 
   69struct RpcConfigValues 
final {
 
   70  explicit RpcConfigValues(
const dynamic_config::Snapshot& config);
 
   72  bool enforce_task_deadline;
 
   75using ugrpc::
client::impl::FinishAsyncMethodInvocation;
 
   76using ugrpc::impl::AsyncMethodInvocation;
 
   80  explicit RpcData(CallParams&&);
 
   82  RpcData(RpcData&&) 
noexcept = 
delete;
 
   83  RpcData& operator=(RpcData&&) 
noexcept = 
delete;
 
   86  const grpc::ClientContext& GetContext() 
const noexcept;
 
   88  grpc::ClientContext& GetContext() 
noexcept;
 
   90  std::string_view GetCallName() 
const noexcept;
 
   92  std::string_view GetClientName() 
const noexcept;
 
   94  tracing::Span& GetSpan() 
noexcept;
 
   96  grpc::CompletionQueue& GetQueue() 
const noexcept;
 
   98  const RpcConfigValues& GetConfigValues() 
const noexcept;
 
  100  const Middlewares& GetMiddlewares() 
const noexcept;
 
  102  void ResetSpan() 
noexcept;
 
  104  ugrpc::impl::RpcStatisticsScope& GetStatsScope() 
noexcept;
 
  106  void SetWritesFinished() 
noexcept;
 
  108  bool AreWritesFinished() 
const noexcept;
 
  110  void SetFinished() 
noexcept;
 
  112  bool IsFinished() 
const noexcept;
 
  114  bool IsDeadlinePropagated() 
const noexcept;
 
  116  void SetDeadlinePropagated() 
noexcept;
 
  118  void EmplaceAsyncMethodInvocation();
 
  120  void EmplaceFinishAsyncMethodInvocation();
 
  122  AsyncMethodInvocation& GetAsyncMethodInvocation() 
noexcept;
 
  124  FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation() 
noexcept;
 
  126  grpc::Status& GetStatus() 
noexcept;
 
  // Nested helper class. Its constructor takes the RpcData it works on, and
  // both the constructor and the destructor are declared noexcept.
  // NOTE(review): presumably a scope guard for the stored async method
  // invocation — confirm in the implementation.
  // NOTE(review): no access specifier, data members or closing brace are
  // visible for this class here; as written the constructor and destructor
  // would be private. Confirm against the complete header.
  128  class AsyncMethodInvocationGuard {
 
  130    AsyncMethodInvocationGuard(RpcData& data) 
noexcept;
 
  131    ~AsyncMethodInvocationGuard() 
noexcept;
 
  // Data members of the call state.
  // The gRPC client context is owned through a unique_ptr.
  138  std::unique_ptr<grpc::ClientContext> context_;
 
  // Owning copy of the client name.
  139  std::string client_name_;
 
  // Non-owning view of the call name.
  // NOTE(review): the viewed storage must outlive this object — confirm where
  // it is set.
  140  std::string_view call_name_;
 
  // State flags; all three start as false.
  141  bool writes_finished_{
false};
 
  142  bool is_finished_{
false};
 
  143  bool is_deadline_propagated_{
false};
 
  // Optional in-place span; may be empty.
  145  std::optional<
tracing::InPlaceSpan> span_;
 
  146  ugrpc::impl::RpcStatisticsScope stats_scope_;
 
  // Held by reference, not owned.
  147  grpc::CompletionQueue& queue_;
 
  148  RpcConfigValues config_values_;
 
  // Held by reference, not owned.
  149  const Middlewares& mws_;
 
  // Holds nothing (std::monostate), an AsyncMethodInvocation or a
  // FinishAsyncMethodInvocation.
  // NOTE(review): the name of this variant member is not visible here —
  // confirm against the complete header.
  151  std::variant<std::monostate, AsyncMethodInvocation,
 
  152               FinishAsyncMethodInvocation>
 
  154  grpc::Status status_;
 
  // Movable handle constructed from an RpcData reference; exposes the
  // associated RpcData as a pointer through GetData().
  // NOTE(review): the access specifier, data members and closing brace of this
  // class are not visible here — confirm against the complete header.
  157class FutureImpl 
final {
 
  159  explicit FutureImpl(RpcData& data) 
noexcept;
 
  // NOTE(review): the class is `final` and no base class or other virtual
  // member is visible, so `virtual` here appears to add only a vtable pointer
  // — consider dropping it.
  161  virtual ~FutureImpl() 
noexcept = 
default;
 
  // Move operations are declared here and defined elsewhere.
  163  FutureImpl(FutureImpl&&) 
noexcept;
 
  164  FutureImpl& operator=(FutureImpl&&) 
noexcept;
 
  // Readiness query; the result must not be discarded.
  169  [[nodiscard]] 
bool IsReady() 
const noexcept;
 
  // Returns the associated RpcData as a pointer.
  // NOTE(review): presumably null after ClearData() — confirm.
  171  RpcData* GetData() 
noexcept;
 
  172  void ClearData() 
noexcept;
 
  178void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
 
  179             std::string_view stage);
 
  181template <
typename GrpcStream>
 
  182void StartCall(GrpcStream& stream, RpcData& data) {
 
  183  AsyncMethodInvocation start_call;
 
  184  stream.StartCall(start_call.GetTag());
 
  185  CheckOk(data, Wait(start_call, data.GetContext()), 
"StartCall");
 
  188void PrepareFinish(RpcData& data);
 
  190void ProcessFinishResult(RpcData& data,
 
  191                         AsyncMethodInvocation::WaitStatus wait_status,
 
  192                         grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
 
  193                         bool throw_on_error);
 
  195template <
typename GrpcStream>
 
  196void Finish(GrpcStream& stream, RpcData& data, 
bool throw_on_error) {
 
  199  FinishAsyncMethodInvocation finish(data);
 
  200  auto& status = finish.GetStatus();
 
  201  stream.Finish(&status, finish.GetTag());
 
  203  const auto wait_status = Wait(finish, data.GetContext());
 
  204  if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  205    data.GetStatsScope().OnCancelled();
 
  206    if (throw_on_error) 
throw RpcCancelledError(data.GetCallName(), 
"Finish");
 
  208  ProcessFinishResult(data, wait_status, std::move(status),
 
  209                      std::move(finish.GetParsedGStatus()), throw_on_error);
 
  212void PrepareRead(RpcData& data);
 
  214template <
typename GrpcStream, 
typename Response>
 
  215[[nodiscard]] 
bool Read(GrpcStream& stream, Response& response, RpcData& data) {
 
  217  AsyncMethodInvocation read;
 
  218  stream.Read(&response, read.GetTag());
 
  219  const auto wait_status = Wait(read, data.GetContext());
 
  220  if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  221    data.GetStatsScope().OnCancelled();
 
  223  return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
  226template <
typename GrpcStream, 
typename Response>
 
  227void ReadAsync(GrpcStream& stream, Response& response, RpcData& data) 
noexcept {
 
  229  data.EmplaceAsyncMethodInvocation();
 
  230  auto& read = data.GetAsyncMethodInvocation();
 
  231  stream.Read(&response, read.GetTag());
 
  234void PrepareWrite(RpcData& data);
 
  236template <
typename GrpcStream, 
typename Request>
 
  237bool Write(GrpcStream& stream, 
const Request& request,
 
  238           grpc::WriteOptions options, RpcData& data) {
 
  240  AsyncMethodInvocation write;
 
  241  stream.Write(request, options, write.GetTag());
 
  242  const auto result = Wait(write, data.GetContext());
 
  243  if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  244    data.GetStatsScope().OnCancelled();
 
  246  if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
 
  247    data.SetWritesFinished();
 
  249  return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
  252void PrepareWriteAndCheck(RpcData& data);
 
  254template <
typename GrpcStream, 
typename Request>
 
  255void WriteAndCheck(GrpcStream& stream, 
const Request& request,
 
  256                   grpc::WriteOptions options, RpcData& data) {
 
  257  PrepareWriteAndCheck(data);
 
  258  AsyncMethodInvocation write;
 
  259  stream.Write(request, options, write.GetTag());
 
  260  CheckOk(data, Wait(write, data.GetContext()), 
"WriteAndCheck");
 
  263template <
typename GrpcStream>
 
  264bool WritesDone(GrpcStream& stream, RpcData& data) {
 
  266  data.SetWritesFinished();
 
  267  AsyncMethodInvocation writes_done;
 
  268  stream.WritesDone(writes_done.GetTag());
 
  269  const auto wait_status = Wait(writes_done, data.GetContext());
 
  270  if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  271    data.GetStatsScope().OnCancelled();
 
  273  return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;