userver: userver/ugrpc/client/stream_read_future.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
stream_read_future.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/stream_read_future.hpp
4/// @brief @copybrief ugrpc::client::StreamReadFuture
5
6#include <utility>
7
8#include <google/protobuf/message.h>
9
10#include <userver/utils/assert.hpp>
11
12#include <userver/ugrpc/client/impl/async_stream_methods.hpp>
13#include <userver/ugrpc/client/impl/call_state.hpp>
14#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
15
16USERVER_NAMESPACE_BEGIN
17
18namespace ugrpc::client {
19
20/// @brief StreamReadFuture for waiting a single read response from stream
21template <typename RawStream>
22class [[nodiscard]] StreamReadFuture {
23public:
24 /// @cond
25 StreamReadFuture(
26 impl::StreamingCallState& state,
27 RawStream& stream,
28 const google::protobuf::Message* recv_message
29 ) noexcept;
30 /// @endcond
31
32 StreamReadFuture(StreamReadFuture&& other) noexcept;
33 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
34 StreamReadFuture(const StreamReadFuture&) = delete;
35 StreamReadFuture& operator=(const StreamReadFuture&) = delete;
36
37 ~StreamReadFuture();
38
39 /// @brief Await response
40 ///
41 /// Upon completion the result is available in `response` that was
42 /// specified when initiating the asynchronous read
43 ///
44 /// `Get` should not be called multiple times for the same StreamReadFuture.
45 ///
46 /// @throws ugrpc::client::RpcError on an RPC error
47 /// @throws ugrpc::client::RpcCancelledError on task cancellation
48 bool Get();
49
50 /// @brief Checks if the asynchronous call has completed
51 /// Note, that once user gets result, IsReady should not be called
52 /// @return true if result ready
53 [[nodiscard]] bool IsReady() const noexcept;
54
55private:
56 impl::StreamingCallState* state_{};
57 RawStream* stream_{};
58 const google::protobuf::Message* recv_message_;
59};
60
61template <typename RawStream>
62StreamReadFuture<RawStream>::StreamReadFuture(
63 impl::StreamingCallState& state,
64 RawStream& stream,
65 const google::protobuf::Message* recv_message
66) noexcept
67 : state_{&state}, stream_{&stream}, recv_message_{recv_message} {}
68
69template <typename RawStream>
70StreamReadFuture<RawStream>::StreamReadFuture(StreamReadFuture&& other) noexcept
71 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
72 : state_{std::exchange(other.state_, nullptr)}, stream_{other.stream_}, recv_message_{other.recv_message_} {}
73
74template <typename RawStream>
75StreamReadFuture<RawStream>& StreamReadFuture<RawStream>::operator=(StreamReadFuture&& other) noexcept {
76 if (this == &other) return *this;
77 [[maybe_unused]] auto for_destruction = std::move(*this);
78 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
79 state_ = std::exchange(other.state_, nullptr);
80 stream_ = other.stream_;
81 recv_message_ = other.recv_message_;
82 return *this;
83}
84
85template <typename RawStream>
86StreamReadFuture<RawStream>::~StreamReadFuture() {
87 if (state_) {
88 // StreamReadFuture::Get wasn't called => finish RPC.
89 impl::FinishAbandoned(*stream_, *state_);
90 }
91}
92
93template <typename RawStream>
94bool StreamReadFuture<RawStream>::Get() {
95 UINVARIANT(state_, "'Get' must be called only once");
96 const impl::StreamingCallState::AsyncMethodInvocationGuard guard(*state_);
97 auto* const state = std::exchange(state_, nullptr);
98 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetClientContext());
99 if (result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
100 state->GetStatsScope().OnCancelled();
101 state->GetStatsScope().Flush();
102 } else if (result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kError) {
103 // Finish can only be called once all the data is read, otherwise the
104 // underlying gRPC driver hangs.
105 impl::Finish(*stream_, *state, /*final_response=*/nullptr, /*throw_on_error=*/true);
106 } else {
107 if (recv_message_) {
108 RunMiddlewarePipeline(*state, impl::RecvMessageHooks(*recv_message_));
109 }
110 }
111 return result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kOk;
112}
113
114template <typename RawStream>
115bool StreamReadFuture<RawStream>::IsReady() const noexcept {
116 UINVARIANT(state_, "IsReady should be called only before 'Get'");
117 auto& method = state_->GetAsyncMethodInvocation();
118 return method.IsReady();
119}
120
121} // namespace ugrpc::client
122
123USERVER_NAMESPACE_END