11#include <grpcpp/impl/codegen/proto_utils.h>
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/future_status.hpp>
15#include <userver/utils/assert.hpp>
17#include <userver/ugrpc/client/call.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_state.hpp>
20#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
21#include <userver/ugrpc/client/impl/prepare_call.hpp>
22#include <userver/ugrpc/client/middlewares/fwd.hpp>
23#include <userver/ugrpc/time_utils.hpp>
25USERVER_NAMESPACE_BEGIN
27namespace ugrpc::client {
// Non-template core of a unary "finish" future. It is constructed from a
// CallState and a pointer to the response viewed as a base protobuf message.
// NOTE(review): access specifiers, some members and the closing brace are
// elided in this view.
32class UnaryFinishFutureImpl {
// Takes the call state and the response as a base-message pointer.
34 UnaryFinishFutureImpl(CallState& state,
const google::protobuf::Message* response)
noexcept;
// Move-only type: moves are declared noexcept, copies are deleted.
36 UnaryFinishFutureImpl(UnaryFinishFutureImpl&&)
noexcept;
37 UnaryFinishFutureImpl& operator=(UnaryFinishFutureImpl&&)
noexcept;
38 UnaryFinishFutureImpl(
const UnaryFinishFutureImpl&) =
delete;
39 UnaryFinishFutureImpl& operator=(
const UnaryFinishFutureImpl&) =
delete;
// User-declared destructor; its body is defined elsewhere.
40 ~UnaryFinishFutureImpl();
// Non-blocking readiness query; the result must not be discarded.
42 [[nodiscard]]
bool IsReady()
const noexcept;
// Waits until `deadline` and reports the outcome as engine::FutureStatus.
// Declared const noexcept.
44 [[nodiscard]] engine::
FutureStatus WaitUntil(engine::Deadline deadline)
const noexcept;
// Exposes an engine::impl::ContextAccessor pointer.
// NOTE(review): the "Try" prefix suggests it may return nullptr - confirm in the definition.
48 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept;
// Response as a base protobuf message; raw pointer, presumably not owned here.
52 const google::protobuf::Message* response_;
// `mutable`, so it can be assigned from const member functions.
// NOTE(review): presumably holds an error captured while waiting - confirm.
53 mutable std::exception_ptr exception_;
// Typed wrapper around UnaryFinishFutureImpl that also owns the Response
// object. The class is [[nodiscard]] and move-only.
// NOTE(review): several members (including the one containing the
// `return std::move(*response_)` line) have their signatures elided in this view.
57template <
typename Response>
58class [[nodiscard]] UnaryFinishFuture {
// Takes ownership of `response` and hands its base-message view to impl_.
// response_ is declared before impl_ below, so it is initialized first and
// `*response_` is valid in impl_'s initializer - provided `response` is non-null.
61 UnaryFinishFuture(CallState& state, std::unique_ptr<Response>&& response)
noexcept
62 : response_(std::move(response)), impl_(state, impl::ToBaseMessage(*response_)) {}
// Defaulted noexcept moves; copying is disabled.
65 UnaryFinishFuture(UnaryFinishFuture&&)
noexcept =
default;
66 UnaryFinishFuture& operator=(UnaryFinishFuture&&)
noexcept =
default;
67 UnaryFinishFuture(
const UnaryFinishFuture&) =
delete;
68 UnaryFinishFuture& operator=(
const UnaryFinishFuture&) =
delete;
// Forwards to impl_.IsReady().
73 [[nodiscard]]
bool IsReady()
const noexcept {
return impl_.IsReady(); }
// Forwards to impl_.WaitUntil(deadline).
79 [[nodiscard]] engine::
FutureStatus WaitUntil(engine::Deadline deadline)
const noexcept {
80 return impl_.WaitUntil(deadline);
// Tail of another member (signature elided): moves the Response out of the
// owned object, leaving *response_ in a moved-from state.
95 return std::move(*response_);
// Forwards to impl_.TryGetContextAccessor().
100 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept {
return impl_.TryGetContextAccessor(); }
// Owned response storage; must stay declared before impl_ (see constructor).
104 std::unique_ptr<Response> response_;
105 UnaryFinishFutureImpl impl_;
// Fragment of a class template parameterized by RPC. The class header and
// most members are elided in this view.
// NOTE(review): presumably StreamReadFuture, judging by the member names - confirm.
111template <
typename RPC>
// Constructor parameters: call state, the RPC's raw stream, and the message
// being received, viewed as a base protobuf message.
116 impl::CallState& state,
117 typename RPC::RawStream& stream,
118 const google::protobuf::Message* recv_message
// Raw pointers; state_ and stream_ are value-initialized to nullptr.
146 impl::CallState* state_{};
147 typename RPC::RawStream* stream_{};
// NOTE(review): unlike the two members above, this one has no default member initializer.
148 const google::protobuf::Message* recv_message_;
// Client-side handle for a unary RPC producing a Response. Final class
// derived from CallAnyBase; [[nodiscard]].
// NOTE(review): access specifiers and several declarations are elided in this view.
160template <
typename Response>
161class [[nodiscard]] UnaryCall final :
public CallAnyBase {
// Accessors for the finish future, in mutable and const flavours.
176 UnaryFinishFuture<Response>& GetFinishFuture();
179 const UnaryFinishFuture<Response>& GetFinishFuture()
const;
// Templated constructor (signature partly elided): takes a prepare-call
// proxy for the given Stub/Request and the request to send.
183 template <
typename Stub,
typename Request>
186 PrepareUnaryCallProxy<Stub, Request, Response> prepare_unary_call,
187 const Request& request
// Defaulted noexcept moves and a defaulted destructor.
191 UnaryCall(UnaryCall&&)
noexcept =
default;
192 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
193 ~UnaryCall() =
default;
// Reader used to start the call and to request its completion.
196 RawResponseReader<Response> reader_{};
// Starts out empty (value-initialized std::optional).
197 std::optional<UnaryFinishFuture<Response>> finish_future_{};
// Fragment of a class template over Response whose header is elided in this
// view; the special members below name it InputStream.
211template <
typename Response>
// Reads a message into `response`; the bool result must not be ignored.
222 [[nodiscard]]
bool Read(Response& response);
// Underlying gRPC asynchronous reader type for this stream.
226 using RawStream = grpc::ClientAsyncReader<Response>;
// Templated constructor (signature partly elided): call parameters, the
// prepare function for a server-streaming call, and the single request.
228 template <
typename Stub,
typename Request>
230 impl::CallParams&& params,
231 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
232 const Request& request
// Defaulted noexcept moves and a defaulted destructor.
236 InputStream(InputStream&&)
noexcept =
default;
237 InputStream& operator=(InputStream&&)
noexcept =
default;
238 ~InputStream() =
default;
// Raw reader handle; dereferenced as `*stream_` by the member definitions.
241 impl::RawReader<Response> stream_;
// Fragment of a class template over Request/Response whose header is elided
// in this view; the special members below name it OutputStream.
251template <
typename Request,
typename Response>
// Sends `request`; the bool result must not be ignored.
265 [[nodiscard]]
bool Write(
const Request& request);
// Underlying gRPC asynchronous writer type for this stream.
297 using RawStream = grpc::ClientAsyncWriter<Request>;
// Templated constructor (signature partly elided): call parameters and the
// prepare function for a client-streaming call.
299 template <
typename Stub>
301 impl::CallParams&& params,
302 impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method
// Defaulted noexcept moves and a defaulted destructor.
306 OutputStream(OutputStream&&)
noexcept =
default;
307 OutputStream& operator=(OutputStream&&)
noexcept =
default;
308 ~OutputStream() =
default;
// Heap-allocated response object owned by the stream.
311 std::unique_ptr<Response> response_;
// Raw writer handle; dereferenced as `*stream_` by the member definitions.
312 impl::RawWriter<Request> stream_;
// Client-side handle for a bidirectional streaming RPC. Final class derived
// from CallAnyBase; [[nodiscard]].
// NOTE(review): access specifiers and several declarations are elided in this view.
350template <
typename Request,
typename Response>
351class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
// Reads a message into `response`; the bool result must not be ignored.
361 [[nodiscard]]
bool Read(Response& response);
// Sends `request`; the bool result must not be ignored.
382 [[nodiscard]]
bool Write(
const Request& request);
// Underlying gRPC asynchronous reader-writer type for this stream.
409 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
// Templated constructor (signature partly elided): call parameters and the
// prepare function for a bidirectional streaming call.
411 template <
typename Stub>
413 impl::CallParams&& params,
414 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
// Defaulted noexcept moves and a defaulted destructor.
418 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
419 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
420 ~BidirectionalStream() =
default;
// Raw reader-writer handle; dereferenced as `*stream_` by the member definitions.
423 impl::RawReaderWriter<Request, Response> stream_;
// Constructor fragment (the qualified name line is elided in this view).
// Stores the addresses of `state` and `stream` and the `recv_message`
// pointer; nothing is copied or owned.
426template <
typename RPC>
428 impl::CallState& state,
429 typename RPC::RawStream& stream,
430 const google::protobuf::Message* recv_message
432 : state_(&state), stream_(&stream), recv_message_(recv_message) {}
// Move-constructor fragment (signature elided in this view).
// Takes over `other.state_` and resets it to nullptr. stream_ and
// recv_message_ are copied, so `other` keeps its values for those two.
434template <
typename RPC>
437 : state_{std::exchange(other.state_,
nullptr)}, stream_(other.stream_), recv_message_{other.recv_message_} {}
// Move-assignment fragment (signature and final return elided in this view).
439template <
typename RPC>
// Self-assignment is a no-op.
441 if (
this == &other)
return *
this;
// Move the current contents into a local so that they are destroyed when
// the local goes out of scope.
442 [[maybe_unused]]
auto for_destruction = std::move(*
this);
// Take over other's state (nulling it in `other`); copy the other two pointers.
444 state_ = std::exchange(other.state_,
nullptr);
445 stream_ = other.stream_;
446 recv_message_ = other.recv_message_;
// Fragment whose signature is elided in this view.
// NOTE(review): presumably the destructor, which finishes a read that was
// never retrieved - confirm. Branch structure (braces/else) is partly elided.
450template <
typename RPC>
// Guard object tied to the call state for the duration of this scope.
453 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
// Wait for the pending async operation.
454 const auto wait_status =
455 impl::WaitAndTryCancelIfNeeded(state_->GetAsyncMethodInvocation(), state_->GetContext());
456 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
// A cancelled wait is additionally recorded in the call statistics.
457 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
458 state_->GetStatsScope().OnCancelled();
// Finish the call with no final response; the last argument is `false`
// here, whereas the 'Get' path passes `true`.
460 impl::Finish(*stream_, *state_,
nullptr,
false);
// Runs the middleware hook for the received message.
// NOTE(review): presumably on the kOk branch - the `else` is not visible here.
463 impl::MiddlewarePipeline::PostRecvMessage(*state_, *recv_message_);
// Fragment of the one-shot 'Get' operation (signature elided in this view).
// Returns true only when the wait finished with WaitStatus::kOk.
469template <
typename RPC>
// A null state_ means 'Get' has already consumed this future.
471 UINVARIANT(state_,
"'Get' must be called only once");
472 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
// Clear state_ so that a second call trips the invariant above.
473 auto*
const state = std::exchange(state_,
nullptr);
474 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetContext());
// Cancellation: record it in the statistics and flush them.
475 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
476 state->GetStatsScope().OnCancelled();
477 state->GetStatsScope().Flush();
478 }
else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
// Error: finish the call with no final response; last argument is `true`.
481 impl::Finish(*stream_, *state,
nullptr,
true);
// Runs the middleware hook for the received message.
// NOTE(review): presumably on the success branch - the `else` is not visible here.
484 impl::MiddlewarePipeline::PostRecvMessage(*state, *recv_message_);
487 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
// Readiness query fragment (signature elided in this view).
// Requires state_ to be non-null, then returns IsReady() of the call
// state's async method invocation.
490template <
typename RPC>
492 UINVARIANT(state_,
"IsReady should be called only before 'Get'");
493 auto& method = state_->GetAsyncMethodInvocation();
494 return method.IsReady();
// Constructs a unary call and starts it.
// NOTE(review): the `params` parameter, the assignment target of PrepareCall
// and several closing tokens are elided in this view.
499template <
typename Response>
500template <
typename Stub,
typename Request>
501UnaryCall<Response>::UnaryCall(
503 PrepareUnaryCallProxy<Stub, Request, Response> prepare_unary_call,
504 const Request& request
506 :
CallAnyBase(std::move(params), CallKind::kUnaryCall) {
// Middleware hook that runs before the call is started.
507 MiddlewarePipeline::PreStartCall(GetState());
// The pre-send hook is invoked only when Request derives from
// google::protobuf::Message.
508 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
509 MiddlewarePipeline::PreSendMessage(GetState(), request);
// Prepare the call from the stub, client context, request and completion queue.
513 prepare_unary_call.PrepareCall(GetState().GetStub(), &GetState().GetContext(), request, &GetState().GetQueue());
514 reader_->StartCall();
// A unary call sends exactly one request, so the write side is marked finished.
516 GetState().SetWritesFinished();
// Requests completion of the call and creates the finish future.
// NOTE(review): the closing brace and some lines are elided in this view.
521template <
typename Response>
522void UnaryCall<Response>::FinishAsync() {
// Heap-allocate the response so its address stays stable when ownership is
// moved into the future below.
524 auto response = std::make_unique<Response>();
526 PrepareFinish(GetState());
527 GetState().EmplaceFinishAsyncMethodInvocation();
528 auto& finish = GetState().GetFinishAsyncMethodInvocation();
529 auto& status = GetState().GetStatus();
// Hand the reader the response and status objects to fill, plus the
// completion tag of the finish invocation.
530 reader_->Finish(response.get(), &status, finish.GetCompletionTag());
// The future takes ownership of the response object.
532 finish_future_.emplace(GetState(), std::move(response));
// Returns a mutable reference to the object held in finish_future_.
// NOTE(review): no emptiness check is visible in this view (a line is elided
// before the return) - confirm one exists.
535template <
typename Response>
536UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture() {
538 return *finish_future_;
// Const overload: returns a const reference to the object held in finish_future_.
// NOTE(review): no emptiness check is visible in this view (a line is elided
// before the return) - confirm one exists.
541template <
typename Response>
542const UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture()
const {
544 return *finish_future_;
// Constructs a server-streaming call: runs the pre-start and pre-send
// middleware hooks, prepares and starts the stream, then marks writes finished.
// NOTE(review): some closing tokens are elided in this view.
549template <
typename Response>
550template <
typename Stub,
typename Request>
551InputStream<Response>::InputStream(
552 impl::CallParams&& params,
553 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
554 const Request& request
556 :
CallAnyBase(std::move(params), impl::CallKind::kInputStream) {
557 impl::MiddlewarePipeline::PreStartCall(GetState());
// Unlike the unary constructor, the pre-send hook here is unconditional.
558 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
// The single request is passed when the stream is prepared.
561 stream_ = impl::PrepareCall(
562 prepare_async_method, GetState().GetStub(), &GetState().GetContext(), request, &GetState().GetQueue()
564 impl::StartCall(*stream_, GetState());
// No further client messages follow the initial request.
566 GetState().SetWritesFinished();
// Reads the next message into `response`.
// NOTE(review): return statements and the unavailable-read branch body are
// elided in this view; the repeated signature followed by "…}" at the end
// looks like an extraction artifact.
569template <
typename Response>
570bool InputStream<Response>::
Read(Response& response) {
// Guard: reading must currently be available on this call.
571 if (!GetState().IsReadAvailable()) {
// Successful read: run the post-receive middleware hook on the message.
577 if (impl::Read(*stream_, response, GetState())) {
578 impl::MiddlewarePipeline::PostRecvMessage(GetState(), response);
// Finish the call with no final response; last argument is `true`.
// NOTE(review): presumably reached when impl::Read returned false - confirm.
583 impl::Finish(*stream_, GetState(),
nullptr,
true);
570bool InputStream<Response>::
Read(Response& response) {
…}
// Constructs a client-streaming call: allocates the response object, runs
// the pre-start middleware hook, then prepares and starts the stream.
// NOTE(review): some closing tokens are elided in this view.
588template <
typename Request,
typename Response>
589template <
typename Stub>
590OutputStream<Request, Response>::OutputStream(
591 impl::CallParams&& params,
592 impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method
594 :
CallAnyBase(std::move(params), impl::CallKind::kOutputStream), response_(std::make_unique<Response>()) {
595 impl::MiddlewarePipeline::PreStartCall(GetState());
// The raw pointer to the owned response is passed when the call is prepared.
599 stream_ = impl::PrepareCall(
600 prepare_async_method, GetState().GetStub(), &GetState().GetContext(), response_.get(), &GetState().GetQueue()
602 impl::StartCall(*stream_, GetState());
// Sends `request` on the stream and returns the result of impl::Write.
// NOTE(review): the body of the unavailable-write branch is elided in this
// view; the repeated signature followed by "…}" at the end looks like an
// extraction artifact.
605template <
typename Request,
typename Response>
606bool OutputStream<Request, Response>::
Write(
const Request& request) {
// Guard: writing must currently be available on this call.
607 if (!GetState().IsWriteAvailable()) {
// Middleware hook that runs before the message is sent.
613 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
// Default-constructed write options.
617 const grpc::WriteOptions write_options{};
618 return impl::Write(*stream_, request, write_options, GetState());
606bool OutputStream<Request, Response>::
Write(
const Request& request) {
…}
// Sends `request`, reporting failure by exception instead of a bool.
// Throws RpcError when the stream is already finished or closed.
// NOTE(review): closing braces and anything after the Finish call are
// elided in this view.
621template <
typename Request,
typename Response>
622void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
623 if (!GetState().IsWriteAndCheckAvailable()) {
626 throw RpcError(GetState().GetCallName(),
"'WriteAndCheck' called on a finished or closed stream");
// Middleware hook that runs before the message is sent.
629 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
// Default-constructed write options.
633 const grpc::WriteOptions write_options{};
// If the write fails, finish the call with no final response; last argument is `true`.
634 if (!impl::Write(*stream_, request, write_options, GetState())) {
636 impl::Finish(*stream_, GetState(),
nullptr,
true);
// Completes the client-streaming call and returns the server's response.
// NOTE(review): some closing braces are elided in this view; the repeated
// signature followed by "…}" at the end looks like an extraction artifact.
640template <
typename Request,
typename Response>
641Response OutputStream<Request, Response>::
Finish() {
// If writing is still available, signal that no more messages will be sent.
644 if (GetState().IsWriteAvailable()) {
645 impl::WritesDone(*stream_, GetState());
// Finish the call, passing the owned response as a base message.
649 impl::Finish(*stream_, GetState(), impl::ToBaseMessage(*response_),
true);
// Move the response out; *response_ is left in a moved-from state.
651 return std::move(*response_);
641Response OutputStream<Request, Response>::
Finish() {
…}
// Constructs a bidirectional streaming call: runs the pre-start middleware
// hook, then prepares and starts the stream.
// NOTE(review): the assignment target of impl::PrepareCall is elided in this
// view (presumably stream_, as in the other stream constructors - confirm).
654template <
typename Request,
typename Response>
655template <
typename Stub>
656BidirectionalStream<Request, Response>::BidirectionalStream(
657 impl::CallParams&& params,
658 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
660 :
CallAnyBase(std::move(params), impl::CallKind::kBidirectionalStream) {
661 impl::MiddlewarePipeline::PreStartCall(GetState());
// No request or response object is passed at preparation time.
665 impl::PrepareCall(prepare_async_method, GetState().GetStub(), &GetState().GetContext(), &GetState().GetQueue());
666 impl::StartCall(*stream_, GetState());
// Fragment of the asynchronous read operation (signature elided in this
// view; the error text names it 'ReadAsync').
// Throws RpcError when reading is not available on this call.
669template <
typename Request,
typename Response>
673 if (!GetState().IsReadAvailable()) {
676 throw RpcError(GetState().GetCallName(),
"'ReadAsync' called on a finished call");
// Start the asynchronous read into `response`.
679 impl::ReadAsync(*stream_, response, GetState());
// Arguments of the returned object (its type is elided here): call state,
// raw stream, and the response viewed as a base message.
681 GetState(), *stream_, impl::ToBaseMessage(response)};
// Reads the next message into `response`.
// NOTE(review): only the availability guard is visible in this view; the
// repeated signature followed by "…}" at the end looks like an extraction artifact.
684template <
typename Request,
typename Response>
685bool BidirectionalStream<Request, Response>::
Read(Response& response) {
// Guard: reading must currently be available on this call.
686 if (!GetState().IsReadAvailable()) {
685bool BidirectionalStream<Request, Response>::
Read(Response& response) {
…}
// Sends `request` on the stream and returns the result of impl::Write.
// NOTE(review): the body of the unavailable-write branch is elided in this
// view; the repeated signature followed by "…}" at the end looks like an
// extraction artifact.
694template <
typename Request,
typename Response>
695bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
// Guard: writing must currently be available on this call.
696 if (!GetState().IsWriteAvailable()) {
// Middleware hook that runs before the message is sent.
702 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
// Default-constructed write options.
705 const grpc::WriteOptions write_options{};
706 return impl::Write(*stream_, request, write_options, GetState());
695bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
…}
// Sends `request`, reporting failure by exception instead of a bool.
// Throws RpcError when the stream is already finished or closed.
// NOTE(review): some closing braces are elided in this view; the repeated
// signature followed by "…}" at the end looks like an extraction artifact.
709template <
typename Request,
typename Response>
710void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
711 if (!GetState().IsWriteAndCheckAvailable()) {
714 throw RpcError(GetState().GetCallName(),
"'WriteAndCheck' called on a finished or closed stream");
// Middleware hook that runs before the message is sent.
717 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
// Default-constructed write options.
720 const grpc::WriteOptions write_options{};
// Delegates to impl::WriteAndCheck; no explicit impl::Finish call is made here.
721 impl::WriteAndCheck(*stream_, request, write_options, GetState());
710void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
…}
724template <
typename Request,
typename Response>
726 if (!GetState().IsWriteAvailable()) {
732 return impl::WritesDone(*stream_, GetState());