userver: userver/ugrpc/client/stream.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
stream.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/stream.hpp
4/// @brief Client streaming interfaces
5
6#include <memory>
7
8#include <userver/ugrpc/client/impl/rpc.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace ugrpc::client {
13
14/// @brief Client-side interface for server-streaming
15///
16/// This class is not thread-safe except for `GetContext`.
17///
18/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
19/// returned `false`). In that case the connection is not closed (it will be
20/// reused for new RPCs), and the server receives `RpcInterruptedError`
21/// immediately. gRPC provides no way to early-close a server-streaming RPC
22/// gracefully.
23template <class Response>
24class [[nodiscard]] Reader final {
25public:
26 /// @cond
27 // For internal use only
28 template <typename Stub, typename Request>
29 Reader(
30 impl::CallParams&& params,
31 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
32 const Request& request
33 )
34 : stream_(
35 std::make_unique<impl::InputStream<Response>>(std::move(params), std::move(prepare_async_method), request)
36 ) {}
37 /// @endcond
38
39 Reader(Reader&&) noexcept = default;
40 Reader& operator=(Reader&&) noexcept = default;
41
42 /// @brief Await and read the next incoming message
43 ///
44 /// On end-of-input, `Finish` is called automatically.
45 ///
46 /// @param response where to put response on success
47 /// @returns `true` on success, `false` on end-of-input, task cancellation,
48 /// or if the stream is already closed for reads
49 /// @throws ugrpc::client::RpcError on an RPC error
50 [[nodiscard]] bool Read(Response& response) { return stream_->Read(response); }
51
52 /// @brief Get call context, useful e.g. for accessing metadata.
53 CallContext& GetContext() { return stream_->GetContext(); }
54 /// @overload
55 const CallContext& GetContext() const { return stream_->GetContext(); }
56
57private:
58 std::unique_ptr<impl::InputStream<Response>> stream_;
59};
60
61/// @brief Client-side interface for client-streaming
62///
63/// This class is not thread-safe except for `GetContext`.
64///
65/// The RPC is cancelled on destruction unless `Finish` has been called. In that
66/// case the connection is not closed (it will be reused for new RPCs), and the
67/// server receives `RpcInterruptedError` immediately.
68template <typename Request, typename Response>
69class [[nodiscard]] Writer final {
70public:
71 /// @cond
72 // For internal use only
73 template <typename Stub>
74 Writer(impl::CallParams&& params, impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method)
75 : stream_(std::make_unique<impl::OutputStream<Request, Response>>(
76 std::move(params),
77 std::move(prepare_async_method)
78 )) {}
79 /// @endcond
80
81 Writer(Writer&&) noexcept = default;
82 Writer& operator=(Writer&&) noexcept = default;
83
84 /// @brief Write the next outgoing message
85 ///
86 /// `Write` doesn't store any references to `request`, so it can be
87 /// deallocated right after the call.
88 ///
89 /// @param request the next message to write
90 /// @return true if the data is going to the wire; false if the write
91 /// operation failed (including due to task cancellation,
92 /// or if the stream is already closed for writes),
93 /// in which case no more writes will be accepted,
94 /// and the error details can be fetched from Finish
95 [[nodiscard]] bool Write(const Request& request) { return stream_->Write(request); }
96
97 /// @brief Write the next outgoing message and check result
98 ///
99 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
100 /// deallocated right after the call.
101 ///
102 /// `WriteAndCheck` verifies result of the write and generates exception
103 /// in case of issues.
104 ///
105 /// @param request the next message to write
106 /// @throws ugrpc::client::RpcError on an RPC error
107 /// @throws ugrpc::client::RpcCancelledError on task cancellation
108 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
109 void WriteAndCheck(const Request& request) { stream_->WriteAndCheck(request); }
110
111 /// @brief Complete the RPC successfully
112 ///
113 /// Should be called once all the data is written. The server will then
114 /// send a single `Response`.
115 ///
116 /// `Finish` should not be called multiple times.
117 ///
118 /// The connection is not closed, it will be reused for new RPCs.
119 ///
120 /// @returns the single `Response` received after finishing the writes
121 /// @throws ugrpc::client::RpcError on an RPC error
122 /// @throws ugrpc::client::RpcCancelledError on task cancellation
123 Response Finish() { return stream_->Finish(); }
124
125 /// @brief Get call context, useful e.g. for accessing metadata.
126 CallContext& GetContext() { return stream_->GetContext(); }
127 /// @overload
128 const CallContext& GetContext() const { return stream_->GetContext(); }
129
130private:
131 std::unique_ptr<impl::OutputStream<Request, Response>> stream_;
132};
133
134/// @brief Client-side interface for bi-directional streaming
135///
136/// It is safe to call the following methods from different coroutines:
137///
138/// - `GetContext`;
139/// - one of (`Read`, `ReadAsync`);
140/// - one of (`Write`, `WritesDone`).
141///
142/// `WriteAndCheck` is NOT thread-safe.
143///
144/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
145/// returned `false`). In that case the connection is not closed (it will be
146/// reused for new RPCs), and the server receives `RpcInterruptedError`
147/// immediately. gRPC provides no way to early-close a server-streaming RPC
148/// gracefully.
149///
150/// `Read` and `ReadAsync` can throw if error status is received from server.
151/// User MUST NOT call `Read` or `ReadAsync` again after failure of any of these
152/// operations.
153///
154/// `Write` and `WritesDone` methods do not throw, but indicate issues with
155/// the RPC by returning `false`.
156///
157/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
158/// operation the user calls `Read` and vice versa.
159///
160/// If `Write` or `WritesDone` returns `false`, the user MUST NOT call
161/// any of these methods anymore.
162/// Instead the user SHOULD call `Read` until the end of input. If
163/// `Write` or `WritesDone` has returned `false`, `Read` will eventually
164/// throw an exception.
165/// ## Usage example:
166///
167/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
168///
169template <typename Request, typename Response>
170class [[nodiscard]] ReaderWriter final {
171public:
172 using StreamReadFuture =
173 ugrpc::client::StreamReadFuture<typename impl::BidirectionalStream<Request, Response>::RawStream>;
174
175 /// @cond
176 // For internal use only
177 template <typename Stub>
178 ReaderWriter(
179 impl::CallParams&& params,
180 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
181 )
182 : stream_(std::make_unique<impl::BidirectionalStream<Request, Response>>(
183 std::move(params),
184 std::move(prepare_async_method)
185 )) {}
186 /// @endcond
187
188 ReaderWriter(ReaderWriter&&) noexcept = default;
189 ReaderWriter& operator=(ReaderWriter&&) noexcept = default;
190
191 /// @brief Await and read the next incoming message
192 ///
193 /// On end-of-input, `Finish` is called automatically.
194 ///
195 /// @param response where to put response on success
196 /// @returns `true` on success, `false` on end-of-input, task cancellation,
197 /// or if the stream is already closed for reads
198 /// @throws ugrpc::client::RpcError on an RPC error
199 [[nodiscard]] bool Read(Response& response) { return stream_->Read(response); }
200
201 /// @brief Return future to read next incoming result
202 ///
203 /// @param response where to put response on success
204 /// @return StreamReadFuture future
205 /// @throws ugrpc::client::RpcError on an RPC error
206 /// @throws ugrpc::client::RpcError if the stream is already closed for reads
207 StreamReadFuture ReadAsync(Response& response) { return stream_->ReadAsync(response); }
208
209 /// @brief Write the next outgoing message
210 ///
211 /// RPC will be performed immediately. No references to `request` are
212 /// saved, so it can be deallocated right after the call.
213 ///
214 /// @param request the next message to write
215 /// @return true if the data is going to the wire; false if the write
216 /// operation failed (including due to task cancellation,
217 /// or if the stream is already closed for writes),
218 /// in which case no more writes will be accepted,
219 /// but Read may still have some data and status code available
220 [[nodiscard]] bool Write(const Request& request) { return stream_->Write(request); }
221
222 /// @brief Write the next outgoing message and check result
223 ///
224 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
225 /// deallocated right after the call.
226 ///
227 /// `WriteAndCheck` verifies result of the write and generates exception
228 /// in case of issues.
229 ///
230 /// @param request the next message to write
231 /// @throws ugrpc::client::RpcError on an RPC error
232 /// @throws ugrpc::client::RpcCancelledError on task cancellation
233 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
234 void WriteAndCheck(const Request& request) { stream_->WriteAndCheck(request); }
235
236 /// @brief Announce end-of-output to the server
237 ///
238 /// Should be called to notify the server and receive the final response(s).
239 ///
240 /// @return true if the data is going to the wire; false if the operation
241 /// failed (including if the stream is already closed for writes),
242 /// but Read may still have some data and status code available
243 [[nodiscard]] bool WritesDone() { return stream_->WritesDone(); }
244
245 /// @brief Get call context, useful e.g. for accessing metadata.
246 CallContext& GetContext() { return stream_->GetContext(); }
247 /// @overload
248 const CallContext& GetContext() const { return stream_->GetContext(); }
249
250private:
251 std::unique_ptr<impl::BidirectionalStream<Request, Response>> stream_;
252};
253
254} // namespace ugrpc::client
255
256USERVER_NAMESPACE_END