userver: userver/ugrpc/client/stream.hpp Source File
Loading...
Searching...
No Matches
stream.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/stream.hpp
4/// @brief Client streaming interfaces
5
6#include <memory>
7#include <utility>
8
9#include <userver/ugrpc/client/impl/rpc.hpp>
10#include <userver/ugrpc/client/stream_read_future.hpp>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace ugrpc::client {
15
16/// @brief Client-side interface for server-streaming
17///
18/// This class is not thread-safe except for `GetContext`.
19///
20/// The RPC should be completed by reading until @ref ugrpc::client::Reader::Read returns `false`.
21/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
22/// and the `abandoned-error` metric is incremented. The connection stays open for reuse.
23/// gRPC provides no way to early-close a server-streaming RPC gracefully.
24/// See @ref ugrpc::client::ReadRemainingAndFinish for graceful completion.
25template <class Response>
26class [[nodiscard]] Reader final {
27public:
28 /// @cond
29 // For internal use only
30 template <typename Stub, typename Request>
31 Reader(
32 impl::CallParams&& params,
33 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
34 const Request& request
35 )
36 : stream_(std::make_unique<
37 impl::InputStream<Response>>(std::move(params), std::move(prepare_async_method), request))
38 {}
39 /// @endcond
40
41 Reader(Reader&&) noexcept = default;
42 Reader& operator=(Reader&&) noexcept = default;
43
44 /// @brief Await and read the next incoming message
45 ///
46 /// On end-of-input, `Finish` is called automatically.
47 ///
48 /// @param response where to put response on success
49 /// @returns `true` on success, `false` on end-of-input, task cancellation,
50 /// or if the stream is already closed for reads
51 /// @throws ugrpc::client::RpcError on an RPC error
52 [[nodiscard]] bool Read(Response& response) { return stream_->Read(response); }
53
54 /// @brief Get call context, useful e.g. for accessing metadata.
55 CallContext& GetContext() { return stream_->GetContext(); }
56 /// @overload
57 const CallContext& GetContext() const { return stream_->GetContext(); }
58
59private:
60 std::unique_ptr<impl::InputStream<Response>> stream_;  // owned stream state; null in a moved-from Reader
61};
62
63/// @brief Client-side interface for client-streaming
64///
65/// This class is not thread-safe except for `GetContext`.
66///
67/// The RPC should be completed by calling @ref ugrpc::client::Writer::Finish.
68/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
69/// and the `abandoned-error` metric is incremented. When properly finished, the connection stays open for reuse.
70template <typename Request, typename Response>
71class [[nodiscard]] Writer final {
72public:
73 /// @cond
74 // For internal use only
75 template <typename Stub>
76 Writer(impl::CallParams&& params, impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method)
77 : stream_(std::make_unique<
78 impl::OutputStream<Request, Response>>(std::move(params), std::move(prepare_async_method)))
79 {}
80 /// @endcond
81
82 Writer(Writer&&) noexcept = default;
83 Writer& operator=(Writer&&) noexcept = default;
84
85 /// @brief Write the next outgoing message
86 ///
87 /// `Write` doesn't store any references to `request`, so it can be
88 /// deallocated right after the call.
89 ///
90 /// @param request the next message to write
91 /// @return true if the data is going to the wire; false if the write
92 /// operation failed (including due to task cancellation,
93 /// or if the stream is already closed for writes),
94 /// in which case no more writes will be accepted,
95 /// and the error details can be fetched from Finish
96 [[nodiscard]] bool Write(const Request& request) { return stream_->Write(request); }
97
98 /// @brief Write the next outgoing message and check result
99 ///
100 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
101 /// deallocated right after the call.
102 ///
103 /// `WriteAndCheck` verifies result of the write and generates exception
104 /// in case of issues.
105 ///
106 /// @param request the next message to write
107 /// @throws ugrpc::client::RpcError on an RPC error
108 /// @throws ugrpc::client::RpcCancelledError on task cancellation
109 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
110 void WriteAndCheck(const Request& request) { stream_->WriteAndCheck(request); }
111
112 /// @brief Complete the RPC successfully
113 ///
114 /// Should be called once all the data is written. The server will then
115 /// send a single `Response`.
116 ///
117 /// `Finish` should not be called multiple times.
118 ///
119 /// The connection is not closed, it will be reused for new RPCs.
120 ///
121 /// @returns the single `Response` received after finishing the writes
122 /// @throws ugrpc::client::RpcError on an RPC error
123 /// @throws ugrpc::client::RpcCancelledError on task cancellation
124 Response Finish() { return stream_->Finish(); }
125
126 /// @brief Get call context, useful e.g. for accessing metadata.
127 CallContext& GetContext() { return stream_->GetContext(); }
128 /// @overload
129 const CallContext& GetContext() const { return stream_->GetContext(); }
130
131private:
132 std::unique_ptr<impl::OutputStream<Request, Response>> stream_;  // owned stream state; null in a moved-from Writer
133};
134
135/// @brief Client-side interface for bi-directional streaming
136///
137/// It is safe to call the following methods from different coroutines:
138///
139/// - `GetContext`;
140/// - one of (`Read`, `ReadAsync`);
141/// - one of (`Write`, `WritesDone`).
142///
143/// `WriteAndCheck` is NOT thread-safe.
144///
145/// The RPC should be completed by reading until @ref ugrpc::client::ReaderWriter::Read returns `false`.
146/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
147/// and the `abandoned-error` metric is incremented. The connection stays open for reuse.
148/// gRPC provides no way to early-close a server-streaming RPC gracefully.
149/// See @ref ugrpc::client::ReadRemainingAndFinish and @ref ugrpc::client::PingPongFinish for graceful completion.
150///
151/// `Read` and `ReadAsync` can throw if error status is received from server.
152/// User MUST NOT call `Read` or `ReadAsync` again after failure of any of these
153/// operations.
154///
155/// `Write` and `WritesDone` methods do not throw, but indicate issues with
156/// the RPC by returning `false`.
157///
158/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
159/// operation the user calls `Read` and vice versa.
160///
161/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
162/// any of these methods anymore.
163/// Instead the user SHOULD call `Read` method until the end of input. If
164/// `Write` or `WritesDone` finishes with negative result, finally `Read`
165/// will throw an exception.
166/// ## Usage example:
167///
168/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
169///
170template <typename Request, typename Response>
171class [[nodiscard]] ReaderWriter final {
172public:
173 /// @cond
174 // For internal use only
175 template <typename Stub>
176 ReaderWriter(
177 impl::CallParams&& params,
178 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
179 )
180 : stream_(std::make_unique<
181 impl::BidirectionalStream<Request, Response>>(std::move(params), std::move(prepare_async_method)))
182 {}
183 /// @endcond
184
185 ReaderWriter(ReaderWriter&&) noexcept = default;
186 ReaderWriter& operator=(ReaderWriter&&) noexcept = default;
187
188 /// @brief Await and read the next incoming message
189 ///
190 /// On end-of-input, `Finish` is called automatically.
191 ///
192 /// @param response where to put response on success
193 /// @returns `true` on success, `false` on end-of-input, task cancellation,
194 /// or if the stream is already closed for reads
195 /// @throws ugrpc::client::RpcError on an RPC error
196 [[nodiscard]] bool Read(Response& response) { return stream_->Read(response); }
197
198 /// @brief Return future to read next incoming result
199 ///
200 /// @param response where to put response on success
201 /// @return StreamReadFuture future
202 /// @throws ugrpc::client::RpcError on an RPC error
203 /// @throws ugrpc::client::RpcError if the stream is already closed for reads
204 StreamReadFuture<Response> ReadAsync(Response& response) { return stream_->ReadAsync(response); }
205
206 /// @brief Write the next outgoing message
207 ///
208 /// RPC will be performed immediately. No references to `request` are
209 /// saved, so it can be deallocated right after the call.
210 ///
211 /// @param request the next message to write
212 /// @return true if the data is going to the wire; false if the write
213 /// operation failed (including due to task cancellation,
214 /// or if the stream is already closed for writes),
215 /// in which case no more writes will be accepted,
216 /// but Read may still have some data and status code available
217 [[nodiscard]] bool Write(const Request& request) { return stream_->Write(request); }
218
219 /// @brief Write the next outgoing message and check result
220 ///
221 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
222 /// deallocated right after the call.
223 ///
224 /// `WriteAndCheck` verifies result of the write and generates exception
225 /// in case of issues.
226 ///
227 /// @param request the next message to write
228 /// @throws ugrpc::client::RpcError on an RPC error
229 /// @throws ugrpc::client::RpcCancelledError on task cancellation
230 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
231 void WriteAndCheck(const Request& request) { stream_->WriteAndCheck(request); }
232
233 /// @brief Announce end-of-output to the server
234 ///
235 /// Should be called to notify the server and receive the final response(s).
236 ///
237 /// @return true if the data is going to the wire; false if the operation
238 /// failed (including if the stream is already closed for writes),
239 /// but Read may still have some data and status code available
240 [[nodiscard]] bool WritesDone() { return stream_->WritesDone(); }
241
242 /// @brief Get call context, useful e.g. for accessing metadata.
243 CallContext& GetContext() { return stream_->GetContext(); }
244 /// @overload
245 const CallContext& GetContext() const { return stream_->GetContext(); }
246
247private:
248 std::unique_ptr<impl::BidirectionalStream<Request, Response>> stream_;  // owned stream state; null in a moved-from ReaderWriter
249};
250
251} // namespace ugrpc::client
252
253USERVER_NAMESPACE_END