11#include <grpcpp/impl/codegen/proto_utils.h>
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/function_ref.hpp>
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_methods.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/client/impl/channel_cache.hpp>
23#include <userver/ugrpc/client/middlewares/fwd.hpp>
24#include <userver/ugrpc/impl/deadline_timepoint.hpp>
25#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
28USERVER_NAMESPACE_BEGIN
/// Static hooks that the call classes in this file invoke at fixed points of
/// an RPC. Only the declarations are visible here.
/// NOTE(review): judging by the name these forward to the client middlewares;
/// confirm against the definitions.
34struct MiddlewarePipeline {
/// Invoked by the call constructors in this file before the underlying gRPC
/// call is prepared and started.
35 static void PreStartCall(impl::RpcData& data);
/// Invoked with a request message before it is handed to the write path
/// (in the unary / input-stream constructors: before the call is prepared).
37 static void PreSendMessage(impl::RpcData& data,
const google::protobuf::Message& message);
/// Invoked with a response message after it has been read.
38 static void PostRecvMessage(impl::RpcData& data,
const google::protobuf::Message& message);
/// Invoked from the `post_finish` callbacks built in this file, with the RPC
/// data and a `grpc::Status`.
40 static void PostFinish(impl::RpcData& data,
const grpc::Status& status);
// NOTE(review): the lines below belong to UnaryFuture (see the destructor
// name). The class header and the beginning of the declaration that owns the
// first parameter are not part of this excerpt.

// Callback parameter receiving the RPC data and a grpc::Status; presumably the
// last constructor parameter - TODO confirm.
51 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
/// Destructor is declared non-throwing; its definition is not in this excerpt.
60 ~UnaryFuture()
noexcept;
/// Returns a pointer to an engine::impl::ContextAccessor.
/// NOTE(review): the "Try" prefix suggests the result may be null - confirm
/// against the definition.
91 engine::
impl::ContextAccessor* TryGetContextAccessor()
noexcept;
/// Shared future state.
95 impl::FutureImpl impl_;
/// Stored callback with the same signature as the `post_finish` parameter
/// above.
96 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish_;
/// Future-like object for a pending stream read (the `class` line itself is
/// not part of this excerpt). `RPC` must provide a nested `RawStream` type.
100template <
typename RPC>
/// Takes the raw stream by reference plus two callbacks by value.
/// NOTE(review): call sites in this file pass the RPC data as the first
/// argument; that parameter line is missing from this excerpt.
104 explicit StreamReadFuture(
106 typename RPC::RawStream& stream,
107 std::function<
void(impl::RpcData& data)> post_recv_message,
108 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
/// Declared non-throwing.
115 ~StreamReadFuture()
noexcept;
/// Shared future state; `impl_.GetData()` yields the RPC data pointer used by
/// the member definitions later in this file.
134 impl::FutureImpl impl_;
/// Non-owning pointer to the raw stream. Stored as a pointer (the constructor
/// takes a reference) so that move assignment can rebind it.
135 typename RPC::RawStream* stream_;
/// Callback invoked with the RPC data after a read.
136 std::function<
void(impl::RpcData& data)> post_recv_message_;
/// Callback forwarded to `impl::Finish`.
137 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish_;
/// Moves the call parameters into a newly heap-allocated `impl::RpcData`
/// created for the given call kind.
144 CallAnyBase(impl::CallParams&& params,
CallKind call_kind)
145 : data_(std::make_unique<impl::RpcData>(std::move(params), call_kind)) {}
/// Accessor for the per-call data; the definition is not in this excerpt.
162 impl::RpcData& GetData();
/// Per-call state allocated in the constructor.
165 std::unique_ptr<impl::RpcData> data_;
/// Fragment of the UnaryCall class template (the `class` line is not part of
/// this excerpt). `Response` is the response message type.
175template <
typename Response>
/// Exposes the response type to generic code.
178 using ResponseType = Response;
/// Builds and starts the call. `prepare_func` is invoked with the client
/// context, the request and the completion queue (see the definition later in
/// this file).
202 template <
typename PrepareFunc,
typename Request>
203 UnaryCall(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req);
/// Movable; move operations and destructor are compiler-generated.
206 UnaryCall(UnaryCall&&)
noexcept =
default;
207 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
208 ~UnaryCall() =
default;
/// Response reader returned by `prepare_func`.
211 impl::RawResponseReader<Response> reader_;
/// Fragment of the InputStream class template (the `class` line is not part of
/// this excerpt). `Response` is the type of the messages read from the stream.
226template <
typename Response>
/// Reads one message into `response`. The result must not be ignored.
236 [[nodiscard]]
bool Read(Response& response);
/// Underlying gRPC stream type.
240 using RawStream = grpc::ClientAsyncReader<Response>;
/// Builds and starts the call. `prepare_func` is invoked with the client
/// context, the request and the completion queue (see the definition later in
/// this file).
242 template <
typename PrepareFunc,
typename Request>
243 InputStream(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req);
/// Movable; move operations and destructor are compiler-generated.
246 InputStream(InputStream&&)
noexcept =
default;
247 InputStream& operator=(InputStream&&)
noexcept =
default;
248 ~InputStream() =
default;
/// Reader returned by `prepare_func`.
251 impl::RawReader<Response> stream_;
/// Fragment of the OutputStream class template (the `class` line is not part
/// of this excerpt). `Request` is the type of the messages written;
/// `Response` is the single message returned by `Finish`.
264template <
typename Request,
typename Response>
/// Writes one message. The result must not be ignored.
277 [[nodiscard]]
bool Write(
const Request& request);
/// Underlying gRPC stream type.
308 using RawStream = grpc::ClientAsyncWriter<Request>;
/// Builds and starts the call. `prepare_func` is invoked with the client
/// context, a pointer to the response storage and the completion queue (see
/// the definition later in this file).
310 template <
typename PrepareFunc>
311 OutputStream(impl::CallParams&& params, PrepareFunc prepare_func);
/// Movable; move operations and destructor are compiler-generated.
314 OutputStream(OutputStream&&)
noexcept =
default;
315 OutputStream& operator=(OutputStream&&)
noexcept =
default;
316 ~OutputStream() =
default;
/// Heap-allocated storage for the response; its address is handed to
/// `prepare_func` in the constructor.
319 std::unique_ptr<Response> final_response_;
/// Writer returned by `prepare_func`.
320 impl::RawWriter<Request> stream_;
/// Client-side call object that supports both reading `Response` messages and
/// writing `Request` messages. Discarding a returned instance triggers a
/// `[[nodiscard]]` warning. The end of the class is not part of this excerpt.
358template <
typename Request,
typename Response>
359class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
/// Reads one message into `response`. The result must not be ignored.
368 [[nodiscard]]
bool Read(Response& response);
/// Writes one message. The result must not be ignored.
387 [[nodiscard]]
bool Write(
const Request& request);
/// Underlying gRPC stream type.
413 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
/// Builds and starts the call. `prepare_func` is invoked with the client
/// context and the completion queue (see the definition later in this file).
415 template <
typename PrepareFunc>
416 BidirectionalStream(impl::CallParams&& params, PrepareFunc prepare_func);
/// Movable; move operations and destructor are compiler-generated.
419 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
420 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
421 ~BidirectionalStream() =
default;
/// Reader-writer returned by `prepare_func`.
424 impl::RawReaderWriter<Request, Response> stream_;
// Out-of-line StreamReadFuture constructor (member names match that class).
// NOTE(review): the signature line, the first parameter and the first
// initializers are missing from this excerpt.
429template <
typename RPC>
432 typename RPC::RawStream& stream,
433 std::function<
void(impl::RpcData& data)> post_recv_message,
434 std::function<
void(impl::RpcData& data,
const grpc::Status& status)> post_finish
// The callbacks are taken by value and moved into the members.
438 post_recv_message_(std::move(post_recv_message)),
439 post_finish_(std::move(post_finish)) {}
// Body fragment of a StreamReadFuture member - presumably the destructor
// (TODO confirm; the signature line and the closing/else lines are missing
// from this excerpt).
441template <
typename RPC>
// Nothing to do unless impl_ still holds RPC data.
443 if (
auto*
const data = impl_.GetData()) {
444 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
// Block until the pending operation completes.
445 const auto wait_status = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
446 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
447 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
// Record the cancellation in the call statistics.
448 data->GetStatsScope().OnCancelled();
// NOTE(review): the last argument is `false` here but `true` in every other
// impl::Finish call in this file - presumably it suppresses throwing;
// confirm against impl::Finish.
450 impl::Finish(*stream_, *data, post_finish_,
false);
// NOTE(review): the lines separating this call from the branch above are not
// in this excerpt, so which branch it belongs to cannot be read off here.
452 post_recv_message_(*data);
// Body fragment of StreamReadFuture move assignment (the signature line and
// the trailing lines are missing from this excerpt).
457template <
typename RPC>
// Self-assignment is a no-op.
459 if (
this == &other)
return *
this;
// Move the current state into a temporary so that it is torn down by the
// regular destructor when this scope ends.
460 [[maybe_unused]]
auto for_destruction = std::move(*
this);
// Take over the state of `other`; the raw stream pointer is simply copied.
461 impl_ = std::move(other.impl_);
462 stream_ = other.stream_;
463 post_recv_message_ = std::move(other.post_recv_message_);
464 post_finish_ = std::move(other.post_finish_);
// Body fragment of StreamReadFuture's `Get` (name taken from the invariant
// message below; the signature line and several brace/else lines are missing
// from this excerpt).
468template <
typename RPC>
470 auto*
const data = impl_.GetData();
// Absent RPC data is treated as a usage error.
471 UINVARIANT(data,
"'Get' must be called only once");
472 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
// Block until the pending operation completes.
474 const auto result = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
475 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
// Cancelled wait: record it in the call statistics and flush them.
476 data->GetStatsScope().OnCancelled();
477 data->GetStatsScope().Flush();
478 }
else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
// Failed operation: finish the RPC, forwarding the stored post_finish_.
481 impl::Finish(*stream_, *data, post_finish_,
true);
// NOTE(review): the lines between the call above and this one are not in this
// excerpt, so the exact branch of this call cannot be read off here.
483 post_recv_message_(*data);
// True only when the wait completed with kOk.
485 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
// Body fragment of a member whose signature line is missing from this
// excerpt; it forwards to impl_.IsReady() without waiting itself.
488template <
typename RPC>
490 return impl_.IsReady();
// UnaryCall constructor: runs the pre-call hooks, prepares the call through
// `prepare_func` and starts it.
// NOTE(review): the outer `template <typename Response>` line, the
// initializer list and the closing braces are missing from this excerpt.
494template <
typename PrepareFunc,
typename Request>
495UnaryCall<Response>::UnaryCall(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req)
497 impl::MiddlewarePipeline::PreStartCall(GetData());
// The send hook takes a protobuf Message, so it is only compiled in when
// Request derives from it.
498 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
499 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
// prepare_func receives the client context, the request and the completion
// queue, and yields the response reader.
502 reader_ = prepare_func(&GetData().GetContext(), req, &GetData().GetQueue());
503 reader_->StartCall();
// The request has already been passed to prepare_func above, so the write
// side is marked as finished right away.
505 GetData().SetWritesFinished();
508template <
typename Response>
// Body fragment of a UnaryCall member that issues the asynchronous Finish on
// `reader_` (the signature line, where `response` is declared, and the
// trailing lines are missing from this excerpt).
516template <
typename Response>
519 PrepareFinish(GetData());
// Create the invocation object whose tag identifies the Finish operation.
520 GetData().EmplaceFinishAsyncMethodInvocation();
521 auto& finish = GetData().GetFinishAsyncMethodInvocation();
522 auto& status = GetData().GetStatus();
// The response and status are written into the referenced objects.
523 reader_->Finish(&response, &status, finish.GetTag());
// `response` is captured by reference: it must stay alive for as long as this
// callback may run.
524 auto post_finish = [&response](impl::RpcData& data,
const grpc::Status& status) {
// The receive hook takes a protobuf Message, so it is only compiled in when
// Response derives from it.
525 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
526 impl::MiddlewarePipeline::PostRecvMessage(data, response);
530 impl::MiddlewarePipeline::PostFinish(data, status);
// InputStream constructor: runs the pre-call hooks, prepares the call through
// `prepare_func` and starts it.
// NOTE(review): the outer `template <typename Response>` line, the
// initializer list and the closing brace are missing from this excerpt.
536template <
typename PrepareFunc,
typename Request>
537InputStream<Response>::InputStream(impl::CallParams&& params, PrepareFunc prepare_func,
const Request& req)
539 impl::MiddlewarePipeline::PreStartCall(GetData());
// NOTE(review): unlike the UnaryCall constructor, this call is not guarded by
// an `if constexpr` check that Request derives from protobuf Message.
540 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
// prepare_func receives the client context, the request and the completion
// queue, and yields the stream reader.
543 stream_ = prepare_func(&GetData().GetContext(), req, &GetData().GetQueue());
544 impl::StartCall(*stream_, GetData());
// The request has already been passed to prepare_func above, so the write
// side is marked as finished right away.
546 GetData().SetWritesFinished();
// Reads the next message from the stream into `response`.
// NOTE(review): the `return` statements, the else/closing-brace lines and any
// comments are missing from this excerpt.
549template <
typename Response>
550bool InputStream<Response>::
Read(Response& response) {
// Successful read: run the post-receive hook on the message.
551 if (impl::Read(*stream_, response, GetData())) {
552 impl::MiddlewarePipeline::PostRecvMessage(GetData(), response);
// Otherwise finish the RPC; the callback forwards the status to the
// post-finish hook.
557 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
558 impl::MiddlewarePipeline::PostFinish(data, status);
560 impl::Finish(*stream_, GetData(), post_finish,
true);
// OutputStream constructor: allocates the response storage, runs the
// pre-start hook, prepares the call through `prepare_func` and starts it.
// NOTE(review): the outer template header and the closing brace are missing
// from this excerpt.
566template <
typename PrepareFunc>
567OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareFunc prepare_func)
568 :
CallAnyBase(std::move(params),
CallKind::kOutputStream), final_response_(std::make_unique<Response>()) {
569 impl::MiddlewarePipeline::PreStartCall(GetData());
// prepare_func receives a raw pointer to the heap-allocated response;
// presumably that is why the response lives behind a unique_ptr (the address
// survives moves of this object) - TODO confirm.
573 stream_ = prepare_func(&GetData().GetContext(), final_response_.get(), &GetData().GetQueue());
574 impl::StartCall(*stream_, GetData());
// Runs the pre-send hook on `request`, then writes it to the stream and
// returns the result of impl::Write.
// NOTE(review): the closing brace and any comments are missing from this
// excerpt.
577template <
typename Request,
typename Response>
578bool OutputStream<Request, Response>::
Write(
const Request& request) {
579 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Default-constructed write options are used for every write.
583 grpc::WriteOptions write_options{};
584 return impl::Write(*stream_, request, write_options, GetData());
// Runs the pre-send hook on `request` and writes it; when the write reports
// failure, the RPC is finished through impl::Finish.
// NOTE(review): closing braces and any comments are missing from this
// excerpt. BidirectionalStream::WriteAndCheck delegates to
// impl::WriteAndCheck instead of finishing the RPC itself.
587template <
typename Request,
typename Response>
588void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
589 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Default-constructed write options are used for every write.
593 grpc::WriteOptions write_options{};
594 if (!impl::Write(*stream_, request, write_options, GetData())) {
// The callback forwards the status to the post-finish hook.
595 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
596 impl::MiddlewarePipeline::PostFinish(data, status);
598 impl::Finish(*stream_, GetData(), post_finish,
true);
// Completes the RPC and returns the response stored in `final_response_`.
// NOTE(review): closing braces and any comments are missing from this
// excerpt.
602template <
typename Request,
typename Response>
603Response OutputStream<Request, Response>::
Finish() {
// Signal the end of writes unless that has already been recorded.
606 if (!GetData().AreWritesFinished()) {
607 impl::WritesDone(*stream_, GetData());
// The callback forwards the status to the post-finish hook.
610 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
611 impl::MiddlewarePipeline::PostFinish(data, status);
613 impl::Finish(*stream_, GetData(), post_finish,
true);
// The response is moved out, leaving *final_response_ in a moved-from state.
615 return std::move(*final_response_);
// BidirectionalStream constructor: runs the pre-start hook, prepares the call
// through `prepare_func` and starts it.
// NOTE(review): the outer template header, the initializer list and the
// closing brace are missing from this excerpt.
619template <
typename PrepareFunc>
620BidirectionalStream<Request, Response>::BidirectionalStream(impl::CallParams&& params, PrepareFunc prepare_func)
622 impl::MiddlewarePipeline::PreStartCall(GetData());
// prepare_func receives the client context and the completion queue, and
// yields the reader-writer.
625 stream_ = prepare_func(&GetData().GetContext(), &GetData().GetQueue());
626 impl::StartCall(*stream_, GetData());
// Body fragment of a BidirectionalStream member that starts an asynchronous
// read and returns a StreamReadFuture for it (the signature line, where
// `response` is declared, and the closing lines are missing from this
// excerpt).
629template <
typename Request,
typename Response>
// Start the read; the message is delivered into `response`.
633 impl::ReadAsync(*stream_, response, GetData());
// `response` is captured by reference and the callback is stored in the
// returned future, so `response` must outlive that future.
634 auto post_recv_message = [&response](impl::RpcData& data) {
635 impl::MiddlewarePipeline::PostRecvMessage(data, response);
// The callback forwards the status to the post-finish hook.
637 auto post_finish = [](impl::RpcData& data,
const grpc::Status& status) {
638 impl::MiddlewarePipeline::PostFinish(data, status);
// The future also keeps a pointer to *stream_ (see StreamReadFuture::stream_).
640 return StreamReadFuture<BidirectionalStream<Request, Response>>{
641 GetData(), *stream_, post_recv_message, post_finish};
// Reads one message into `response`.
// NOTE(review): only the signature is present in this excerpt; the body lines
// are missing, so nothing more can be documented from here.
644template <
typename Request,
typename Response>
645bool BidirectionalStream<Request, Response>::
Read(Response& response) {
// Runs the pre-send hook on `request`, then writes it to the stream and
// returns the result of impl::Write.
// NOTE(review): the closing brace and any comments are missing from this
// excerpt.
650template <
typename Request,
typename Response>
651bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
652 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Default-constructed write options are used for every write.
655 grpc::WriteOptions write_options{};
656 return impl::Write(*stream_, request, write_options, GetData());
// Runs the pre-send hook on `request`, then delegates the write and its error
// handling to impl::WriteAndCheck.
// NOTE(review): the closing brace and any comments are missing from this
// excerpt.
659template <
typename Request,
typename Response>
660void BidirectionalStream<Request, Response>::
WriteAndCheck(
const Request& request) {
661 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
// Default-constructed write options are used for every write.
664 grpc::WriteOptions write_options{};
665 impl::WriteAndCheck(*stream_, request, write_options, GetData());
// Body fragment of a member whose signature line is missing from this
// excerpt; it returns the result of impl::WritesDone for this stream.
668template <
typename Request,
typename Response>
670 return impl::WritesDone(*stream_, GetData());