11#include <grpcpp/impl/codegen/proto_utils.h>
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/utils/assert.hpp>
15#include <userver/utils/function_ref.hpp>
17#include <userver/ugrpc/client/exceptions.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_params.hpp>
20#include <userver/ugrpc/client/impl/channel_cache.hpp>
21#include <userver/ugrpc/client/middlewares/fwd.hpp>
22#include <userver/ugrpc/impl/deadline_timepoint.hpp>
23#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
24#include <userver/ugrpc/impl/statistics_scope.hpp>
26USERVER_NAMESPACE_BEGIN
// Constructor taking the call's impl::RpcData by reference; declared
// explicit and noexcept.
34 explicit UnaryFuture(impl::RpcData& data)
noexcept;
// Destructor, declared noexcept (its definition is not part of this chunk).
40 ~UnaryFuture()
noexcept;
// Future implementation object, held by value.
59 impl::FutureImpl impl_;
// Template header over `RPC`. The declaration it introduces is missing from
// this chunk (presumably the StreamReadFuture class). The members below
// require `RPC` to expose a nested `RawStream` type.
63template <
typename RPC>
// Constructor taking the RPC data and the raw stream by reference; declared
// explicit and noexcept.
67 explicit StreamReadFuture(impl::RpcData& data,
68 typename RPC::RawStream& stream)
noexcept;
// Destructor, declared noexcept.
74 ~StreamReadFuture()
noexcept;
// Future implementation object, held by value.
93 impl::FutureImpl impl_;
// Raw pointer to the stream; the out-of-class constructor definition sets it
// to the address of its `stream` argument.
94 typename RPC::RawStream* stream_;
// Constructs the call: `params` is moved into a newly allocated
// impl::RpcData that is owned by data_.
101 CallAnyBase(impl::CallParams&& params)
102 : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
// Returns a reference to impl::RpcData; this overload takes an InternalTag
// argument (definition not visible in this chunk).
120 impl::RpcData& GetData(ugrpc::impl::InternalTag);
// Returns a reference to impl::RpcData; no-argument overload (definition not
// visible in this chunk).
124 impl::RpcData& GetData();
// Owning pointer to the RPC data created by the constructor above.
127 std::unique_ptr<impl::RpcData> data_;
// Template header over `Response`. The class declaration line itself is
// missing from this chunk (presumably UnaryCall).
137template <
typename Response>
// Fragment of a member template declaration over `Stub` and `Request`
// (presumably the UnaryCall constructor; its name line is missing).
162 template <
typename Stub,
typename Request>
// Visible parameters: call parameters (rvalue reference), the stub, and a
// preparer `prepare_func`. The remaining parameters are not visible here.
164 impl::CallParams&& params, Stub& stub,
165 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
// Defaulted noexcept move constructor.
169 UnaryCall(UnaryCall&&)
noexcept =
default;
// Defaulted noexcept move assignment.
170 UnaryCall& operator=(UnaryCall&&)
noexcept =
default;
// Defaulted destructor.
171 ~UnaryCall() =
default;
// Response reader; the constructor definition assigns it from the result of
// calling `prepare_func` on the stub.
174 impl::RawResponseReader<Response> reader_;
// Template header over `Response`. The class declaration line itself is
// missing from this chunk (presumably InputStream).
189template <
typename Response>
// Reads into `response` and returns a bool; [[nodiscard]] means the result
// must not be ignored.
199 [[nodiscard]]
bool Read(Response& response);
// Underlying gRPC stream type for this call kind.
203 using RawStream = grpc::ClientAsyncReader<Response>;
// Constructor template over `Stub` and `Request`. Visible parameters: call
// parameters (rvalue reference), the stub, and a preparer `prepare_func`.
// The remaining parameters are not visible in this chunk.
205 template <
typename Stub,
typename Request>
206 InputStream(impl::CallParams&& params, Stub& stub,
207 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
// Defaulted noexcept move constructor.
211 InputStream(InputStream&&)
noexcept =
default;
// Defaulted noexcept move assignment.
212 InputStream& operator=(InputStream&&)
noexcept =
default;
// Defaulted destructor.
213 ~InputStream() =
default;
// NOTE(review): the visible member definitions reach the RPC data through
// GetData(); confirm how this member relates to that accessor.
216 std::unique_ptr<impl::RpcData> data_;
// Reader for the stream; the constructor definition assigns it from the
// result of calling `prepare_func` on the stub.
217 impl::RawReader<Response> stream_;
// Template header over `Request` and `Response`. The class declaration line
// itself is missing from this chunk (presumably OutputStream).
230template <
typename Request,
typename Response>
// Writes `request` and returns a bool; [[nodiscard]] means the result must
// not be ignored.
243 [[nodiscard]]
bool Write(
const Request& request);
// Underlying gRPC stream type for this call kind.
274 using RawStream = grpc::ClientAsyncWriter<Request>;
// Constructor template over `Stub`, taking the call parameters (rvalue
// reference), the stub, and a preparer `prepare_func`.
276 template <
typename Stub>
277 OutputStream(impl::CallParams&& params, Stub& stub,
278 impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
// Defaulted noexcept move constructor.
281 OutputStream(OutputStream&&)
noexcept =
default;
// Defaulted noexcept move assignment.
282 OutputStream& operator=(OutputStream&&)
noexcept =
default;
// Defaulted destructor.
283 ~OutputStream() =
default;
// NOTE(review): the visible member definitions reach the RPC data through
// GetData(); confirm how this member relates to that accessor.
286 std::unique_ptr<impl::RpcData> data_;
// Heap-allocated response object. The constructor definition creates it and
// passes its raw pointer to `prepare_func`; Finish() moves the value out.
287 std::unique_ptr<Response> final_response_;
// Writer for the stream, used by Write/WriteAndCheck/Finish.
288 impl::RawWriter<Request> stream_;
// Final class template over `Request` and `Response`, publicly derived from
// CallAnyBase. The class itself is [[nodiscard]]. Only part of the class body
// is visible in this chunk.
320template <
typename Request,
typename Response>
321class [[nodiscard]] BidirectionalStream final :
public CallAnyBase {
// Reads into `response` and returns a bool; the result must not be ignored.
330 [[nodiscard]]
bool Read(Response& response);
// Writes `request` and returns a bool; the result must not be ignored.
349 [[nodiscard]]
bool Write(
const Request& request);
// Underlying gRPC stream type for this call kind.
375 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
// Fragment of the constructor template over `Stub` (the name line is
// missing): call parameters (rvalue reference), the stub, and a preparer
// `prepare_func`.
377 template <
typename Stub>
379 impl::CallParams&& params, Stub& stub,
380 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
// Defaulted noexcept move constructor.
383 BidirectionalStream(BidirectionalStream&&)
noexcept =
default;
// Defaulted noexcept move assignment.
384 BidirectionalStream& operator=(BidirectionalStream&&)
noexcept =
default;
// Defaulted destructor.
385 ~BidirectionalStream() =
default;
// NOTE(review): CallAnyBase declares a member with the same name and type,
// and the visible member definitions use GetData(); confirm this member is
// intended.
388 std::unique_ptr<impl::RpcData> data_;
// Reader-writer for the stream; the constructor definition assigns it from
// the result of calling `prepare_func` on the stub.
389 impl::RawReaderWriter<Request, Response> stream_;
// Declaration only. Takes the middleware collection `mws`, the call object
// `call`, a non-owning callable `user_call` with signature void(), and a
// pointer to a protobuf request message.
// NOTE(review): whether `request` may be null is not visible here; the stream
// constructors' argument lists are truncated in this chunk.
395void CallMiddlewares(
const Middlewares& mws,
CallAnyBase& call,
396 utils::function_ref<
void()> user_call,
397 const ::google::protobuf::Message* request);
// Out-of-class constructor definition (the qualified-name line is missing
// from this chunk). Initializes impl_ from `data` and stores the address of
// `stream` in stream_; the body is empty.
400template <
typename RPC>
402 impl::RpcData& data,
typename RPC::RawStream& stream)
noexcept
403 : impl_(data), stream_(&stream) {}
// Fragment whose signature line is missing (presumably the StreamReadFuture
// destructor — TODO confirm).
405template <
typename RPC>
// Only acts when impl_ still holds RPC data (GetData() returned non-null).
407 if (
auto*
const data = impl_.GetData()) {
// Guard object scoped to this block, constructed from the RPC data.
408 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
// Wait on the async method invocation, passing the call's context.
409 const auto wait_status =
410 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
411 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
// A cancelled wait is reported to the statistics scope.
412 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
413 data->GetStatsScope().OnCancelled();
// impl::Finish is called with `false` as its last argument. The line between
// this call and the cancellation branch is missing, so the exact nesting
// cannot be determined from this chunk.
415 impl::Finish(*stream_, *data,
false);
// Fragment of an assignment operator taking `other` (signature lines are
// missing; presumably StreamReadFuture's move assignment — TODO confirm).
420template <
typename RPC>
// Self-assignment is a no-op.
423 if (
this == &other)
return *
this;
// Move the current state into a local, so the previous state is destroyed
// when that local goes out of scope.
424 [[maybe_unused]]
auto for_destruction = std::move(*
this);
// Take over the other object's future state and copy its stream pointer.
425 impl_ = std::move(other.impl_);
426 stream_ = other.stream_;
// Fragment whose signature line is missing (the invariant message below
// names it 'Get').
430template <
typename RPC>
432 auto*
const data = impl_.GetData();
// Enforces that RPC data is still present; the message states that 'Get'
// may be called only once.
433 UINVARIANT(data,
"'Get' must be called only once");
// Guard object constructed from the RPC data.
434 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
// Wait on the async method invocation. The declaration of `result` is on
// missing lines; the code below compares it against WaitStatus values.
437 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
// Cancelled: report to the statistics scope and flush it.
438 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
439 data->GetStatsScope().OnCancelled();
440 data->GetStatsScope().Flush();
// Error status: impl::Finish is called with `true` as its last argument
// (surrounding lines are missing, so the exact nesting is not visible).
442 if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
445 impl::Finish(*stream_, *data,
true);
// Returns true only when the wait status is kOk.
447 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
// Fragment whose signature line is missing (presumably
// StreamReadFuture<RPC>::IsReady). Forwards to impl_.IsReady().
450template <
typename RPC>
452 return impl_.IsReady();
// Out-of-class constructor definition for UnaryCall<Response>. Several lines
// are missing from this chunk: the rest of the parameter list, the
// initializer list, and the callback header.
456template <
typename Stub,
typename Request>
457UnaryCall<Response>::UnaryCall(
458 impl::CallParams&& params, Stub& stub,
459 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
// Runs the call through the configured middlewares, passing this call object.
462 impl::CallMiddlewares(
463 GetData().GetMiddlewares(), *
this,
// Invokes the stub's member function `prepare_func` with the call context,
// `req`, and the queue, stores the result in reader_, then starts the call.
// Presumably this sits inside the callback passed to CallMiddlewares; the
// callback header is not visible.
465 reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
466 &GetData().GetQueue());
467 reader_->StartCall();
// Marks writes as finished on the RPC data.
470 GetData().SetWritesFinished();
473template <
typename Response>
// Fragment whose signature lines are missing (presumably an asynchronous
// finish method of UnaryCall<Response> taking `response` — TODO confirm).
481template <
typename Response>
484 PrepareFinish(GetData());
// Create the finish invocation object on the RPC data, then fetch it.
485 GetData().EmplaceFinishAsyncMethodInvocation();
486 auto& finish = GetData().GetFinishAsyncMethodInvocation();
487 auto& status = GetData().GetStatus();
// Calls Finish on the reader, passing pointers to `response` and to the
// status stored in the RPC data, plus the invocation's tag.
488 reader_->Finish(&response, &status, finish.GetTag());
// Out-of-class constructor definition for InputStream<Response>. Several
// lines are missing from this chunk: the rest of the parameter list, the
// initializer list, and the callback header.
493template <
typename Stub,
typename Request>
494InputStream<Response>::InputStream(
495 impl::CallParams&& params, Stub& stub,
496 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
// Runs the call through the configured middlewares, passing this call object.
499 impl::CallMiddlewares(
500 GetData().GetMiddlewares(), *
this,
// Invokes the stub's member function `prepare_func` with the call context,
// `req`, and the queue, stores the result in stream_, then starts the call
// via impl::StartCall.
502 stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
503 &GetData().GetQueue());
504 impl::StartCall(*stream_, GetData());
// Marks writes as finished on the RPC data.
507 GetData().SetWritesFinished();
// Reads into `response` via impl::Read. Only part of the body is visible in
// this chunk.
510template <
typename Response>
511bool InputStream<Response>::
Read(Response& response) {
// Branch taken when impl::Read returns true (its body is missing here).
512 if (impl::Read(*stream_, response, GetData())) {
// impl::Finish is called with `true` as its last argument. Presumably this is
// on the path where the read returned false — TODO confirm against the full
// file.
517 impl::Finish(*stream_, GetData(),
true);
// Out-of-class constructor definition for OutputStream<Request, Response>.
// Some lines are missing from this chunk, including the start of the
// initializer list and the callback header.
523template <
typename Stub>
524OutputStream<Request, Response>::OutputStream(
525 impl::CallParams&& params, Stub& stub,
526 impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
// Allocates a default-constructed Response.
528 final_response_(std::make_unique<Response>()) {
// Runs the call through the configured middlewares, passing this call object.
529 impl::CallMiddlewares(
530 GetData().GetMiddlewares(), *
this,
// Invokes the stub's member function `prepare_func` with the call context,
// the raw pointer to the final response object, and the queue. The
// assignment target of the result is on a missing line.
534 (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
535 &GetData().GetQueue());
536 impl::StartCall(*stream_, GetData());
// Writes `request` to the stream with default-constructed grpc::WriteOptions
// and returns the result of impl::Write. Some lines of the body are missing
// from this chunk.
541template <
typename Request,
typename Response>
542bool OutputStream<Request, Response>::
Write(
const Request& request) {
545 grpc::WriteOptions write_options{};
547 return impl::Write(*stream_, request, write_options, GetData());
// Writes `request` with default-constructed grpc::WriteOptions. When
// impl::Write returns false, impl::Finish is called with `true` as its last
// argument. What follows the Finish call is not visible in this chunk.
550template <
typename Request,
typename Response>
551void OutputStream<Request, Response>::
WriteAndCheck(
const Request& request) {
554 grpc::WriteOptions write_options{};
556 if (!impl::Write(*stream_, request, write_options, GetData())) {
557 impl::Finish(*stream_, GetData(),
true);
// Completes the call and returns the response object by value. Some lines of
// the body are missing from this chunk.
561template <
typename Request,
typename Response>
562Response OutputStream<Request, Response>::
Finish() {
// If writes have not been marked finished yet, call impl::WritesDone first.
565 if (!GetData().AreWritesFinished()) {
566 impl::WritesDone(*stream_, GetData());
// Finish the call; `true` is passed as the last argument.
569 impl::Finish(*stream_, GetData(),
true);
// Moves the stored response out, leaving the object held by final_response_
// in a moved-from state.
571 return std::move(*final_response_);
// Out-of-class constructor definition for
// BidirectionalStream<Request, Response>. Some lines are missing from this
// chunk, including the initializer list and the callback header.
575template <
typename Stub>
576BidirectionalStream<Request, Response>::BidirectionalStream(
577 impl::CallParams&& params, Stub& stub,
578 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
// Runs the call through the configured middlewares, passing this call object.
580 impl::CallMiddlewares(
581 GetData().GetMiddlewares(), *
this,
// Invokes the stub's member function `prepare_func` with the call context and
// the queue, stores the result in stream_, then starts the call via
// impl::StartCall.
583 stream_ = (stub.*prepare_func)(&GetData().GetContext(),
584 &GetData().GetQueue());
585 impl::StartCall(*stream_, GetData());
// Starts an asynchronous read into `response` and returns a StreamReadFuture
// for this stream type. Declared noexcept. The return-type line and the end
// of the return statement are missing from this chunk.
590template <
typename Request,
typename Response>
592BidirectionalStream<Request, Response>::
ReadAsync(Response& response)
noexcept {
593 impl::ReadAsync(*stream_, response, GetData());
// The future is brace-initialized starting with the RPC data; the remaining
// initializer (presumably the stream) is on a missing line.
594 return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
// Definition header of BidirectionalStream::Read, which takes `response` by
// reference and returns bool. The body is missing from this chunk.
598template <
typename Request,
typename Response>
599bool BidirectionalStream<Request, Response>::
Read(Response& response) {
// Writes `request` to the stream with default-constructed grpc::WriteOptions
// and returns the result of impl::Write. Some lines of the body are missing
// from this chunk.
604template <
typename Request,
typename Response>
605bool BidirectionalStream<Request, Response>::
Write(
const Request& request) {
607 grpc::WriteOptions write_options{};
609 return impl::Write(*stream_, request, write_options, GetData());
// Fragment whose qualified-name line is missing (presumably
// BidirectionalStream<Request, Response>::WriteAndCheck). Forwards `request`
// with default-constructed grpc::WriteOptions to impl::WriteAndCheck.
612template <
typename Request,
typename Response>
614 const Request& request) {
616 grpc::WriteOptions write_options{};
618 impl::WriteAndCheck(*stream_, request, write_options, GetData());
// Fragment whose signature line is missing (presumably
// BidirectionalStream<Request, Response>::WritesDone). Returns the result of
// impl::WritesDone for this stream and its RPC data.
621template <
typename Request,
typename Response>
623 return impl::WritesDone(*stream_, GetData());