11#include <grpcpp/impl/codegen/proto_utils.h>
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/future_status.hpp>
15#include <userver/utils/assert.hpp>
17#include <userver/ugrpc/client/call.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_state.hpp>
20#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
21#include <userver/ugrpc/client/impl/prepare_call.hpp>
22#include <userver/ugrpc/client/middlewares/fwd.hpp>
23#include <userver/ugrpc/time_utils.hpp>
25USERVER_NAMESPACE_BEGIN
27namespace ugrpc::client {
32class UnaryFinishFutureImpl {
34 UnaryFinishFutureImpl(CallState& state,
const google::protobuf::Message* response)
noexcept;
36 UnaryFinishFutureImpl(UnaryFinishFutureImpl&&)
noexcept;
37 UnaryFinishFutureImpl& operator=(UnaryFinishFutureImpl&&)
noexcept;
38 UnaryFinishFutureImpl(
const UnaryFinishFutureImpl&) =
delete;
39 UnaryFinishFutureImpl& operator=(
const UnaryFinishFutureImpl&) =
delete;
40 ~UnaryFinishFutureImpl();
42 [[nodiscard]]
bool IsReady()
const noexcept;
44 [[nodiscard]] engine::
FutureStatus WaitUntil(engine::Deadline deadline)
const noexcept;
48 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept;
52 const google::protobuf::Message* response_;
53 mutable std::exception_ptr exception_;
57template <
typename Response>
58class [[nodiscard]] UnaryFinishFuture {
61 UnaryFinishFuture(impl::CallState& state, std::unique_ptr<Response>&& response)
noexcept
62 : response_(std::move(response)), impl_(state, impl::ToBaseMessage(*response_)) {}
65 UnaryFinishFuture(UnaryFinishFuture&&)
noexcept =
default;
66 UnaryFinishFuture& operator=(UnaryFinishFuture&&)
noexcept =
default;
67 UnaryFinishFuture(
const UnaryFinishFuture&) =
delete;
68 UnaryFinishFuture& operator=(
const UnaryFinishFuture&) =
delete;
73 [[nodiscard]]
bool IsReady()
const noexcept {
return impl_.IsReady(); }
79 [[nodiscard]] engine::
FutureStatus WaitUntil(engine::Deadline deadline)
const noexcept {
80 return impl_.WaitUntil(deadline);
95 return std::move(*response_);
100 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept {
return impl_.TryGetContextAccessor(); }
104 std::unique_ptr<Response> response_;
105 impl::UnaryFinishFutureImpl impl_;
111template <
typename RPC>
116 impl::CallState& state,
117 typename RPC::RawStream& stream,
118 const google::protobuf::Message* recv_message
146 impl::CallState* state_{};
147 typename RPC::RawStream* stream_{};
148 const google::protobuf::Message* recv_message_;
160template <
typename Response>
161class [[nodiscard]] UnaryCall final :
public CallAnyBase {
176 UnaryFinishFuture<Response>& GetFinishFuture();
179 const UnaryFinishFuture<Response>& GetFinishFuture()
const;
183 template <
typename PrepareAsyncMethod,
typename Request,
typename... PrepareExtra>
185 impl::CallParams&& params,
186 PrepareAsyncMethod prepare_async_method,
187 const Request& request,
188 PrepareExtra&&... extra
192 UnaryCall(UnaryCall&&)
noexcept =
default;
193 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
194 ~UnaryCall() =
default;
197 impl::RawResponseReader<Response> reader_{};
198 std::optional<UnaryFinishFuture<Response>> finish_future_{};
212template <
typename Response>
223 [[nodiscard]]
bool Read(Response& response);
227 using RawStream = grpc::ClientAsyncReader<Response>;
229 template <
typename PrepareAsyncMethod,
typename Request>
230 InputStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method,
const Request& request);
233 InputStream(InputStream&&)
noexcept =
default;
234 InputStream& operator=(InputStream&&)
noexcept =
default;
235 ~InputStream() =
default;
238 impl::RawReader<Response> stream_;
248template <
typename Request,
typename Response>
262 [[nodiscard]]
bool Write(
const Request& request);
294 using RawStream = grpc::ClientAsyncWriter<Request>;
296 template <
typename PrepareAsyncMethod>
297 OutputStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method);
300 OutputStream(OutputStream&&)
noexcept =
default;
301 OutputStream& operator=(OutputStream&&)
noexcept =
default;
302 ~OutputStream() =
default;
305 std::unique_ptr<Response> response_;
306 impl::RawWriter<Request> stream_;
344template <
typename Request,
typename Response>
345class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
355 [[nodiscard]]
bool Read(Response& response);
376 [[nodiscard]]
bool Write(
const Request& request);
403 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
405 template <
typename PrepareAsyncMethod>
406 BidirectionalStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method);
409 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
410 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
411 ~BidirectionalStream() =
default;
414 impl::RawReaderWriter<Request, Response> stream_;
417template <
typename RPC>
419 impl::CallState& state,
420 typename RPC::RawStream& stream,
421 const google::protobuf::Message* recv_message
423 : state_(&state), stream_(&stream), recv_message_(recv_message) {}
425template <
typename RPC>
428 : state_{std::exchange(other.state_,
nullptr)}, stream_(other.stream_), recv_message_{other.recv_message_} {}
430template <
typename RPC>
432 if (
this == &other)
return *
this;
433 [[maybe_unused]]
auto for_destruction = std::move(*
this);
435 state_ = std::exchange(other.state_,
nullptr);
436 stream_ = other.stream_;
437 recv_message_ = other.recv_message_;
441template <
typename RPC>
444 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
445 const auto wait_status =
446 impl::WaitAndTryCancelIfNeeded(state_->GetAsyncMethodInvocation(), state_->GetContext());
447 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
448 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
449 state_->GetStatsScope().OnCancelled();
451 impl::Finish(*stream_, *state_,
nullptr,
false);
454 impl::MiddlewarePipeline::PostRecvMessage(*state_, *recv_message_);
460template <
typename RPC>
462 UINVARIANT(state_,
"'Get' must be called only once");
463 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
464 auto*
const state = std::exchange(state_,
nullptr);
465 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetContext());
466 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
467 state->GetStatsScope().OnCancelled();
468 state->GetStatsScope().Flush();
469 }
else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
472 impl::Finish(*stream_, *state,
nullptr,
true);
475 impl::MiddlewarePipeline::PostRecvMessage(*state, *recv_message_);
478 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
481template <
typename RPC>
483 UINVARIANT(state_,
"IsReady should be called only before 'Get'");
484 auto& method = state_->GetAsyncMethodInvocation();
485 return method.IsReady();
490template <
typename Response>
491template <
typename PrepareAsyncMethod,
typename Request,
typename... PrepareExtra>
492UnaryCall<Response>::UnaryCall(
493 impl::CallParams&& params,
494 PrepareAsyncMethod prepare_async_method,
495 const Request& request,
496 PrepareExtra&&... extra
498 :
CallAnyBase(std::move(params), impl::CallKind::kUnaryCall) {
499 impl::MiddlewarePipeline::PreStartCall(GetState());
500 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
501 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
504 reader_ = impl::PrepareCall(
505 GetState().GetStub(),
506 prepare_async_method,
507 &GetState().GetContext(),
509 &GetState().GetQueue(),
510 std::forward<PrepareExtra>(extra)...
512 reader_->StartCall();
514 GetState().SetWritesFinished();
519template <
typename Response>
520void UnaryCall<Response>::FinishAsync() {
522 auto response = std::make_unique<Response>();
524 PrepareFinish(GetState());
525 GetState().EmplaceFinishAsyncMethodInvocation();
526 auto& finish = GetState().GetFinishAsyncMethodInvocation();
527 auto& status = GetState().GetStatus();
528 reader_->Finish(response.get(), &status, finish.GetCompletionTag());
530 finish_future_.emplace(GetState(), std::move(response));
533template <
typename Response>
534UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture() {
536 return *finish_future_;
539template <
typename Response>
540const UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture()
const {
542 return *finish_future_;
547template <
typename Response>
548template <
typename PrepareAsyncMethod,
typename Request>
549InputStream<Response>::InputStream(
550 impl::CallParams&& params,
551 PrepareAsyncMethod prepare_async_method,
552 const Request& request
554 :
CallAnyBase(std::move(params), impl::CallKind::kInputStream) {
555 impl::MiddlewarePipeline::PreStartCall(GetState());
556 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
559 stream_ = impl::PrepareCall(
560 GetState().GetStub(), prepare_async_method, &GetState().GetContext(), request, &GetState().GetQueue()
562 impl::StartCall(*stream_, GetState());
564 GetState().SetWritesFinished();
567template <
typename Response>
568bool InputStream<Response>::
Read(Response& response) {
569 if (!GetState().IsReadAvailable()) {
575 if (impl::Read(*stream_, response, GetState())) {
576 impl::MiddlewarePipeline::PostRecvMessage(GetState(), response);
581 impl::Finish(*stream_, GetState(),
nullptr,
true);
568bool InputStream<Response>::
Read(Response& response) {
…}
586template <
typename Request,
typename Response>
587template <
typename PrepareAsyncMethod>
588OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method)
589 :
CallAnyBase(std::move(params), impl::CallKind::kOutputStream), response_(std::make_unique<Response>()) {
590 impl::MiddlewarePipeline::PreStartCall(GetState());
594 stream_ = impl::PrepareCall(
595 GetState().GetStub(), prepare_async_method, &GetState().GetContext(), response_.get(), &GetState().GetQueue()
597 impl::StartCall(*stream_, GetState());
600template <
typename Request,
typename Response>
601bool OutputStream<Request, Response>::
Write(
const Request& request) {
602 if (!GetState().IsWriteAvailable()) {
608 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
612 const grpc::WriteOptions write_options{};
613 return impl::Write(*stream_, request, write_options, GetState());
601bool OutputStream<Request, Response>::
Write(
const Request& request) {
…}
616template <
typename Request,
typename Response>
617void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
618 if (!GetState().IsWriteAndCheckAvailable()) {
621 throw RpcError(GetState().GetCallName(),
"'WriteAndCheck' called on a finished or closed stream");
624 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
628 const grpc::WriteOptions write_options{};
629 if (!impl::Write(*stream_, request, write_options, GetState())) {
631 impl::Finish(*stream_, GetState(),
nullptr,
true);
635template <
typename Request,
typename Response>
636Response OutputStream<Request, Response>::
Finish() {
639 if (GetState().IsWriteAvailable()) {
640 impl::WritesDone(*stream_, GetState());
644 impl::Finish(*stream_, GetState(), impl::ToBaseMessage(*response_),
true);
646 return std::move(*response_);
636Response OutputStream<Request, Response>::
Finish() {
…}
649template <
typename Request,
typename Response>
650template <
typename PrepareAsyncMethod>
651BidirectionalStream<Request, Response>::BidirectionalStream(
652 impl::CallParams&& params,
653 PrepareAsyncMethod prepare_async_method
655 :
CallAnyBase(std::move(params), impl::CallKind::kBidirectionalStream) {
656 impl::MiddlewarePipeline::PreStartCall(GetState());
660 impl::PrepareCall(GetState().GetStub(), prepare_async_method, &GetState().GetContext(), &GetState().GetQueue());
661 impl::StartCall(*stream_, GetState());
664template <
typename Request,
typename Response>
668 if (!GetState().IsReadAvailable()) {
671 throw RpcError(GetState().GetCallName(),
"'ReadAsync' called on a finished call");
674 impl::ReadAsync(*stream_, response, GetState());
676 GetState(), *stream_, impl::ToBaseMessage(response)};
679template <
typename Request,
typename Response>
680bool BidirectionalStream<Request, Response>::
Read(Response& response) {
681 if (!GetState().IsReadAvailable()) {
680bool BidirectionalStream<Request, Response>::
Read(Response& response) {
…}
689template <
typename Request,
typename Response>
690bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
691 if (!GetState().IsWriteAvailable()) {
697 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
700 const grpc::WriteOptions write_options{};
701 return impl::Write(*stream_, request, write_options, GetState());
690bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
…}
704template <
typename Request,
typename Response>
705void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
706 if (!GetState().IsWriteAndCheckAvailable()) {
709 throw RpcError(GetState().GetCallName(),
"'WriteAndCheck' called on a finished or closed stream");
712 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
715 const grpc::WriteOptions write_options{};
716 impl::WriteAndCheck(*stream_, request, write_options, GetState());
705void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
…}
719template <
typename Request,
typename Response>
721 if (!GetState().IsWriteAvailable()) {
727 return impl::WritesDone(*stream_, GetState());