8#include <google/protobuf/message.h>
10#include <userver/utils/assert.hpp>
12#include <userver/ugrpc/client/impl/async_stream_methods.hpp>
13#include <userver/ugrpc/client/impl/call_state.hpp>
14#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
16USERVER_NAMESPACE_BEGIN
18namespace ugrpc::client {
// Declaration fragment of the StreamReadFuture class template, parameterized
// on the raw stream type RawStream.
// NOTE(review): the class head and the remaining member declarations are not
// visible in this excerpt.
21template <
typename RawStream>
// Constructor: receives the call state and the raw stream by reference, plus
// a pointer to the message associated with the read.
25 StreamReadFuture(impl::StreamingCallState& state, RawStream& stream,
const google::protobuf::Message* recv_message)
// Pointer to the call state, null by default. Get() and IsReady() require it
// to be non-null, and Get() resets it to null.
53 impl::StreamingCallState* state_{};
// Message that Get() hands to the receive-message middleware hooks.
55 const google::protobuf::Message* recv_message_;
// Out-of-line constructor definition (fragment).
// NOTE(review): the line naming the constructor and the `stream` parameter
// are not visible in this excerpt.
58template <
typename RawStream>
60 impl::StreamingCallState& state,
62 const google::protobuf::Message* recv_message
// Stores the addresses of `state` and `stream` and copies the message
// pointer; nothing else is done in the constructor body.
64 : state_{&state}, stream_{&stream}, recv_message_{recv_message} {}
// Member-initializer list that takes its values from `other`.
// NOTE(review): presumably the move constructor; the signature line is not
// visible in this excerpt.
66template <
typename RawStream>
// Take over the state pointer and leave `other.state_` null.
69 : state_{std::exchange(other.state_,
nullptr)},
// The stream and message pointers are plain copies; `other` keeps its values.
70 stream_{other.stream_},
71 recv_message_{other.recv_message_}
// Assignment from `other`.
// NOTE(review): presumably the move assignment operator; the signature, any
// self-assignment check and the return statement are not visible in this
// excerpt.
74template <
typename RawStream>
// Move the current contents of *this into a local object whose only purpose
// is to be destroyed at the end of the scope.
// NOTE(review): presumably this lets the destructor clean up whatever *this
// held before the assignment -- confirm against the destructor.
79 [[maybe_unused]]
auto for_destruction = std::move(*
this);
// Take over the state pointer from `other`, leaving `other.state_` null.
81 state_ = std::exchange(other.state_,
nullptr);
// The stream and message pointers are plain copies.
82 stream_ = other.stream_;
83 recv_message_ = other.recv_message_;
// Cleanup path that passes the stream and call state to impl::FinishAbandoned.
// NOTE(review): presumably the destructor, with this call guarded by a
// `state_` null check; neither the signature nor the guard is visible in this
// excerpt.
87template <
typename RawStream>
91 impl::FinishAbandoned(*stream_, *state_);
// Waits for the pending asynchronous operation and reports whether it
// completed with WaitStatus::kOk.
// NOTE(review): the signature line and parts of the branch structure (closing
// braces, a possible final `else`) are not visible in this excerpt.
95template <
typename RawStream>
// A null state_ means the result was already consumed (or moved away).
97 UINVARIANT(state_,
"'Get' must be called only once");
98 const impl::StreamingCallState::AsyncMethodInvocationGuard guard(*state_);
// Clear the member first, so any later Get()/IsReady() call fails its
// invariant; the local `state` is used for the rest of the function.
99 auto*
const state = std::exchange(state_,
nullptr);
100 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetClientContext());
// Cancelled wait: record the cancellation in the stats scope and flush it.
101 if (result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
102 state->GetStatsScope().OnCancelled();
103 state->GetStatsScope().Flush();
104 }
// Error status: finish the call via impl::Finish.
// NOTE(review): the meaning of the trailing `nullptr` and `true` arguments is
// defined by impl::Finish and is not visible here.
else if (result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kError) {
107 impl::Finish(*stream_, *state,
nullptr,
true);
// Run the receive-message middleware hooks on the stored message.
// NOTE(review): the condition guarding this call is not visible; presumably
// it runs only on the success path -- confirm.
110 RunMiddlewarePipeline(*state, impl::RecvMessageHooks(*recv_message_));
// True only when the wait finished with kOk.
113 return result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kOk;
// Reports whether the pending asynchronous operation is ready, as indicated
// by the call state's async method invocation.
// NOTE(review): the signature line is not visible in this excerpt.
116template <
typename RawStream>
// state_ is reset to null by Get(), so this is only valid before Get().
118 UINVARIANT(state_,
"IsReady should be called only before 'Get'");
119 auto& method = state_->GetAsyncMethodInvocation();
120 return method.IsReady();