9#include <grpcpp/client_context.h> 
   10#include <grpcpp/completion_queue.h> 
   11#include <grpcpp/impl/codegen/async_stream.h> 
   12#include <grpcpp/impl/codegen/async_unary_call.h> 
   13#include <grpcpp/impl/codegen/status.h> 
   15#include <userver/dynamic_config/fwd.hpp> 
   16#include <userver/tracing/in_place_span.hpp> 
   17#include <userver/tracing/span.hpp> 
   19#include <userver/ugrpc/client/exceptions.hpp> 
   20#include <userver/ugrpc/client/impl/async_method_invocation.hpp> 
   21#include <userver/ugrpc/client/impl/call_params.hpp> 
   22#include <userver/ugrpc/impl/async_method_invocation.hpp> 
   23#include <userver/ugrpc/impl/statistics_scope.hpp> 
   25USERVER_NAMESPACE_BEGIN
 
   27namespace ugrpc::
client::impl {
 
   33template <
typename Response>
 
   34using RawResponseReader =
 
   35    std::unique_ptr<grpc::ClientAsyncResponseReader<Response>>;
 
   37template <
typename Response>
 
   38using RawReader = std::unique_ptr<grpc::ClientAsyncReader<Response>>;
 
   40template <
typename Request>
 
   41using RawWriter = std::unique_ptr<grpc::ClientAsyncWriter<Request>>;
 
   43template <
typename Request, 
typename Response>
 
   44using RawReaderWriter =
 
   45    std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;
 
   50template <
typename Stub, 
typename Request, 
typename Response>
 
   51using RawResponseReaderPreparer = RawResponseReader<Response> (Stub::*)(
 
   52    grpc::ClientContext*, 
const Request&, grpc::CompletionQueue*);
 
   54template <
typename Stub, 
typename Request, 
typename Response>
 
   55using RawReaderPreparer = RawReader<Response> (Stub::*)(grpc::ClientContext*,
 
   57                                                        grpc::CompletionQueue*);
 
   59template <
typename Stub, 
typename Request, 
typename Response>
 
   60using RawWriterPreparer = RawWriter<Request> (Stub::*)(grpc::ClientContext*,
 
   62                                                       grpc::CompletionQueue*);
 
   64template <
typename Stub, 
typename Request, 
typename Response>
 
   65using RawReaderWriterPreparer = RawReaderWriter<Request, Response> (Stub::*)(
 
   66    grpc::ClientContext*, grpc::CompletionQueue*);
 
   69struct RpcConfigValues 
final {
 
   70  explicit RpcConfigValues(
const dynamic_config::Snapshot& config);
 
   72  bool enforce_task_deadline;
 
   75using ugrpc::
client::impl::FinishAsyncMethodInvocation;
 
   76using ugrpc::impl::AsyncMethodInvocation;
 
   80  explicit RpcData(CallParams&&);
 
   82  RpcData(RpcData&&) 
noexcept = 
delete;
 
   83  RpcData& operator=(RpcData&&) 
noexcept = 
delete;
 
   86  const grpc::ClientContext& GetContext() 
const noexcept;
 
   88  grpc::ClientContext& GetContext() 
noexcept;
 
   90  std::string_view GetCallName() 
const noexcept;
 
   92  std::string_view GetClientName() 
const noexcept;
 
   94  tracing::Span& GetSpan() 
noexcept;
 
   96  grpc::CompletionQueue& GetQueue() 
const noexcept;
 
   98  const RpcConfigValues& GetConfigValues() 
const noexcept;
 
  100  const Middlewares& GetMiddlewares() 
const noexcept;
 
  102  void ResetSpan() 
noexcept;
 
  104  ugrpc::impl::RpcStatisticsScope& GetStatsScope() 
noexcept;
 
  106  void SetWritesFinished() 
noexcept;
 
  108  bool AreWritesFinished() 
const noexcept;
 
  110  void SetFinished() 
noexcept;
 
  112  bool IsFinished() 
const noexcept;
 
  114  bool IsDeadlinePropagated() 
const noexcept;
 
  116  void SetDeadlinePropagated() 
noexcept;
 
  120  void EmplaceAsyncMethodInvocation();
 
  124  void EmplaceFinishAsyncMethodInvocation();
 
  128  AsyncMethodInvocation& GetAsyncMethodInvocation() 
noexcept;
 
  132  FinishAsyncMethodInvocation& GetFinishAsyncMethodInvocation() 
noexcept;
 
  137  bool HoldsAsyncMethodInvocationDebug() 
noexcept;
 
  138  bool HoldsFinishAsyncMethodInvocationDebug() 
noexcept;
 
  140  grpc::Status& GetStatus() 
noexcept;
 
  142  class AsyncMethodInvocationGuard {
 
  144    AsyncMethodInvocationGuard(RpcData& data) 
noexcept;
 
  145    AsyncMethodInvocationGuard(
const AsyncMethodInvocationGuard&) = 
delete;
 
  146    AsyncMethodInvocationGuard(AsyncMethodInvocationGuard&&) = 
delete;
 
  147    ~AsyncMethodInvocationGuard() 
noexcept;
 
  149    void Disarm() 
noexcept { disarm_ = 
true; }
 
  157  std::unique_ptr<grpc::ClientContext> context_;
 
  158  std::string client_name_;
 
  159  std::string_view call_name_;
 
  160  bool writes_finished_{
false};
 
  161  bool is_finished_{
false};
 
  162  bool is_deadline_propagated_{
false};
 
  164  std::optional<
tracing::InPlaceSpan> span_;
 
  165  ugrpc::impl::RpcStatisticsScope stats_scope_;
 
  166  grpc::CompletionQueue& queue_;
 
  167  RpcConfigValues config_values_;
 
  168  const Middlewares& mws_;
 
  178  std::variant<std::monostate, AsyncMethodInvocation,
 
  179               FinishAsyncMethodInvocation>
 
  181  grpc::Status status_;
 
  184class FutureImpl 
final {
 
  186  explicit FutureImpl(RpcData& data) 
noexcept;
 
  188  virtual ~FutureImpl() 
noexcept = 
default;
 
  190  FutureImpl(FutureImpl&&) 
noexcept;
 
  191  FutureImpl& operator=(FutureImpl&&) 
noexcept;
 
  196  [[nodiscard]] 
bool IsReady() 
const noexcept;
 
  198  RpcData* GetData() 
noexcept;
 
  199  void ClearData() 
noexcept;
 
  205void CheckOk(RpcData& data, AsyncMethodInvocation::WaitStatus status,
 
  206             std::string_view stage);
 
  208template <
typename GrpcStream>
 
  209void StartCall(GrpcStream& stream, RpcData& data) {
 
  210  AsyncMethodInvocation start_call;
 
  211  stream.StartCall(start_call.GetTag());
 
  212  CheckOk(data, Wait(start_call, data.GetContext()), 
"StartCall");
 
  215void PrepareFinish(RpcData& data);
 
  217void ProcessFinishResult(RpcData& data,
 
  218                         AsyncMethodInvocation::WaitStatus wait_status,
 
  219                         grpc::Status&& status, ParsedGStatus&& parsed_gstatus,
 
  220                         bool throw_on_error);
 
  222template <
typename GrpcStream>
 
  223void Finish(GrpcStream& stream, RpcData& data, 
bool throw_on_error) {
 
  226  FinishAsyncMethodInvocation finish(data);
 
  227  auto& status = finish.GetStatus();
 
  228  stream.Finish(&status, finish.GetTag());
 
  230  const auto wait_status = Wait(finish, data.GetContext());
 
  231  if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  232    data.GetStatsScope().OnCancelled();
 
  233    if (throw_on_error) 
throw RpcCancelledError(data.GetCallName(), 
"Finish");
 
  235  ProcessFinishResult(data, wait_status, std::move(status),
 
  236                      std::move(finish.GetParsedGStatus()), throw_on_error);
 
  239void PrepareRead(RpcData& data);
 
  241template <
typename GrpcStream, 
typename Response>
 
  242[[nodiscard]] 
bool Read(GrpcStream& stream, Response& response, RpcData& data) {
 
  244  AsyncMethodInvocation read;
 
  245  stream.Read(&response, read.GetTag());
 
  246  const auto wait_status = Wait(read, data.GetContext());
 
  247  if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  248    data.GetStatsScope().OnCancelled();
 
  250  return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
  253template <
typename GrpcStream, 
typename Response>
 
  254void ReadAsync(GrpcStream& stream, Response& response, RpcData& data) 
noexcept {
 
  256  data.EmplaceAsyncMethodInvocation();
 
  257  auto& read = data.GetAsyncMethodInvocation();
 
  258  stream.Read(&response, read.GetTag());
 
  261void PrepareWrite(RpcData& data);
 
  263template <
typename GrpcStream, 
typename Request>
 
  264bool Write(GrpcStream& stream, 
const Request& request,
 
  265           grpc::WriteOptions options, RpcData& data) {
 
  267  AsyncMethodInvocation write;
 
  268  stream.Write(request, options, write.GetTag());
 
  269  const auto result = Wait(write, data.GetContext());
 
  270  if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  271    data.GetStatsScope().OnCancelled();
 
  273  if (result != impl::AsyncMethodInvocation::WaitStatus::kOk) {
 
  274    data.SetWritesFinished();
 
  276  return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
  279void PrepareWriteAndCheck(RpcData& data);
 
  281template <
typename GrpcStream, 
typename Request>
 
  282void WriteAndCheck(GrpcStream& stream, 
const Request& request,
 
  283                   grpc::WriteOptions options, RpcData& data) {
 
  284  PrepareWriteAndCheck(data);
 
  285  AsyncMethodInvocation write;
 
  286  stream.Write(request, options, write.GetTag());
 
  287  CheckOk(data, Wait(write, data.GetContext()), 
"WriteAndCheck");
 
  290template <
typename GrpcStream>
 
  291bool WritesDone(GrpcStream& stream, RpcData& data) {
 
  293  data.SetWritesFinished();
 
  294  AsyncMethodInvocation writes_done;
 
  295  stream.WritesDone(writes_done.GetTag());
 
  296  const auto wait_status = Wait(writes_done, data.GetContext());
 
  297  if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  298    data.GetStatsScope().OnCancelled();
 
  300  return wait_status == impl::AsyncMethodInvocation::WaitStatus::kOk;