11#include <grpcpp/impl/codegen/proto_utils.h> 
   13#include <userver/dynamic_config/snapshot.hpp> 
   14#include <userver/engine/deadline.hpp> 
   15#include <userver/engine/future_status.hpp> 
   16#include <userver/utils/assert.hpp> 
   17#include <userver/utils/function_ref.hpp> 
   19#include <userver/ugrpc/client/exceptions.hpp> 
   20#include <userver/ugrpc/client/impl/async_methods.hpp> 
   21#include <userver/ugrpc/client/impl/call_params.hpp> 
   22#include <userver/ugrpc/client/impl/channel_cache.hpp> 
   23#include <userver/ugrpc/client/middlewares/fwd.hpp> 
   24#include <userver/ugrpc/impl/deadline_timepoint.hpp> 
   25#include <userver/ugrpc/impl/internal_tag_fwd.hpp> 
   26#include <userver/ugrpc/impl/statistics_scope.hpp> 
   28USERVER_NAMESPACE_BEGIN
 
   // Constructs the future from the call's impl::RpcData, taken by non-const
   // reference. Declared explicit and noexcept. Declaration only.
   36  explicit UnaryFuture(impl::RpcData& data) 
noexcept;
 
   // Destructor, declared noexcept. Declaration only.
   // NOTE(review): how a still-pending call is handled on destruction is not
   // shown by this declaration -- check the definition before relying on it.
   44  ~UnaryFuture() 
noexcept;
 
   // Returns a raw pointer to an engine::impl::ContextAccessor; declared
   // noexcept. Declaration only.
   // NOTE(review): the "Try" prefix suggests nullptr is a possible result --
   // confirm against the definition.
   75  engine::impl::ContextAccessor* TryGetContextAccessor() 
noexcept;
 
   78  impl::FutureImpl impl_;
 
   82template <
typename RPC>
 
   // Constructs the read future from the call's impl::RpcData and the raw
   // stream (RPC::RawStream), both taken by reference. Declared explicit and
   // noexcept. The out-of-class definition initialises impl_ from `data` and
   // stream_ from `&stream`.
   86  explicit StreamReadFuture(impl::RpcData& data,
 
   87                            typename RPC::RawStream& stream) 
noexcept;
 
   // Destructor, declared noexcept. Declaration only.
   93  ~StreamReadFuture() 
noexcept;
 
  // Future state. The member function bodies query it through GetData()
  // (a pointer that is tested for null before use) and IsReady().
  112  impl::FutureImpl impl_;
 
  // Pointer to the raw gRPC stream, initialised from the address of the
  // constructor's `stream` argument and dereferenced when calling impl::Finish.
  113  typename RPC::RawStream* stream_;
 
  // Consumes the call parameters: `params` is moved into a freshly
  // heap-allocated impl::RpcData (std::make_unique), which is stored in data_.
  // The constructor body itself is empty.
  120  CallAnyBase(impl::CallParams&& params)
 
  121      : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
 
  // Accessor returning a reference to the call's impl::RpcData. This overload
  // takes a ugrpc::impl::InternalTag argument.
  // NOTE(review): presumably the tag restricts this overload to
  // framework-internal callers -- confirm.
  139  impl::RpcData& GetData(ugrpc::impl::InternalTag);
 
  // Tag-less overload returning the same impl::RpcData reference type. The
  // derived stream classes call it as GetData() throughout their bodies.
  143  impl::RpcData& GetData();
 
  // Per-call state, created in the constructor via std::make_unique from the
  // moved-in impl::CallParams.
  146  std::unique_ptr<impl::RpcData> data_;
 
  156template <
typename Response>
 
  181  template <
typename Stub, 
typename Request>
 
  183      impl::CallParams&& params, Stub& stub,
 
  184      impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
 
  // Move constructor: defaulted and noexcept.
  188  UnaryCall(UnaryCall&&) 
noexcept = 
default;
 
  // Move assignment: defaulted and noexcept.
  189  UnaryCall& operator=(UnaryCall&&) 
noexcept = 
default;
 
  // Destructor: defaulted.
  190  ~UnaryCall() = 
default;
 
  // Raw response reader. The constructor assigns it from
  // (stub.*prepare_func)(...) and then calls StartCall() on it; Finish(...) is
  // later invoked on it with the response, status and completion tag.
  193  impl::RawResponseReader<Response> reader_;
 
  208template <
typename Response>
 
  // Reads a message into `response` (output parameter). The bool result is
  // [[nodiscard]], so callers must inspect it.
  // NOTE(review): presumably `false` means the stream has ended and
  // `response` was not filled -- confirm against the definition.
  218  [[nodiscard]] 
bool Read(Response& response);
 
  // Alias for the gRPC async reader type that carries Response messages.
  222  using RawStream = grpc::ClientAsyncReader<Response>;
 
  224  template <
typename Stub, 
typename Request>
 
  225  InputStream(impl::CallParams&& params, Stub& stub,
 
  226              impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
 
  // Move constructor: defaulted and noexcept.
  230  InputStream(InputStream&&) 
noexcept = 
default;
 
  // Move assignment: defaulted and noexcept.
  231  InputStream& operator=(InputStream&&) 
noexcept = 
default;
 
  // Destructor: defaulted.
  232  ~InputStream() = 
default;
 
  // Raw reader. The constructor assigns it from (stub.*prepare_func)(...);
  // it is dereferenced for impl::StartCall, impl::Read and impl::Finish.
  235  impl::RawReader<Response> stream_;
 
  248template <
typename Request, 
typename Response>
 
  // Sends `request` on the stream. The definition forwards to impl::Write with
  // a default-constructed grpc::WriteOptions and returns that call's result.
  // The bool is [[nodiscard]], so callers must inspect it.
  // NOTE(review): presumably `false` means the write did not go through --
  // confirm the exact contract of impl::Write.
  261  [[nodiscard]] 
bool Write(
const Request& request);
 
  // Alias for the gRPC async writer type that carries Request messages.
  292  using RawStream = grpc::ClientAsyncWriter<Request>;
 
  // Constructor, templated on the Stub type.
  //   params       - call parameters, taken by rvalue reference.
  //   stub         - object on which `prepare_func` is invoked.
  //   prepare_func - pointer-to-member of Stub; the definition calls
  //                  (stub.*prepare_func)(context, final_response_.get(), queue)
  //                  inside impl::CallMiddlewares and then impl::StartCall.
  294  template <
typename Stub>
 
  295  OutputStream(impl::CallParams&& params, Stub& stub,
 
  296               impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
 
  // Move constructor: defaulted and noexcept.
  299  OutputStream(OutputStream&&) 
noexcept = 
default;
 
  // Move assignment: defaulted and noexcept.
  300  OutputStream& operator=(OutputStream&&) 
noexcept = 
default;
 
  // Destructor: defaulted.
  301  ~OutputStream() = 
default;
 
  // Heap-allocated Response created in the constructor (std::make_unique).
  // Its raw pointer is handed to the stub's prepare function, and Finish()
  // returns the pointee by move.
  304  std::unique_ptr<Response> final_response_;
 
  // Raw writer, dereferenced for impl::StartCall, impl::Write,
  // impl::WritesDone and impl::Finish.
  305  impl::RawWriter<Request> stream_;
 
  343template <
typename Request, 
typename Response>
 
  344class [[nodiscard]] BidirectionalStream final : 
public CallAnyBase {
 
  // Reads a message into `response` (output parameter). The bool result is
  // [[nodiscard]], so callers must inspect it.
  // NOTE(review): presumably `false` means no message was read because the
  // stream ended -- confirm against the definition.
  353  [[nodiscard]] 
bool Read(Response& response);
 
  // Sends `request` on the stream. The definition forwards to impl::Write with
  // a default-constructed grpc::WriteOptions and returns that call's result.
  // The bool is [[nodiscard]], so callers must inspect it.
  // NOTE(review): presumably `false` means the write did not go through --
  // confirm the exact contract of impl::Write.
  372  [[nodiscard]] 
bool Write(
const Request& request);
 
  // Alias for the gRPC async reader-writer type: Request messages are written,
  // Response messages are read.
  398  using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
 
  400  template <
typename Stub>
 
  402      impl::CallParams&& params, Stub& stub,
 
  403      impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
 
  // Move constructor: defaulted and noexcept.
  406  BidirectionalStream(BidirectionalStream&&) 
noexcept = 
default;
 
  // Move assignment: defaulted and noexcept.
  407  BidirectionalStream& operator=(BidirectionalStream&&) 
noexcept = 
default;
 
  // Destructor: defaulted.
  408  ~BidirectionalStream() = 
default;
 
  // Raw reader-writer. The constructor assigns it from (stub.*prepare_func)(...);
  // it is dereferenced for impl::StartCall, impl::ReadAsync, impl::Write and
  // impl::WriteAndCheck.
  411  impl::RawReaderWriter<Request, Response> stream_;
 
  // Declaration only.
  //   mws       - the middleware collection; call sites pass
  //               GetData().GetMiddlewares().
  //   call      - the call object; call sites pass *this.
  //   user_call - a utils::function_ref<void()> callback; at the call sites it
  //               is where the raw gRPC reader/writer is prepared and started.
  //   request   - pointer to the request as a generic protobuf Message.
  // NOTE(review): `request` being a pointer suggests it may be null for calls
  // that have no single initial request -- confirm against the definition.
  // NOTE(review): presumably the middlewares run around `user_call`; the
  // ordering is not visible from this declaration.
  417void CallMiddlewares(
const Middlewares& mws, 
CallAnyBase& call,
 
  418                     utils::function_ref<
void()> user_call,
 
  419                     const ::google::protobuf::Message* request);
 
  422template <
typename RPC>
 
  424    impl::RpcData& data, 
typename RPC::RawStream& stream) 
noexcept 
  425    : impl_(data), stream_(&stream) {}
 
  427template <
typename RPC>
 
  429  if (
auto* 
const data = impl_.GetData()) {
 
  430    impl::RpcData::AsyncMethodInvocationGuard guard(*data);
 
  431    const auto wait_status =
 
  432        impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
 
  433    if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
 
  434      if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  435        data->GetStatsScope().OnCancelled();
 
  437      impl::Finish(*stream_, *data, 
false);
 
  442template <
typename RPC>
 
  445  if (
this == &other) 
return *
this;
 
  446  [[maybe_unused]] 
auto for_destruction = std::move(*
this);
 
  447  impl_ = std::move(other.impl_);
 
  448  stream_ = other.stream_;
 
  452template <
typename RPC>
 
  454  auto* 
const data = impl_.GetData();
 
  455  UINVARIANT(data, 
"'Get' must be called only once");
 
  456  impl::RpcData::AsyncMethodInvocationGuard guard(*data);
 
  459      impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
 
  460  if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  461    data->GetStatsScope().OnCancelled();
 
  462    data->GetStatsScope().Flush();
 
  464  if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
 
  467    impl::Finish(*stream_, *data, 
true);
 
  469  return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
  472template <
typename RPC>
 
  474  return impl_.IsReady();
 
  478template <
typename Stub, 
typename Request>
 
  479UnaryCall<Response>::UnaryCall(
 
  480    impl::CallParams&& params, Stub& stub,
 
  481    impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
 
  484  impl::CallMiddlewares(
 
  485      GetData().GetMiddlewares(), *
this,
 
  487        reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
 
  488                                       &GetData().GetQueue());
 
  489        reader_->StartCall();
 
  492  GetData().SetWritesFinished();
 
  495template <
typename Response>
 
  503template <
typename Response>
 
  506  PrepareFinish(GetData());
 
  507  GetData().EmplaceFinishAsyncMethodInvocation();
 
  508  auto& finish = GetData().GetFinishAsyncMethodInvocation();
 
  509  auto& status = GetData().GetStatus();
 
  510  reader_->Finish(&response, &status, finish.GetTag());
 
  515template <
typename Stub, 
typename Request>
 
  516InputStream<Response>::InputStream(
 
  517    impl::CallParams&& params, Stub& stub,
 
  518    impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
 
  521  impl::CallMiddlewares(
 
  522      GetData().GetMiddlewares(), *
this,
 
  524        stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
 
  525                                       &GetData().GetQueue());
 
  526        impl::StartCall(*stream_, GetData());
 
  529  GetData().SetWritesFinished();
 
  532template <
typename Response>
 
  533bool InputStream<Response>::
Read(Response& response) {
 
  534  if (impl::Read(*stream_, response, GetData())) {
 
  539    impl::Finish(*stream_, GetData(), 
true);
 
  545template <
typename Stub>
 
  546OutputStream<Request, Response>::OutputStream(
 
  547    impl::CallParams&& params, Stub& stub,
 
  548    impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
 
  550      final_response_(std::make_unique<Response>()) {
 
  551  impl::CallMiddlewares(
 
  552      GetData().GetMiddlewares(), *
this,
 
  556            (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
 
  557                                 &GetData().GetQueue());
 
  558        impl::StartCall(*stream_, GetData());
 
  563template <
typename Request, 
typename Response>
 
  564bool OutputStream<Request, Response>::
Write(
const Request& request) {
 
  567  grpc::WriteOptions write_options{};
 
  569  return impl::Write(*stream_, request, write_options, GetData());
 
  572template <
typename Request, 
typename Response>
 
  573void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
 
  576  grpc::WriteOptions write_options{};
 
  578  if (!impl::Write(*stream_, request, write_options, GetData())) {
 
  579    impl::Finish(*stream_, GetData(), 
true);
 
  583template <
typename Request, 
typename Response>
 
  584Response OutputStream<Request, Response>::
Finish() {
 
  587  if (!GetData().AreWritesFinished()) {
 
  588    impl::WritesDone(*stream_, GetData());
 
  591  impl::Finish(*stream_, GetData(), 
true);
 
  593  return std::move(*final_response_);
 
  597template <
typename Stub>
 
  598BidirectionalStream<Request, Response>::BidirectionalStream(
 
  599    impl::CallParams&& params, Stub& stub,
 
  600    impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
 
  602  impl::CallMiddlewares(
 
  603      GetData().GetMiddlewares(), *
this,
 
  605        stream_ = (stub.*prepare_func)(&GetData().GetContext(),
 
  606                                       &GetData().GetQueue());
 
  607        impl::StartCall(*stream_, GetData());
 
  612template <
typename Request, 
typename Response>
 
  614BidirectionalStream<Request, Response>::
ReadAsync(Response& response) 
noexcept {
 
  615  impl::ReadAsync(*stream_, response, GetData());
 
  616  return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
 
  620template <
typename Request, 
typename Response>
 
  621bool BidirectionalStream<Request, Response>::
Read(Response& response) {
 
  626template <
typename Request, 
typename Response>
 
  627bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
 
  629  grpc::WriteOptions write_options{};
 
  631  return impl::Write(*stream_, request, write_options, GetData());
 
  634template <
typename Request, 
typename Response>
 
  636    const Request& request) {
 
  638  grpc::WriteOptions write_options{};
 
  640  impl::WriteAndCheck(*stream_, request, write_options, GetData());
 
  643template <
typename Request, 
typename Response>
 
  645  return impl::WritesDone(*stream_, GetData());