11#include <grpcpp/impl/codegen/proto_utils.h>
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/function_ref.hpp>
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_methods.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/client/impl/channel_cache.hpp>
23#include <userver/ugrpc/client/middlewares/fwd.hpp>
24#include <userver/ugrpc/impl/deadline_timepoint.hpp>
25#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
28USERVER_NAMESPACE_BEGIN
// Static middleware hooks, each receiving the call's impl::RpcData.
// Only the declarations are visible here; the definitions live elsewhere.
34struct MiddlewarePipeline {
// Called by the call constructors in this file before the RPC is started.
35 static void PreStartCall(impl::RpcData& data);
// Called with the outgoing protobuf message before it is sent.
37 static void PreSendMessage(impl::RpcData& data,
const google::protobuf::Message& message);
// Called with the incoming protobuf message after it has been received.
38 static void PostRecvMessage(impl::RpcData& data,
const google::protobuf::Message& message);
// Called with the final grpc::Status from the post_finish callbacks built in this file.
40 static void PostFinish(impl::RpcData& data,
const grpc::Status& status);
51 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
// Members of UnaryFuture (the class header is not visible in this excerpt).
// Destructor, declared noexcept; its definition is not in this excerpt.
60 ~UnaryFuture()
noexcept;
// Returns a pointer to an engine::impl::ContextAccessor.
// NOTE(review): the "Try" prefix suggests the result may be null - confirm.
91 engine::impl::ContextAccessor* TryGetContextAccessor()
noexcept;
// Underlying future state.
95 impl::FutureImpl impl_;
// Callback receiving the RPC data and the final grpc::Status.
// NOTE(review): presumably stored from the constructor's post_finish argument - confirm.
96 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish_;
// Declarations of StreamReadFuture, a future for an asynchronous stream read.
// RPC is the stream wrapper type; it must expose a nested RawStream type.
// (The class header line itself is not visible in this excerpt.)
100template <
typename RPC>
// Constructor: takes the raw stream plus two callbacks, one to run after a
// message is received and one to run when the call is finished.
// NOTE(review): at least one leading parameter line is elided here.
104 explicit StreamReadFuture(
106 typename RPC::RawStream& stream,
107 std::function<
void(impl::RpcData& data)> post_recv_message,
108 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
// Destructor, declared noexcept.
115 ~StreamReadFuture()
noexcept;
// Underlying future state; GetData() on it yields the RPC data pointer.
134 impl::FutureImpl impl_;
// Pointer to the raw stream; the move assignment in this file copies it as-is.
135 typename RPC::RawStream* stream_;
// Callback run after a message has been received.
136 std::function<
void(impl::RpcData& data)> post_recv_message_;
// Callback handed to impl::Finish when the call has to be finished.
137 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish_;
// Members of CallAnyBase (the class header is not visible in this excerpt).
// Constructor: creates the per-call impl::RpcData on the heap from the moved
// call parameters and the call kind; it is owned through data_.
144 CallAnyBase(impl::CallParams&& params,
CallKind call_kind)
145 : data_(std::make_unique<impl::RpcData>(std::move(params), call_kind)) {}
// Accessor for the per-call data. NOTE(review): presumably returns *data_ - confirm.
162 impl::RpcData& GetData();
// Owning pointer to the per-call data.
165 std::unique_ptr<impl::RpcData> data_;
// Declarations of UnaryCall<Response> (the class header line is elided in this excerpt).
175template <
typename Response>
// Alias exposing the response message type.
178 using ResponseType = Response;
// Constructor template: prepare_func is called with the client context, the
// request and the completion queue, and its result is stored in reader_
// (see the constructor definition later in this file).
202 template <
typename PrepareFunc,
typename Request>
203 UnaryCall(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req);
// Defaulted noexcept move operations and defaulted destructor.
206 UnaryCall(UnaryCall&&)
noexcept =
default;
207 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
208 ~UnaryCall() =
default;
// Reader for the single response; started and finished by the members defined in this file.
211 impl::RawResponseReader<Response> reader_;
// Declarations of InputStream<Response> (the class header line is elided in this excerpt).
226template <
typename Response>
// Reads the next message into `response`; the bool result must not be ignored.
// NOTE(review): presumably false means no message was read - the definition's
// return statements are elided in this excerpt; confirm.
236 [[nodiscard]]
bool Read(Response& response);
// Raw gRPC stream type wrapped by this class.
240 using RawStream = grpc::ClientAsyncReader<Response>;
// Constructor template: prepare_func is called with the client context, the
// request and the completion queue, and its result is stored in stream_.
242 template <
typename PrepareFunc,
typename Request>
243 InputStream(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req);
// Defaulted noexcept move operations and defaulted destructor.
246 InputStream(InputStream&&)
noexcept =
default;
247 InputStream& operator=(InputStream&&)
noexcept =
default;
248 ~InputStream() =
default;
// Handle to the raw reader stream.
251 impl::RawReader<Response> stream_;
// Declarations of OutputStream<Request, Response> (the class header line is elided in this excerpt).
264template <
typename Request,
typename Response>
// Writes one request message; the bool result of the underlying write must not be ignored.
277 [[nodiscard]]
bool Write(
const Request& request);
// Raw gRPC stream type wrapped by this class.
308 using RawStream = grpc::ClientAsyncWriter<Request>;
// Constructor template: prepare_func is called with the client context, a
// pointer to the final response object and the completion queue.
310 template <
typename PrepareFunc>
311 OutputStream(impl::CallParams&& params, PrepareFunc prepare_func);
// Defaulted noexcept move operations and defaulted destructor.
314 OutputStream(OutputStream&&)
noexcept =
default;
315 OutputStream& operator=(OutputStream&&)
noexcept =
default;
316 ~OutputStream() =
default;
// Heap-allocated final response whose raw pointer is handed to prepare_func.
// NOTE(review): presumably heap-allocated so that pointer stays valid when
// the stream object is moved - confirm.
319 std::unique_ptr<Response> final_response_;
// Handle to the raw writer stream.
320 impl::RawWriter<Request> stream_;
// Bidirectional streaming call; derives from CallAnyBase, which owns the per-call data.
// The class is marked [[nodiscard]] and final. Several members are elided in this excerpt.
358template <
typename Request,
typename Response>
359class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
// Reads the next message into `response`; the bool result must not be ignored.
368 [[nodiscard]]
bool Read(Response& response);
// Writes one request message; the bool result of the underlying write must not be ignored.
387 [[nodiscard]]
bool Write(
const Request& request);
// Raw gRPC stream type wrapped by this class.
413 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
// Constructor template: prepare_func is called with the client context and the completion queue.
415 template <
typename PrepareFunc>
416 BidirectionalStream(impl::CallParams&& params, PrepareFunc prepare_func);
// Defaulted noexcept move operations and defaulted destructor.
419 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
420 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
421 ~BidirectionalStream() =
default;
// Handle to the raw reader-writer stream.
424 impl::RawReaderWriter<Request, Response> stream_;
// Constructor definition of StreamReadFuture<RPC>.
// NOTE(review): the signature line and the first member initializers are
// elided in this excerpt; only the callback initializers are visible.
429template <
typename RPC>
432 typename RPC::RawStream& stream,
433 std::function<
void(impl::RpcData& data)> post_recv_message,
434 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
// Both callbacks are taken by value and moved into the members; the body is empty.
438 post_recv_message_(std::move(post_recv_message)),
439 post_finish_(std::move(post_finish)) {}
// NOTE(review): the signature line is elided; this body is presumably the
// StreamReadFuture<RPC> destructor - confirm.
// When the future still holds RPC data, it waits for the outstanding async
// invocation instead of abandoning it.
441template <
typename RPC>
443 if (
auto*
const data = impl_.GetData()) {
444 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
// Block until the pending invocation completes.
445 const auto wait_status = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
446 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
// Of the non-OK outcomes, only cancellation is reported to the statistics scope.
447 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
448 data->GetStatsScope().OnCancelled();
// Finish the call. The last argument is false here, whereas Get() passes true.
// NOTE(review): presumably it selects whether a failure is raised - confirm against impl::Finish.
450 impl::Finish(*stream_, *data, post_finish_,
false);
// NOTE(review): lines between these statements are elided; this call is
// presumably the successful-wait branch - confirm.
452 post_recv_message_(*data);
// NOTE(review): the signature line is elided; this body appears to be the
// move-assignment operator of StreamReadFuture<RPC>, taking `other` - confirm.
457template <
typename RPC>
// Self-assignment is a no-op.
459 if (
this == &other)
return *
this;
// Move the current state into a local so that it is cleaned up by that
// local's destructor when this function returns.
460 [[maybe_unused]]
auto for_destruction = std::move(*
this);
// Take over the state of `other`. The stream pointer is copied, not reset, in `other`.
461 impl_ = std::move(other.impl_);
462 stream_ = other.stream_;
463 post_recv_message_ = std::move(other.post_recv_message_);
464 post_finish_ = std::move(other.post_finish_);
// Get(): waits for the asynchronous read and reports whether it succeeded.
// (The signature line and a few body lines are elided in this excerpt.)
// Returns true only when the wait status is kOk.
468template <
typename RPC>
470 auto*
const data = impl_.GetData();
// A null data pointer is treated as a second call to Get and trips the invariant.
471 UINVARIANT(data,
"'Get' must be called only once");
472 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
// Block until the pending invocation completes.
474 const auto result = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
// Cancelled wait: record the cancellation and flush the statistics scope.
475 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
476 data->GetStatsScope().OnCancelled();
477 data->GetStatsScope().Flush();
478 }
else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
// Failed read: finish the call, passing true as the last argument.
// NOTE(review): presumably this makes impl::Finish raise on a bad status - confirm.
481 impl::Finish(*stream_, *data, post_finish_,
true);
// NOTE(review): the line introducing this branch is elided; this call is
// presumably the kOk branch - confirm.
483 post_recv_message_(*data);
485 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
// IsReady(): reports whether the pending async invocation has completed.
// It only queries the invocation's IsReady(). (The signature line is elided in this excerpt.)
488template <
typename RPC>
490 auto*
const data = impl_.GetData();
// A null data pointer is rejected here; the message states IsReady belongs before Get.
491 UINVARIANT(data,
"IsReady should be called only before 'Get'");
492 auto& method = data->GetAsyncMethodInvocation();
493 return method.IsReady();
// Constructor of UnaryCall<Response>: runs the pre-call middleware hooks,
// prepares the reader and starts the call.
// (The outer template header and the base-class initializer are elided in this excerpt.)
497template <
typename PrepareFunc,
typename Request>
498UnaryCall<Response>::UnaryCall(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req)
500 impl::MiddlewarePipeline::PreStartCall(GetData());
// The send hook takes a protobuf Message, so it is only invoked for requests derived from it.
501 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
502 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
// prepare_func receives the client context, the request and the completion queue.
505 reader_ = prepare_func(&GetData().GetContext(), req, &GetData().GetQueue());
506 reader_->StartCall();
// The request was handed over at preparation, so writes are marked finished right away.
508 GetData().SetWritesFinished();
511template <
typename Response>
// NOTE(review): the signature line is elided; this body is presumably
// UnaryCall<Response>::FinishAsync(Response& response) - confirm.
// It issues the asynchronous Finish on the reader and builds the callback to run on completion.
519template <
typename Response>
522 PrepareFinish(GetData());
523 GetData().EmplaceFinishAsyncMethodInvocation();
524 auto& finish = GetData().GetFinishAsyncMethodInvocation();
525 auto& status = GetData().GetStatus();
// The reader is given pointers to `response` and `status` and is tagged
// with the finish invocation.
526 reader_->Finish(&response, &status, finish.GetTag());
// `response` is captured by reference, so it must stay alive until this callback has run.
527 auto post_finish = [&response](impl::RpcData& data,
const grpc::Status& status) {
// The receive hook takes a protobuf Message, so it is only invoked for responses derived from it.
528 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
529 impl::MiddlewarePipeline::PostRecvMessage(data, response);
// NOTE(review): lines between these statements are elided in this excerpt.
533 impl::MiddlewarePipeline::PostFinish(data, status);
// Constructor of InputStream<Response>: runs the pre-call middleware hooks,
// prepares the reader stream and starts the call.
// (The outer template header and the base-class initializer are elided in this excerpt.)
539template <
typename PrepareFunc,
typename Request>
540InputStream<Response>::InputStream(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req)
542 impl::MiddlewarePipeline::PreStartCall(GetData());
// Unlike the UnaryCall constructor, the send hook is invoked unconditionally here.
543 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
// prepare_func receives the client context, the request and the completion queue.
546 stream_ = prepare_func(&GetData().GetContext(), req, &GetData().GetQueue());
547 impl::StartCall(*stream_, GetData());
// The single request was handed over at preparation, so writes are marked finished right away.
549 GetData().SetWritesFinished();
// Reads the next message from the stream into `response`.
// A successful read runs the receive hook; otherwise the call is finished.
// NOTE(review): the return statements and the branch boundary are elided in this excerpt.
552template <
typename Response>
553bool InputStream<Response>::
Read(Response& response) {
554 if (impl::Read(*stream_, response, GetData())) {
555 impl::MiddlewarePipeline::PostRecvMessage(GetData(), response);
// Callback forwarding the final status to the middleware finish hook.
560 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
561 impl::MiddlewarePipeline::PostFinish(data, status);
// Finish the call, passing true as the last argument.
// NOTE(review): presumably this makes impl::Finish raise on a bad status - confirm.
563 impl::Finish(*stream_, GetData(), post_finish,
true);
// Constructor of OutputStream<Request, Response>: allocates the final
// response object, runs the pre-start hook, prepares the writer stream and
// starts the call. (The outer template header is elided in this excerpt.)
569template <
typename PrepareFunc>
570OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareFunc prepare_func)
571 :
CallAnyBase(std::move(params),
CallKind::kOutputStream), final_response_(std::make_unique<Response>()) {
572 impl::MiddlewarePipeline::PreStartCall(GetData());
// prepare_func receives the client context, a raw pointer to the final
// response and the completion queue.
576 stream_ = prepare_func(&GetData().GetContext(), final_response_.get(), &GetData().GetQueue());
577 impl::StartCall(*stream_, GetData());
// Writes one request message to the stream.
// Runs the send hook first, then forwards to impl::Write with default
// write options and returns its result.
580template <
typename Request,
typename Response>
581bool OutputStream<Request, Response>::
Write(
const Request& request) {
582 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Value-initialized options: nothing is customized for this write.
586 grpc::WriteOptions write_options{};
587 return impl::Write(*stream_, request, write_options, GetData());
// Writes one request message and, if the write fails, finishes the call.
// Returns nothing; compare with Write, which returns the write result.
590template <
typename Request,
typename Response>
591void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
592 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Value-initialized options: nothing is customized for this write.
596 grpc::WriteOptions write_options{};
597 if (!impl::Write(*stream_, request, write_options, GetData())) {
// Callback forwarding the final status to the middleware finish hook.
598 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
599 impl::MiddlewarePipeline::PostFinish(data, status);
// Finish the call, passing true as the last argument.
// NOTE(review): presumably this makes impl::Finish raise on a bad status - confirm.
601 impl::Finish(*stream_, GetData(), post_finish,
true);
// Completes the call and returns the final response.
// Signals the end of writes if that has not been done yet, then finishes
// the call and moves the response out of final_response_.
605template <
typename Request,
typename Response>
606Response OutputStream<Request, Response>::
Finish() {
// Skip WritesDone when writes were already marked finished.
609 if (!GetData().AreWritesFinished()) {
610 impl::WritesDone(*stream_, GetData());
// Callback forwarding the final status to the middleware finish hook.
613 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
614 impl::MiddlewarePipeline::PostFinish(data, status);
// Finish the call, passing true as the last argument.
// NOTE(review): presumably this makes impl::Finish raise on a bad status - confirm.
616 impl::Finish(*stream_, GetData(), post_finish,
true);
// The stored response object is left in a moved-from state after this.
618 return std::move(*final_response_);
// Constructor of BidirectionalStream<Request, Response>: runs the pre-start
// hook, prepares the reader-writer stream and starts the call.
// (The outer template header and the base-class initializer are elided in this excerpt.)
622template <
typename PrepareFunc>
623BidirectionalStream<Request, Response>::BidirectionalStream(impl::CallParams&& params, PrepareFunc prepare_func)
625 impl::MiddlewarePipeline::PreStartCall(GetData());
// prepare_func receives only the client context and the completion queue;
// no request is passed at preparation.
628 stream_ = prepare_func(&GetData().GetContext(), &GetData().GetQueue());
629 impl::StartCall(*stream_, GetData());
// NOTE(review): the signature line is elided; this body is presumably
// BidirectionalStream<Request, Response>::ReadAsync(Response& response) - confirm.
// Starts an asynchronous read into `response` and returns a StreamReadFuture for it.
632template <
typename Request,
typename Response>
636 impl::ReadAsync(*stream_, response, GetData());
// `response` is captured by reference, so it must stay alive until this callback has run.
637 auto post_recv_message = [&response](impl::RpcData& data) {
638 impl::MiddlewarePipeline::PostRecvMessage(data, response);
// Callback forwarding the final status to the middleware finish hook.
640 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
641 impl::MiddlewarePipeline::PostFinish(data, status);
// The future receives the RPC data, the raw stream and both callbacks.
643 return StreamReadFuture<BidirectionalStream<Request, Response>>{
644 GetData(), *stream_, post_recv_message, post_finish};
// Reads the next message into `response` and returns a bool.
// NOTE(review): the body is elided in this excerpt, so its behaviour cannot be described here.
647template <
typename Request,
typename Response>
648bool BidirectionalStream<Request, Response>::
Read(Response& response) {
// Writes one request message to the stream.
// Runs the send hook first, then forwards to impl::Write with default
// write options and returns its result.
653template <
typename Request,
typename Response>
654bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
655 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Value-initialized options: nothing is customized for this write.
658 grpc::WriteOptions write_options{};
659 return impl::Write(*stream_, request, write_options, GetData());
// Writes one request message, delegating the failure handling to impl::WriteAndCheck.
// Unlike OutputStream::WriteAndCheck, this does not call impl::Write and impl::Finish itself.
662template <
typename Request,
typename Response>
663void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
664 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Value-initialized options: nothing is customized for this write.
667 grpc::WriteOptions write_options{};
668 impl::WriteAndCheck(*stream_, request, write_options, GetData());
// NOTE(review): the signature line is elided; this body is presumably
// BidirectionalStream<Request, Response>::WritesDone() - confirm.
// Forwards to impl::WritesDone for this stream and returns its result.
671template <
typename Request,
typename Response>
673 return impl::WritesDone(*stream_, GetData());