userver: userver/ugrpc/client/stream_read_future.hpp Source File
Loading...
Searching...
No Matches
stream_read_future.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/stream_read_future.hpp
4/// @brief @copybrief ugrpc::client::StreamReadFuture
5
6#include <utility>
7
8#include <google/protobuf/message.h>
9
10#include <userver/utils/assert.hpp>
11
12#include <userver/ugrpc/client/impl/async_stream_methods.hpp>
13#include <userver/ugrpc/client/impl/call_state.hpp>
14#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
15
16USERVER_NAMESPACE_BEGIN
17
18namespace ugrpc::client {
19
20/// @brief StreamReadFuture for waiting a single read response from stream
21template <typename RawStream>
22class [[nodiscard]] StreamReadFuture {
23public:
24 /// @cond
25 StreamReadFuture(impl::StreamingCallState& state, RawStream& stream, const google::protobuf::Message* recv_message)
26 noexcept;
27 /// @endcond
28
29 StreamReadFuture(StreamReadFuture&& other) noexcept;
30 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
31 StreamReadFuture(const StreamReadFuture&) = delete;
32 StreamReadFuture& operator=(const StreamReadFuture&) = delete;
33
34 ~StreamReadFuture();
35
36 /// @brief Await response
37 ///
38 /// Upon completion the result is available in `response` that was
39 /// specified when initiating the asynchronous read
40 ///
41 /// `Get` should not be called multiple times for the same StreamReadFuture.
42 ///
43 /// @throws ugrpc::client::RpcError on an RPC error
44 /// @throws ugrpc::client::RpcCancelledError on task cancellation
45 bool Get();
46
47 /// @brief Checks if the asynchronous call has completed
48 /// Note, that once user gets result, IsReady should not be called
49 /// @return true if result ready
50 [[nodiscard]] bool IsReady() const noexcept;
51
52private:
53 impl::StreamingCallState* state_{};
54 RawStream* stream_{};
55 const google::protobuf::Message* recv_message_;
56};
57
58template <typename RawStream>
59StreamReadFuture<RawStream>::StreamReadFuture(
60 impl::StreamingCallState& state,
61 RawStream& stream,
62 const google::protobuf::Message* recv_message
63) noexcept
64 : state_{&state}, stream_{&stream}, recv_message_{recv_message} {}
65
66template <typename RawStream>
67StreamReadFuture<RawStream>::StreamReadFuture(StreamReadFuture&& other) noexcept
68 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
69 : state_{std::exchange(other.state_, nullptr)},
70 stream_{other.stream_},
71 recv_message_{other.recv_message_}
72{}
73
74template <typename RawStream>
75StreamReadFuture<RawStream>& StreamReadFuture<RawStream>::operator=(StreamReadFuture&& other) noexcept {
76 if (this == &other) {
77 return *this;
78 }
79 [[maybe_unused]] auto for_destruction = std::move(*this);
80 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
81 state_ = std::exchange(other.state_, nullptr);
82 stream_ = other.stream_;
83 recv_message_ = other.recv_message_;
84 return *this;
85}
86
87template <typename RawStream>
88StreamReadFuture<RawStream>::~StreamReadFuture() {
89 if (state_) {
90 // StreamReadFuture::Get wasn't called => finish RPC.
91 impl::FinishAbandoned(*stream_, *state_);
92 }
93}
94
95template <typename RawStream>
96bool StreamReadFuture<RawStream>::Get() {
97 UINVARIANT(state_, "'Get' must be called only once");
98 const impl::StreamingCallState::AsyncMethodInvocationGuard guard(*state_);
99 auto* const state = std::exchange(state_, nullptr);
100 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetClientContext());
101 if (result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
102 state->GetStatsScope().OnCancelled();
103 state->GetStatsScope().Flush();
104 } else if (result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kError) {
105 // Finish can only be called once all the data is read, otherwise the
106 // underlying gRPC driver hangs.
107 impl::Finish(*stream_, *state, /*final_response=*/nullptr, /*throw_on_error=*/true);
108 } else {
109 if (recv_message_) {
110 RunMiddlewarePipeline(*state, impl::RecvMessageHooks(*recv_message_));
111 }
112 }
113 return result == ugrpc::impl::AsyncMethodInvocation::WaitStatus::kOk;
114}
115
116template <typename RawStream>
117bool StreamReadFuture<RawStream>::IsReady() const noexcept {
118 UINVARIANT(state_, "IsReady should be called only before 'Get'");
119 auto& method = state_->GetAsyncMethodInvocation();
120 return method.IsReady();
121}
122
123} // namespace ugrpc::client
124
125USERVER_NAMESPACE_END