11#include <grpcpp/impl/codegen/proto_utils.h> 
   13#include <userver/dynamic_config/snapshot.hpp> 
   14#include <userver/utils/assert.hpp> 
   15#include <userver/utils/function_ref.hpp> 
   17#include <userver/ugrpc/client/exceptions.hpp> 
   18#include <userver/ugrpc/client/impl/async_methods.hpp> 
   19#include <userver/ugrpc/client/impl/call_params.hpp> 
   20#include <userver/ugrpc/client/impl/channel_cache.hpp> 
   21#include <userver/ugrpc/client/middlewares/fwd.hpp> 
   22#include <userver/ugrpc/impl/deadline_timepoint.hpp> 
   23#include <userver/ugrpc/impl/internal_tag_fwd.hpp> 
   24#include <userver/ugrpc/impl/statistics_scope.hpp> 
   26USERVER_NAMESPACE_BEGIN
 
   // Excerpt of the UnaryFuture member declarations. The class header and the
   // declarations between these lines are not part of this excerpt.
   // The constructor is explicit and noexcept and receives the call's
   // impl::RpcData by reference; the destructor is declared noexcept too.
   34  explicit UnaryFuture(impl::RpcData& data) 
noexcept;
 
   40  ~UnaryFuture() 
noexcept;
 
   // The only data member visible in this excerpt.
   59  impl::FutureImpl impl_;
 
   // Excerpt of the StreamReadFuture<RPC> class template declaration (class
   // header line and several members are not part of this excerpt).
   // RPC must expose a nested `RawStream` type: the constructor takes a
   // reference to it and the class keeps a pointer to it in `stream_`.
   63template <
typename RPC>
 
   67  explicit StreamReadFuture(impl::RpcData& data,
 
   68                            typename RPC::RawStream& stream) 
noexcept;
 
   74  ~StreamReadFuture() 
noexcept;
 
   93  impl::FutureImpl impl_;
 
   // Raw pointer to the stream passed to the constructor.
   // NOTE(review): presumably non-owning (no delete is visible) - confirm.
   94  typename RPC::RawStream* stream_;
 
  // Excerpt of CallAnyBase (class header and access specifiers are not part of
  // this excerpt).
  // The constructor moves `params` into a heap-allocated impl::RpcData that is
  // owned by `data_`.
  101  CallAnyBase(impl::CallParams&& params)
 
  102      : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
 
  // Overload that additionally takes a ugrpc::impl::InternalTag argument.
  120  impl::RpcData& GetData(ugrpc::impl::InternalTag);
 
  // Returns a reference to the call's impl::RpcData.
  124  impl::RpcData& GetData();
 
  // Owning pointer to the per-call state created in the constructor.
  127  std::unique_ptr<impl::RpcData> data_;
 
  // Excerpt of the UnaryCall<Response> class template (class header line and
  // several members are not part of this excerpt).
  137template <
typename Response>
 
  // Templated constructor; its name line and its trailing parameter(s) are not
  // part of this excerpt. `prepare_func` is a
  // impl::RawResponseReaderPreparer<Stub, Request, Response>, which the
  // out-of-line definition invokes on `stub` as a pointer to member.
  162  template <
typename Stub, 
typename Request>
 
  164      impl::CallParams&& params, Stub& stub,
 
  165      impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
 
  // Move operations are defaulted and noexcept; no copy operations are visible
  // in this excerpt.
  169  UnaryCall(UnaryCall&&) 
noexcept = 
default;
 
  170  UnaryCall& operator=(UnaryCall&&) 
noexcept = 
default;
 
  171  ~UnaryCall() = 
default;
 
  // Reader handle; assigned from the result of `prepare_func` in the
  // out-of-line constructor definition.
  174  impl::RawResponseReader<Response> reader_;
 
  // Excerpt of the InputStream<Response> class template (class header line and
  // several members are not part of this excerpt).
  189template <
typename Response>
 
  // Reads the next message into `response`; the bool result must not be
  // ignored ([[nodiscard]]).
  199  [[nodiscard]] 
bool Read(Response& response);
 
  // Underlying gRPC stream type for this kind of call.
  203  using RawStream = grpc::ClientAsyncReader<Response>;
 
  // Templated constructor; the trailing parameter(s) after `prepare_func` are
  // not part of this excerpt.
  205  template <
typename Stub, 
typename Request>
 
  206  InputStream(impl::CallParams&& params, Stub& stub,
 
  207              impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
 
  // Move operations are defaulted and noexcept.
  211  InputStream(InputStream&&) 
noexcept = 
default;
 
  212  InputStream& operator=(InputStream&&) 
noexcept = 
default;
 
  213  ~InputStream() = 
default;
 
  // NOTE(review): the out-of-line definitions of this class access the call
  // state through GetData() rather than through this member - confirm whether
  // this member is still needed.
  216  std::unique_ptr<impl::RpcData> data_;
 
  // Stream handle; assigned from the result of `prepare_func` in the
  // out-of-line constructor definition.
  217  impl::RawReader<Response> stream_;
 
  // Excerpt of the OutputStream<Request, Response> class template (class header
  // line and several members are not part of this excerpt).
  230template <
typename Request, 
typename Response>
 
  // Writes one request message; the bool result must not be ignored
  // ([[nodiscard]]).
  243  [[nodiscard]] 
bool Write(
const Request& request);
 
  // Underlying gRPC stream type for this kind of call.
  274  using RawStream = grpc::ClientAsyncWriter<Request>;
 
  // Templated constructor: `prepare_func` is invoked on `stub` in the
  // out-of-line definition to create the stream.
  276  template <
typename Stub>
 
  277  OutputStream(impl::CallParams&& params, Stub& stub,
 
  278               impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
 
  // Move operations are defaulted and noexcept.
  281  OutputStream(OutputStream&&) 
noexcept = 
default;
 
  282  OutputStream& operator=(OutputStream&&) 
noexcept = 
default;
 
  283  ~OutputStream() = 
default;
 
  // NOTE(review): the out-of-line definitions of this class access the call
  // state through GetData() rather than through this member - confirm whether
  // this member is still needed.
  286  std::unique_ptr<impl::RpcData> data_;
 
  // Heap-allocated response object; its address is handed to `prepare_func`
  // in the constructor and its value is moved out by Finish().
  287  std::unique_ptr<Response> final_response_;
 
  288  impl::RawWriter<Request> stream_;
 
  // BidirectionalStream<Request, Response>: final class deriving publicly from
  // CallAnyBase. Several members and the closing brace are not part of this
  // excerpt. The class itself is [[nodiscard]].
  320template <
typename Request, 
typename Response>
 
  321class [[nodiscard]] BidirectionalStream final : 
public CallAnyBase {
 
  // Reads the next message into `response`; the result must not be ignored.
  330  [[nodiscard]] 
bool Read(Response& response);
 
  // Writes one request message; the result must not be ignored.
  349  [[nodiscard]] 
bool Write(
const Request& request);
 
  // Underlying gRPC stream type for this kind of call.
  375  using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
 
  // Templated constructor; its name line is not part of this excerpt.
  377  template <
typename Stub>
 
  379      impl::CallParams&& params, Stub& stub,
 
  380      impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
 
  // Move operations are defaulted and noexcept.
  383  BidirectionalStream(BidirectionalStream&&) 
noexcept = 
default;
 
  384  BidirectionalStream& operator=(BidirectionalStream&&) 
noexcept = 
default;
 
  385  ~BidirectionalStream() = 
default;
 
  // NOTE(review): a `data_` member of the same type is also declared next to
  // the GetData() declarations (original line 127), and the out-of-line
  // definitions of this class only use GetData(). Confirm whether this member
  // is a leftover that shadows/duplicates the base-class state.
  388  std::unique_ptr<impl::RpcData> data_;
 
  // Stream handle; assigned from the result of `prepare_func` in the
  // out-of-line constructor definition.
  389  impl::RawReaderWriter<Request, Response> stream_;
 
  // Declaration only; the definition is not part of this excerpt.
  // Parameters:
  //   mws       - the middlewares (callers in this file pass
  //               GetData().GetMiddlewares()).
  //   call      - the call object (callers pass *this).
  //   user_call - non-owning reference to a `void()` callable; callers in this
  //               file pass the code that prepares and starts the RPC.
  //   request   - pointer to a protobuf message.
  //               NOTE(review): presumably may be null for calls that have no
  //               initial request - confirm against the definition.
  395void CallMiddlewares(
const Middlewares& mws, 
CallAnyBase& call,
 
  396                     utils::function_ref<
void()> user_call,
 
  397                     const ::google::protobuf::Message* request);
 
  // Out-of-line constructor definition (the qualified-name line is not part of
  // this excerpt). Initializes `impl_` from `data` and stores the address of
  // `stream` in `stream_`; the body is empty.
  400template <
typename RPC>
 
  402    impl::RpcData& data, 
typename RPC::RawStream& stream) 
noexcept 
  403    : impl_(data), stream_(&stream) {}
 
  // Body of a StreamReadFuture<RPC> member whose signature line is not part of
  // this excerpt (presumably the destructor - confirm).
  // If `impl_` still holds call data, waits for the pending async method
  // invocation under an AsyncMethodInvocationGuard. When the wait does not end
  // with kOk, a cancellation is reported to the statistics scope and
  // impl::Finish is called with `false` as its last argument (judging by
  // indentation, inside the non-kOk branch; the closing braces are missing
  // from this excerpt).
  405template <
typename RPC>
 
  407  if (
auto* 
const data = impl_.GetData()) {
 
  408    impl::RpcData::AsyncMethodInvocationGuard guard(*data);
 
  409    const auto wait_status =
 
  410        impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
 
  411    if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
 
  412      if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  413        data->GetStatsScope().OnCancelled();
 
  415      impl::Finish(*stream_, *data, 
false);
 
  // Body of an assignment operator taking `other` (signature lines are not
  // part of this excerpt; presumably StreamReadFuture's move assignment, since
  // it assigns `impl_` and `stream_`).
  // Self-assignment returns immediately. Otherwise the current state is first
  // moved into a local whose only purpose is to be destroyed at scope exit,
  // and then the state of `other` is taken over.
  420template <
typename RPC>
 
  423  if (
this == &other) 
return *
this;
 
  424  [[maybe_unused]] 
auto for_destruction = std::move(*
this);
 
  425  impl_ = std::move(other.impl_);
 
  // The pointer is copied, not reset, in `other`.
  426  stream_ = other.stream_;
 
  // Body of a member whose signature line is not part of this excerpt (the
  // UINVARIANT message refers to it as 'Get').
  // Requires that `impl_` still holds call data; otherwise UINVARIANT fails
  // with "'Get' must be called only once". Waits for the pending async method
  // invocation and returns true only when the wait status is kOk.
  // On kCancelled the statistics scope is notified and flushed; on kError
  // impl::Finish is called with `true` as its last argument. The remaining
  // statements of both branches are missing from this excerpt.
  430template <
typename RPC>
 
  432  auto* 
const data = impl_.GetData();
 
  433  UINVARIANT(data, 
"'Get' must be called only once");
 
  434  impl::RpcData::AsyncMethodInvocationGuard guard(*data);
 
  437      impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
 
  438  if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
 
  439    data->GetStatsScope().OnCancelled();
 
  440    data->GetStatsScope().Flush();
 
  442  if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
 
  445    impl::Finish(*stream_, *data, 
true);
 
  447  return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
  // Body of a member whose signature line is not part of this excerpt; it
  // simply forwards to impl_.IsReady().
  450template <
typename RPC>
 
  452  return impl_.IsReady();
 
  // Out-of-line definition of the templated UnaryCall constructor (the outer
  // template head, the trailing parameter(s), the initializer list and the
  // lambda header are not part of this excerpt).
  // Runs impl::CallMiddlewares with this call's middlewares; the visible
  // callback code creates the reader by invoking `prepare_func` on `stub`
  // with the call's context, `req` and the completion queue, then starts the
  // call. Afterwards the call is marked as having finished its writes.
  456template <
typename Stub, 
typename Request>
 
  457UnaryCall<Response>::UnaryCall(
 
  458    impl::CallParams&& params, Stub& stub,
 
  459    impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
 
  462  impl::CallMiddlewares(
 
  463      GetData().GetMiddlewares(), *
this,
 
  465        reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
 
  466                                       &GetData().GetQueue());
 
  467        reader_->StartCall();
 
  470  GetData().SetWritesFinished();
 
  // Two consecutive template heads; the member signatures and the whole body
  // of the first member are not part of this excerpt.
  // The visible statements (presumably a UnaryCall member, since they use
  // `reader_`) prepare the finish step, emplace a finish async method
  // invocation in the call data and ask the reader to finish, passing the
  // addresses of `response` and of the call's status together with the
  // invocation's tag.
  // NOTE(review): `response` is handed over by address - presumably it must
  // stay alive until the finish operation completes; confirm with the caller.
  473template <
typename Response>
 
  481template <
typename Response>
 
  484  PrepareFinish(GetData());
 
  485  GetData().EmplaceFinishAsyncMethodInvocation();
 
  486  auto& finish = GetData().GetFinishAsyncMethodInvocation();
 
  487  auto& status = GetData().GetStatus();
 
  488  reader_->Finish(&response, &status, finish.GetTag());
 
  // Out-of-line definition of the templated InputStream constructor (the outer
  // template head, the trailing parameter(s), the initializer list and the
  // lambda header are not part of this excerpt).
  // Runs impl::CallMiddlewares with this call's middlewares; the visible
  // callback code creates the stream by invoking `prepare_func` on `stub`
  // with the call's context, `req` and the completion queue, then starts the
  // call through impl::StartCall. Afterwards the call is marked as having
  // finished its writes.
  493template <
typename Stub, 
typename Request>
 
  494InputStream<Response>::InputStream(
 
  495    impl::CallParams&& params, Stub& stub,
 
  496    impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
 
  499  impl::CallMiddlewares(
 
  500      GetData().GetMiddlewares(), *
this,
 
  502        stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
 
  503                                       &GetData().GetQueue());
 
  504        impl::StartCall(*stream_, GetData());
 
  507  GetData().SetWritesFinished();
 
  // Reads the next message from the stream into `response` via impl::Read.
  // The statements of the success branch and the return statements are not
  // part of this excerpt; the visible impl::Finish call (last argument `true`)
  // sits, judging by indentation, in a nested branch.
  // NOTE(review): presumably Finish runs when impl::Read reports no more
  // messages - confirm against the full source.
  510template <
typename Response>
 
  511bool InputStream<Response>::
Read(Response& response) {
 
  512  if (impl::Read(*stream_, response, GetData())) {
 
  517    impl::Finish(*stream_, GetData(), 
true);
 
  // Out-of-line definition of the templated OutputStream constructor (the
  // outer template head, the first initializer and the lambda header are not
  // part of this excerpt).
  // Allocates the final response object up front and passes its address to
  // `prepare_func`, together with the call's context and completion queue,
  // inside the callback given to impl::CallMiddlewares; the call is then
  // started through impl::StartCall.
  523template <
typename Stub>
 
  524OutputStream<Request, Response>::OutputStream(
 
  525    impl::CallParams&& params, Stub& stub,
 
  526    impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
 
  528      final_response_(std::make_unique<Response>()) {
 
  529  impl::CallMiddlewares(
 
  530      GetData().GetMiddlewares(), *
this,
 
  534            (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
 
  535                                 &GetData().GetQueue());
 
  536        impl::StartCall(*stream_, GetData());
 
  // Writes `request` to the stream using default-constructed
  // grpc::WriteOptions and returns the result of impl::Write unchanged.
  // (Lines between the visible statements are not part of this excerpt.)
  541template <
typename Request, 
typename Response>
 
  542bool OutputStream<Request, Response>::
Write(
const Request& request) {
 
  545  grpc::WriteOptions write_options{};
 
  547  return impl::Write(*stream_, request, write_options, GetData());
 
  // Writes `request` with default-constructed grpc::WriteOptions. When
  // impl::Write returns false, impl::Finish is called with `true` as its last
  // argument; whatever follows in that branch is not part of this excerpt.
  550template <
typename Request, 
typename Response>
 
  551void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
 
  554  grpc::WriteOptions write_options{};
 
  556  if (!impl::Write(*stream_, request, write_options, GetData())) {
 
  557    impl::Finish(*stream_, GetData(), 
true);
 
  // Completes the call and returns the final response.
  // If the writes have not been finished yet, impl::WritesDone is issued
  // first; then impl::Finish is called with `true` as its last argument.
  // The response is moved out of `*final_response_`, which is left in a
  // moved-from state afterwards.
  561template <
typename Request, 
typename Response>
 
  562Response OutputStream<Request, Response>::
Finish() {
 
  565  if (!GetData().AreWritesFinished()) {
 
  566    impl::WritesDone(*stream_, GetData());
 
  569  impl::Finish(*stream_, GetData(), 
true);
 
  571  return std::move(*final_response_);
 
  // Out-of-line definition of the templated BidirectionalStream constructor
  // (the outer template head, the initializer list and the lambda header are
  // not part of this excerpt).
  // Runs impl::CallMiddlewares with this call's middlewares; the visible
  // callback code creates the stream by invoking `prepare_func` on `stub`
  // with the call's context and completion queue, then starts the call
  // through impl::StartCall.
  575template <
typename Stub>
 
  576BidirectionalStream<Request, Response>::BidirectionalStream(
 
  577    impl::CallParams&& params, Stub& stub,
 
  578    impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
 
  580  impl::CallMiddlewares(
 
  581      GetData().GetMiddlewares(), *
this,
 
  583        stream_ = (stub.*prepare_func)(&GetData().GetContext(),
 
  584                                       &GetData().GetQueue());
 
  585        impl::StartCall(*stream_, GetData());
 
  // Starts an asynchronous read into `response` via impl::ReadAsync and
  // returns a StreamReadFuture for this stream type built from GetData() (the
  // return-type line and the rest of the return statement are not part of
  // this excerpt). Declared noexcept.
  // NOTE(review): `response` is passed by reference to the asynchronous read -
  // presumably it must stay alive until the returned future is resolved;
  // confirm.
  590template <
typename Request, 
typename Response>
 
  592BidirectionalStream<Request, Response>::
ReadAsync(Response& response) 
noexcept {
 
  593  impl::ReadAsync(*stream_, response, GetData());
 
  594  return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
 
  // Definition header of BidirectionalStream::Read; the function body is not
  // part of this excerpt.
  598template <
typename Request, 
typename Response>
 
  599bool BidirectionalStream<Request, Response>::
Read(Response& response) {
 
  // Writes `request` to the stream using default-constructed
  // grpc::WriteOptions and returns the result of impl::Write unchanged.
  // (Lines between the visible statements are not part of this excerpt.)
  604template <
typename Request, 
typename Response>
 
  605bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
 
  607  grpc::WriteOptions write_options{};
 
  609  return impl::Write(*stream_, request, write_options, GetData());
 
  // Definition of a member taking `const Request& request` (its name line is
  // not part of this excerpt). It forwards the request, together with
  // default-constructed grpc::WriteOptions, to impl::WriteAndCheck and does
  // not use any return value.
  612template <
typename Request, 
typename Response>
 
  614    const Request& request) {
 
  616  grpc::WriteOptions write_options{};
 
  618  impl::WriteAndCheck(*stream_, request, write_options, GetData());
 
  // Body of a member whose signature line is not part of this excerpt; it
  // returns the result of impl::WritesDone for this stream and call data.
  621template <
typename Request, 
typename Response>
 
  623  return impl::WritesDone(*stream_, GetData());