userver: userver/ugrpc/client/stream.hpp Source File
Loading...
Searching...
No Matches
stream.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/stream.hpp
4/// @brief Client streaming interfaces
5
6#include <memory>
7
8#include <userver/ugrpc/client/impl/rpc.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace ugrpc::client {
13
14/// @brief Client-side interface for server-streaming
15///
16/// This class is not thread-safe except for `GetContext`.
17///
18/// The RPC should be completed by reading until @ref ugrpc::client::Reader::Read returns `false`.
19/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
20/// and the `abandoned-error` metric is incremented. The connection stays open for reuse.
21/// gRPC provides no way to early-close a server-streaming RPC gracefully.
22/// See @ref ugrpc::client::ReadRemainingAndFinish for graceful completion.
23template <class Response>
24class [[nodiscard]] Reader final {
25public:
26 /// @cond
27 // For internal use only
28 template <typename Stub, typename Request>
29 Reader(
30 impl::CallParams&& params,
31 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
32 const Request& request
33 )
34 : stream_(std::make_unique<
35 impl::InputStream<Response>>(std::move(params), std::move(prepare_async_method), request))
36 {}
37 /// @endcond
38
39 Reader(Reader&&) noexcept = default;
40 Reader& operator=(Reader&&) noexcept = default;
41
42 /// @brief Await and read the next incoming message
43 ///
44 /// On end-of-input, `Finish` is called automatically.
45 ///
46 /// @param response where to put response on success
47 /// @returns `true` on success, `false` on end-of-input, task cancellation,
48 // or if the stream is already closed for reads
49 /// @throws ugrpc::client::RpcError on an RPC error
50 [[nodiscard]] bool Read(Response& response) { return stream_->Read(response); }
51
52 /// @brief Get call context, useful e.g. for accessing metadata.
53 CallContext& GetContext() { return stream_->GetContext(); }
54 /// @overload
55 const CallContext& GetContext() const { return stream_->GetContext(); }
56
57private:
58 std::unique_ptr<impl::InputStream<Response>> stream_;
59};
60
61/// @brief Client-side interface for client-streaming
62///
63/// This class is not thread-safe except for `GetContext`.
64///
65/// The RPC should be completed by calling @ref ugrpc::client::Writer::Finish.
66/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
67/// and the `abandoned-error` metric is incremented. When properly finished, the connection stays open for reuse.
68template <typename Request, typename Response>
69class [[nodiscard]] Writer final {
70public:
71 /// @cond
72 // For internal use only
73 template <typename Stub>
74 Writer(impl::CallParams&& params, impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method)
75 : stream_(std::make_unique<
76 impl::OutputStream<Request, Response>>(std::move(params), std::move(prepare_async_method)))
77 {}
78 /// @endcond
79
80 Writer(Writer&&) noexcept = default;
81 Writer& operator=(Writer&&) noexcept = default;
82
83 /// @brief Write the next outgoing message
84 ///
85 /// `Write` doesn't store any references to `request`, so it can be
86 /// deallocated right after the call.
87 ///
88 /// @param request the next message to write
89 /// @return true if the data is going to the wire; false if the write
90 /// operation failed (including due to task cancellation,
91 // or if the stream is already closed for writes),
92 /// in which case no more writes will be accepted,
93 /// and the error details can be fetched from Finish
94 [[nodiscard]] bool Write(const Request& request) { return stream_->Write(request); }
95
96 /// @brief Write the next outgoing message and check result
97 ///
98 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
99 /// deallocated right after the call.
100 ///
101 /// `WriteAndCheck` verifies result of the write and generates exception
102 /// in case of issues.
103 ///
104 /// @param request the next message to write
105 /// @throws ugrpc::client::RpcError on an RPC error
106 /// @throws ugrpc::client::RpcCancelledError on task cancellation
107 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
108 void WriteAndCheck(const Request& request) { stream_->WriteAndCheck(request); }
109
110 /// @brief Complete the RPC successfully
111 ///
112 /// Should be called once all the data is written. The server will then
113 /// send a single `Response`.
114 ///
115 /// `Finish` should not be called multiple times.
116 ///
117 /// The connection is not closed, it will be reused for new RPCs.
118 ///
119 /// @returns the single `Response` received after finishing the writes
120 /// @throws ugrpc::client::RpcError on an RPC error
121 /// @throws ugrpc::client::RpcCancelledError on task cancellation
122 Response Finish() { return stream_->Finish(); }
123
124 /// @brief Get call context, useful e.g. for accessing metadata.
125 CallContext& GetContext() { return stream_->GetContext(); }
126 /// @overload
127 const CallContext& GetContext() const { return stream_->GetContext(); }
128
129private:
130 std::unique_ptr<impl::OutputStream<Request, Response>> stream_;
131};
132
133/// @brief Client-side interface for bi-directional streaming
134///
135/// It is safe to call the following methods from different coroutines:
136///
137/// - `GetContext`;
138/// - one of (`Read`, `ReadAsync`);
139/// - one of (`Write`, `WritesDone`).
140///
141/// `WriteAndCheck` is NOT thread-safe.
142///
143/// The RPC should be completed by reading until @ref ugrpc::client::Reader::Read returns `false`.
144/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
145/// and the `abandoned-error` metric is incremented. The connection stays open for reuse.
146/// gRPC provides no way to early-close a server-streaming RPC gracefully.
147// See @ref ugrpc::client::ReadRemainingAndFinish and @ref ugrpc::client::PingPongFinish for graceful completion.
148///
149/// `Read` and `AsyncRead` can throw if error status is received from server.
150/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
151/// operations.
152///
153/// `Write` and `WritesDone` methods do not throw, but indicate issues with
154/// the RPC by returning `false`.
155///
156/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
157/// operation the user calls `Read` and vice versa.
158///
159/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
160/// any of these methods anymore.
161/// Instead the user SHOULD call `Read` method until the end of input. If
162/// `Write` or `WritesDone` finishes with negative result, finally `Read`
163/// will throw an exception.
164/// ## Usage example:
165///
166/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
167///
168template <typename Request, typename Response>
169class [[nodiscard]] ReaderWriter final {
170public:
171 using StreamReadFuture = ugrpc::client::StreamReadFuture<
172 typename impl::BidirectionalStream<Request, Response>::RawStream>;
173
174 /// @cond
175 // For internal use only
176 template <typename Stub>
177 ReaderWriter(
178 impl::CallParams&& params,
179 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
180 )
181 : stream_(std::make_unique<
182 impl::BidirectionalStream<Request, Response>>(std::move(params), std::move(prepare_async_method)))
183 {}
184 /// @endcond
185
186 ReaderWriter(ReaderWriter&&) noexcept = default;
187 ReaderWriter& operator=(ReaderWriter&&) noexcept = default;
188
189 /// @brief Await and read the next incoming message
190 ///
191 /// On end-of-input, `Finish` is called automatically.
192 ///
193 /// @param response where to put response on success
194 /// @returns `true` on success, `false` on end-of-input, task cancellation,
195 /// or if the stream is already closed for reads
196 /// @throws ugrpc::client::RpcError on an RPC error
197 [[nodiscard]] bool Read(Response& response) { return stream_->Read(response); }
198
199 /// @brief Return future to read next incoming result
200 ///
201 /// @param response where to put response on success
202 /// @return StreamReadFuture future
203 /// @throws ugrpc::client::RpcError on an RPC error
204 /// @throws ugrpc::client::RpcError if the stream is already closed for reads
205 StreamReadFuture ReadAsync(Response& response) { return stream_->ReadAsync(response); }
206
207 /// @brief Write the next outgoing message
208 ///
209 /// RPC will be performed immediately. No references to `request` are
210 /// saved, so it can be deallocated right after the call.
211 ///
212 /// @param request the next message to write
213 /// @return true if the data is going to the wire; false if the write
214 /// operation failed (including due to task cancellation,
215 // or if the stream is already closed for writes),
216 /// in which case no more writes will be accepted,
217 /// but Read may still have some data and status code available
218 [[nodiscard]] bool Write(const Request& request) { return stream_->Write(request); }
219
220 /// @brief Write the next outgoing message and check result
221 ///
222 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
223 /// deallocated right after the call.
224 ///
225 /// `WriteAndCheck` verifies result of the write and generates exception
226 /// in case of issues.
227 ///
228 /// @param request the next message to write
229 /// @throws ugrpc::client::RpcError on an RPC error
230 /// @throws ugrpc::client::RpcCancelledError on task cancellation
231 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
232 void WriteAndCheck(const Request& request) { stream_->WriteAndCheck(request); }
233
234 /// @brief Announce end-of-output to the server
235 ///
236 /// Should be called to notify the server and receive the final response(s).
237 ///
238 /// @return true if the data is going to the wire; false if the operation
239 /// failed (including if the stream is already closed for writes),
240 /// but Read may still have some data and status code available
241 [[nodiscard]] bool WritesDone() { return stream_->WritesDone(); }
242
243 /// @brief Get call context, useful e.g. for accessing metadata.
244 CallContext& GetContext() { return stream_->GetContext(); }
245 /// @overload
246 const CallContext& GetContext() const { return stream_->GetContext(); }
247
248private:
249 std::unique_ptr<impl::BidirectionalStream<Request, Response>> stream_;
250};
251
252} // namespace ugrpc::client
253
254USERVER_NAMESPACE_END