11#include <grpcpp/impl/codegen/proto_utils.h>
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/function_ref.hpp>
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_methods.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/client/impl/channel_cache.hpp>
23#include <userver/ugrpc/client/middlewares/fwd.hpp>
24#include <userver/ugrpc/impl/deadline_timepoint.hpp>
25#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
28USERVER_NAMESPACE_BEGIN
/// Builds the future around the call state `data`. Explicit, so an
/// `impl::RpcData&` never converts to a `UnaryFuture` implicitly, and declared
/// `noexcept`. The definition is not visible in this part of the file.
36 explicit UnaryFuture(impl::RpcData& data)
noexcept;
44 ~UnaryFuture()
noexcept;
75 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept;
78 impl::FutureImpl impl_;
82template <
typename RPC>
/// Binds the future to the call state `data` and to the raw gRPC `stream`.
///
/// The out-of-class definition initializes `stream_` with `&stream` (a raw,
/// non-owning pointer), and code further down this file dereferences `stream_`
/// when finishing the call, so `stream` has to stay alive for as long as this
/// future can still be waited on or destroyed.
86 explicit StreamReadFuture(impl::RpcData& data,
87 typename RPC::RawStream& stream)
noexcept;
93 ~StreamReadFuture()
noexcept;
112 impl::FutureImpl impl_;
113 typename RPC::RawStream* stream_;
/// Moves `params` into a heap-allocated `impl::RpcData`, which this object
/// owns through `data_` (a `std::unique_ptr<impl::RpcData>`).
/// NOTE(review): single-argument constructor without `explicit`; presumably
/// only reachable from derived call classes — confirm its access level.
120 CallAnyBase(impl::CallParams&& params)
121 : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
139 impl::RpcData& GetData(ugrpc::impl::InternalTag);
143 impl::RpcData& GetData();
146 std::unique_ptr<impl::RpcData> data_;
156template <
typename Response>
181 template <
typename Stub,
typename Request>
183 impl::CallParams&& params, Stub& stub,
184 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
/// Special members: move construction, move assignment and destruction are all
/// compiler-generated (`= default`); both move operations are declared
/// `noexcept`.
188 UnaryCall(UnaryCall&&)
noexcept =
default;
189 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
190 ~UnaryCall() =
default;
193 impl::RawResponseReader<Response> reader_;
208template <
typename Response>
/// Reads one message from the stream into `response`. Marked `[[nodiscard]]`,
/// so discarding the returned `bool` produces a compiler diagnostic.
/// NOTE(review): the visible part of the definition branches on
/// `impl::Read(*stream_, response, GetData())` and also calls
/// `impl::Finish(*stream_, GetData(), true)`; presumably `false` means that no
/// more messages will arrive — confirm against the complete definition.
218 [[nodiscard]]
bool Read(Response& response);
222 using RawStream = grpc::ClientAsyncReader<Response>;
224 template <
typename Stub,
typename Request>
225 InputStream(impl::CallParams&& params, Stub& stub,
226 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
/// Special members: move construction, move assignment and destruction are all
/// compiler-generated (`= default`); both move operations are declared
/// `noexcept`.
230 InputStream(InputStream&&)
noexcept =
default;
231 InputStream& operator=(InputStream&&)
noexcept =
default;
232 ~InputStream() =
default;
235 impl::RawReader<Response> stream_;
248template <
typename Request,
typename Response>
/// Writes `request` to the stream. Marked `[[nodiscard]]`, so discarding the
/// returned `bool` produces a compiler diagnostic.
/// NOTE(review): the visible part of the definition creates a
/// value-initialized `grpc::WriteOptions` and returns
/// `impl::Write(*stream_, request, write_options, GetData())`; lines between
/// those statements are not visible here, so confirm whether preconditions are
/// checked or the options are adjusted before the write.
261 [[nodiscard]]
bool Write(
const Request& request);
292 using RawStream = grpc::ClientAsyncWriter<Request>;
294 template <
typename Stub>
295 OutputStream(impl::CallParams&& params, Stub& stub,
296 impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
/// Special members: move construction, move assignment and destruction are all
/// compiler-generated (`= default`); both move operations are declared
/// `noexcept`.
299 OutputStream(OutputStream&&)
noexcept =
default;
300 OutputStream& operator=(OutputStream&&)
noexcept =
default;
301 ~OutputStream() =
default;
304 std::unique_ptr<Response> final_response_;
305 impl::RawWriter<Request> stream_;
343template <
typename Request,
typename Response>
344class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
353 [[nodiscard]]
bool Read(Response& response);
372 [[nodiscard]]
bool Write(
const Request& request);
398 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
400 template <
typename Stub>
402 impl::CallParams&& params, Stub& stub,
403 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
/// Special members: move construction, move assignment and destruction are all
/// compiler-generated (`= default`); both move operations are declared
/// `noexcept`.
406 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
407 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
408 ~BidirectionalStream() =
default;
411 impl::RawReaderWriter<Request, Response> stream_;
/// Declaration only; the definition is not visible here. The call-object
/// constructors later in this file invoke it as
/// `impl::CallMiddlewares(GetData().GetMiddlewares(), *this, ...)`.
///
/// `user_call` is a non-owning `utils::function_ref<void()>`, and `request`
/// is passed as a pointer to a protobuf message.
/// NOTE(review): presumably `user_call` is run inside the chain formed by
/// `mws`, and `request` may be null for calls that have no initial request —
/// confirm both against the definition.
417void CallMiddlewares(
const Middlewares& mws,
CallAnyBase& call,
418 utils::function_ref<
void()> user_call,
419 const ::google::protobuf::Message* request);
422template <
typename RPC>
424 impl::RpcData& data,
typename RPC::RawStream& stream)
noexcept
425 : impl_(data), stream_(&stream) {}
427template <
typename RPC>
429 if (
auto*
const data = impl_.GetData()) {
430 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
431 const auto wait_status =
432 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
433 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
434 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
435 data->GetStatsScope().OnCancelled();
437 impl::Finish(*stream_, *data,
false);
442template <
typename RPC>
445 if (
this == &other)
return *
this;
446 [[maybe_unused]]
auto for_destruction = std::move(*
this);
447 impl_ = std::move(other.impl_);
448 stream_ = other.stream_;
452template <
typename RPC>
454 auto*
const data = impl_.GetData();
455 UINVARIANT(data,
"'Get' must be called only once");
456 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
459 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
460 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
461 data->GetStatsScope().OnCancelled();
462 data->GetStatsScope().Flush();
464 if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
467 impl::Finish(*stream_, *data,
true);
469 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
472template <
typename RPC>
474 return impl_.IsReady();
478template <
typename Stub,
typename Request>
479UnaryCall<Response>::UnaryCall(
480 impl::CallParams&& params, Stub& stub,
481 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
484 impl::CallMiddlewares(
485 GetData().GetMiddlewares(), *
this,
487 reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
488 &GetData().GetQueue());
489 reader_->StartCall();
492 GetData().SetWritesFinished();
495template <
typename Response>
503template <
typename Response>
506 PrepareFinish(GetData());
507 GetData().EmplaceFinishAsyncMethodInvocation();
508 auto& finish = GetData().GetFinishAsyncMethodInvocation();
509 auto& status = GetData().GetStatus();
510 reader_->Finish(&response, &status, finish.GetTag());
515template <
typename Stub,
typename Request>
516InputStream<Response>::InputStream(
517 impl::CallParams&& params, Stub& stub,
518 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
521 impl::CallMiddlewares(
522 GetData().GetMiddlewares(), *
this,
524 stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
525 &GetData().GetQueue());
526 impl::StartCall(*stream_, GetData());
529 GetData().SetWritesFinished();
532template <
typename Response>
533bool InputStream<Response>::
Read(Response& response) {
534 if (impl::Read(*stream_, response, GetData())) {
539 impl::Finish(*stream_, GetData(),
true);
545template <
typename Stub>
546OutputStream<Request, Response>::OutputStream(
547 impl::CallParams&& params, Stub& stub,
548 impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
550 final_response_(std::make_unique<Response>()) {
551 impl::CallMiddlewares(
552 GetData().GetMiddlewares(), *
this,
556 (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
557 &GetData().GetQueue());
558 impl::StartCall(*stream_, GetData());
563template <
typename Request,
typename Response>
564bool OutputStream<Request, Response>::
Write(
const Request& request) {
567 grpc::WriteOptions write_options{};
569 return impl::Write(*stream_, request, write_options, GetData());
572template <
typename Request,
typename Response>
573void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
576 grpc::WriteOptions write_options{};
578 if (!impl::Write(*stream_, request, write_options, GetData())) {
579 impl::Finish(*stream_, GetData(),
true);
583template <
typename Request,
typename Response>
584Response OutputStream<Request, Response>::
Finish() {
587 if (!GetData().AreWritesFinished()) {
588 impl::WritesDone(*stream_, GetData());
591 impl::Finish(*stream_, GetData(),
true);
593 return std::move(*final_response_);
597template <
typename Stub>
598BidirectionalStream<Request, Response>::BidirectionalStream(
599 impl::CallParams&& params, Stub& stub,
600 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
602 impl::CallMiddlewares(
603 GetData().GetMiddlewares(), *
this,
605 stream_ = (stub.*prepare_func)(&GetData().GetContext(),
606 &GetData().GetQueue());
607 impl::StartCall(*stream_, GetData());
612template <
typename Request,
typename Response>
614BidirectionalStream<Request, Response>::
ReadAsync(Response& response)
noexcept {
615 impl::ReadAsync(*stream_, response, GetData());
616 return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
620template <
typename Request,
typename Response>
621bool BidirectionalStream<Request, Response>::
Read(Response& response) {
626template <
typename Request,
typename Response>
627bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
629 grpc::WriteOptions write_options{};
631 return impl::Write(*stream_, request, write_options, GetData());
634template <
typename Request,
typename Response>
636 const Request& request) {
638 grpc::WriteOptions write_options{};
640 impl::WriteAndCheck(*stream_, request, write_options, GetData());
643template <
typename Request,
typename Response>
645 return impl::WritesDone(*stream_, GetData());