12#include <grpcpp/impl/codegen/proto_utils.h>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
18#include <userver/ugrpc/client/call.hpp>
19#include <userver/ugrpc/client/impl/async_methods.hpp>
20#include <userver/ugrpc/client/middlewares/fwd.hpp>
21#include <userver/ugrpc/deadline_timepoint.hpp>
23USERVER_NAMESPACE_BEGIN
25namespace ugrpc::client {
29struct MiddlewarePipeline {
30 static void PreStartCall(impl::RpcData& data);
32 static void PreSendMessage(impl::RpcData& data,
const google::protobuf::Message& message);
33 static void PostRecvMessage(impl::RpcData& data,
const google::protobuf::Message& message);
35 static void PostFinish(impl::RpcData& data,
const grpc::Status& status);
39class [[nodiscard]] UnaryFuture {
44 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
48 UnaryFuture(UnaryFuture&&)
noexcept;
49 UnaryFuture& operator=(UnaryFuture&&)
noexcept;
50 UnaryFuture(
const UnaryFuture&) =
delete;
51 UnaryFuture& operator=(
const UnaryFuture&) =
delete;
58 [[nodiscard]]
bool IsReady()
const noexcept;
64 [[nodiscard]] engine::
FutureStatus WaitUntil(engine::Deadline deadline)
const noexcept;
79 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept;
83 impl::RpcData* data_{};
84 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish_;
85 mutable std::exception_ptr exception_;
91template <
typename RPC>
95 explicit StreamReadFuture(
97 typename RPC::RawStream& stream,
98 std::function<
void(impl::RpcData& data)> post_recv_message,
99 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
127 impl::RpcData* data_{};
128 typename RPC::RawStream* stream_{};
129 std::function<
void(impl::RpcData& data)> post_recv_message_;
130 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish_;
142template <
typename Response>
143class [[nodiscard]] UnaryCall final :
public CallAnyBase {
145 using ResponseType = Response;
165 UnaryFuture FinishAsync(Response& response);
169 template <
typename PrepareAsyncCall,
typename Request>
170 UnaryCall(impl::CallParams&& params, PrepareAsyncCall prepare_async_call,
const Request& req);
173 UnaryCall(UnaryCall&&)
noexcept =
default;
174 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
175 ~UnaryCall() =
default;
178 impl::RawResponseReader<Response> reader_;
195template <
typename Response>
205 [[nodiscard]]
bool Read(Response& response);
209 using RawStream = grpc::ClientAsyncReader<Response>;
211 template <
typename PrepareAsyncCall,
typename Request>
212 InputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call,
const Request& req);
215 InputStream(InputStream&&)
noexcept =
default;
216 InputStream& operator=(InputStream&&)
noexcept =
default;
217 ~InputStream() =
default;
220 impl::RawReader<Response> stream_;
233template <
typename Request,
typename Response>
246 [[nodiscard]]
bool Write(
const Request& request);
277 using RawStream = grpc::ClientAsyncWriter<Request>;
279 template <
typename PrepareAsyncCall>
280 OutputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call);
283 OutputStream(OutputStream&&)
noexcept =
default;
284 OutputStream& operator=(OutputStream&&)
noexcept =
default;
285 ~OutputStream() =
default;
288 std::unique_ptr<Response> final_response_;
289 impl::RawWriter<Request> stream_;
327template <
typename Request,
typename Response>
328class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
337 [[nodiscard]]
bool Read(Response& response);
356 [[nodiscard]]
bool Write(
const Request& request);
382 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
384 template <
typename PrepareAsyncCall>
385 BidirectionalStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call);
388 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
389 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
390 ~BidirectionalStream() =
default;
393 impl::RawReaderWriter<Request, Response> stream_;
396template <
typename RPC>
399 typename RPC::RawStream& stream,
400 std::function<
void(impl::RpcData& data)> post_recv_message,
401 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
405 post_recv_message_(std::move(post_recv_message)),
406 post_finish_(std::move(post_finish)) {}
408template <
typename RPC>
410 : data_{std::exchange(other.data_,
nullptr)},
411 stream_{other.stream_},
412 post_recv_message_{std::move(other.post_recv_message_)},
413 post_finish_{std::move(other.post_finish_)} {}
415template <
typename RPC>
417 if (
this == &other)
return *
this;
418 [[maybe_unused]]
auto for_destruction = std::move(*
this);
419 data_ = std::exchange(other.data_,
nullptr);
420 stream_ = other.stream_;
421 post_recv_message_ = std::move(other.post_recv_message_);
422 post_finish_ = std::move(other.post_finish_);
426template <
typename RPC>
429 impl::RpcData::AsyncMethodInvocationGuard guard(*data_);
430 const auto wait_status = impl::Wait(data_->GetAsyncMethodInvocation(), data_->GetContext());
431 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
432 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
433 data_->GetStatsScope().OnCancelled();
435 impl::Finish(*stream_, *data_, post_finish_,
false);
437 post_recv_message_(*data_);
442template <
typename RPC>
444 UINVARIANT(data_,
"'Get' must be called only once");
445 impl::RpcData::AsyncMethodInvocationGuard guard(*data_);
446 auto*
const data = std::exchange(data_,
nullptr);
447 const auto result = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
448 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
449 data->GetStatsScope().OnCancelled();
450 data->GetStatsScope().Flush();
451 }
else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
454 impl::Finish(*stream_, *data, post_finish_,
true);
456 post_recv_message_(*data);
458 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
461template <
typename RPC>
463 UINVARIANT(data_,
"IsReady should be called only before 'Get'");
464 auto& method = data_->GetAsyncMethodInvocation();
465 return method.IsReady();
470template <
typename Response>
471template <
typename PrepareAsyncCall,
typename Request>
472UnaryCall<Response>::UnaryCall(impl::CallParams&& params, PrepareAsyncCall prepare_async_call,
const Request& req)
473 :
CallAnyBase(std::move(params), impl::CallKind::kUnaryCall) {
474 impl::MiddlewarePipeline::PreStartCall(GetData());
475 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
476 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
478 reader_ = prepare_async_call(GetData().GetStub(), &GetData().GetContext(), req, &GetData().GetQueue());
479 reader_->StartCall();
481 GetData().SetWritesFinished();
484template <
typename Response>
485Response UnaryCall<Response>::Finish() {
487 UnaryFuture future = FinishAsync(response);
492template <
typename Response>
493UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
495 PrepareFinish(GetData());
496 GetData().EmplaceFinishAsyncMethodInvocation();
497 auto& finish = GetData().GetFinishAsyncMethodInvocation();
498 auto& status = GetData().GetStatus();
499 reader_->Finish(&response, &status, finish.GetTag());
500 auto post_finish = [&response](impl::RpcData& data,
const grpc::Status& status) {
502 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
503 impl::MiddlewarePipeline::PostRecvMessage(data, response);
508 impl::MiddlewarePipeline::PostFinish(data, status);
510 return UnaryFuture{GetData(), post_finish};
515template <
typename Response>
516template <
typename PrepareAsyncCall,
typename Request>
517InputStream<Response>::InputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call,
const Request& req)
518 :
CallAnyBase(std::move(params), impl::CallKind::kInputStream) {
519 impl::MiddlewarePipeline::PreStartCall(GetData());
520 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
523 stream_ = prepare_async_call(GetData().GetStub(), &GetData().GetContext(), req, &GetData().GetQueue());
524 impl::StartCall(*stream_, GetData());
526 GetData().SetWritesFinished();
529template <
typename Response>
530bool InputStream<Response>::
Read(Response& response) {
531 if (impl::Read(*stream_, response, GetData())) {
532 impl::MiddlewarePipeline::PostRecvMessage(GetData(), response);
537 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
538 impl::MiddlewarePipeline::PostFinish(data, status);
540 impl::Finish(*stream_, GetData(), post_finish,
true);
530bool InputStream<Response>::
Read(Response& response) {
…}
545template <
typename Request,
typename Response>
546template <
typename PrepareAsyncCall>
547OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call)
548 :
CallAnyBase(std::move(params), impl::CallKind::kOutputStream), final_response_(std::make_unique<Response>()) {
549 impl::MiddlewarePipeline::PreStartCall(GetData());
554 prepare_async_call(GetData().GetStub(), &GetData().GetContext(), final_response_.get(), &GetData().GetQueue());
555 impl::StartCall(*stream_, GetData());
558template <
typename Request,
typename Response>
559bool OutputStream<Request, Response>::
Write(
const Request& request) {
560 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
564 grpc::WriteOptions write_options{};
565 return impl::Write(*stream_, request, write_options, GetData());
559bool OutputStream<Request, Response>::
Write(
const Request& request) {
…}
568template <
typename Request,
typename Response>
569void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
570 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
574 grpc::WriteOptions write_options{};
575 if (!impl::Write(*stream_, request, write_options, GetData())) {
576 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
577 impl::MiddlewarePipeline::PostFinish(data, status);
579 impl::Finish(*stream_, GetData(), post_finish,
true);
583template <
typename Request,
typename Response>
584Response OutputStream<Request, Response>::
Finish() {
587 if (!GetData().AreWritesFinished()) {
588 impl::WritesDone(*stream_, GetData());
591 auto post_finish = [
this](impl::RpcData& data,
const grpc::Status& status) {
593 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
595 impl::MiddlewarePipeline::PostRecvMessage(data, *final_response_);
600 impl::MiddlewarePipeline::PostFinish(data, status);
602 impl::Finish(*stream_, GetData(), post_finish,
true);
604 return std::move(*final_response_);
584Response OutputStream<Request, Response>::
Finish() {
…}
607template <
typename Request,
typename Response>
608template <
typename PrepareAsyncCall>
609BidirectionalStream<Request, Response>::BidirectionalStream(
610 impl::CallParams&& params,
611 PrepareAsyncCall prepare_async_call
613 :
CallAnyBase(std::move(params), impl::CallKind::kBidirectionalStream) {
614 impl::MiddlewarePipeline::PreStartCall(GetData());
617 stream_ = prepare_async_call(GetData().GetStub(), &GetData().GetContext(), &GetData().GetQueue());
618 impl::StartCall(*stream_, GetData());
621template <
typename Request,
typename Response>
625 impl::ReadAsync(*stream_, response, GetData());
626 auto post_recv_message = [&response](impl::RpcData& data) {
627 impl::MiddlewarePipeline::PostRecvMessage(data, response);
629 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
630 impl::MiddlewarePipeline::PostFinish(data, status);
633 GetData(), *stream_, post_recv_message, post_finish};
636template <
typename Request,
typename Response>
637bool BidirectionalStream<Request, Response>::
Read(Response& response) {
637bool BidirectionalStream<Request, Response>::
Read(Response& response) {
…}
642template <
typename Request,
typename Response>
643bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
644 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
647 grpc::WriteOptions write_options{};
648 return impl::Write(*stream_, request, write_options, GetData());
643bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
…}
651template <
typename Request,
typename Response>
652void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
653 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
656 grpc::WriteOptions write_options{};
657 impl::WriteAndCheck(*stream_, request, write_options, GetData());
652void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
…}
660template <
typename Request,
typename Response>
662 return impl::WritesDone(*stream_, GetData());