userver: userver/ugrpc/client/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <memory>
7#include <string_view>
8#include <utility>
9#include <vector>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/utils/assert.hpp>
15#include <userver/utils/function_ref.hpp>
16
17#include <userver/ugrpc/client/exceptions.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_params.hpp>
20#include <userver/ugrpc/client/impl/channel_cache.hpp>
21#include <userver/ugrpc/client/middlewares/fwd.hpp>
22#include <userver/ugrpc/impl/deadline_timepoint.hpp>
23#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
24#include <userver/ugrpc/impl/statistics_scope.hpp>
25
26USERVER_NAMESPACE_BEGIN
27
28namespace ugrpc::client {
29
30/// @brief UnaryFuture for waiting a single response RPC
31class [[nodiscard]] UnaryFuture {
32 public:
33 /// @cond
34 explicit UnaryFuture(impl::RpcData& data) noexcept;
35 /// @endcond
36
37 UnaryFuture(UnaryFuture&&) noexcept = default;
38 UnaryFuture& operator=(UnaryFuture&&) noexcept;
39
40 ~UnaryFuture() noexcept;
41
42 /// @brief Await response
43 ///
44 /// Upon completion result is available in `response` when initiating the
45 /// asynchronous operation, e.g. FinishAsync.
46 ///
47 /// `Get` should not be called multiple times for the same UnaryFuture.
48 ///
49 /// @throws ugrpc::client::RpcError on an RPC error
50 /// @throws ugrpc::client::RpcCancelledError on task cancellation
51 void Get();
52
53 /// @brief Checks if the asynchronous call has completed
54 /// Note, that once user gets result, IsReady should not be called
55 /// @return true if result ready
56 [[nodiscard]] bool IsReady() const noexcept;
57
58 private:
59 impl::FutureImpl impl_;
60};
61
62/// @brief StreamReadFuture for waiting a single read response from stream
63template <typename RPC>
64class [[nodiscard]] StreamReadFuture {
65 public:
66 /// @cond
67 explicit StreamReadFuture(impl::RpcData& data,
68 typename RPC::RawStream& stream) noexcept;
69 /// @endcond
70
71 StreamReadFuture(StreamReadFuture&& other) noexcept = default;
72 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
73
74 ~StreamReadFuture() noexcept;
75
76 /// @brief Await response
77 ///
78 /// Upon completion the result is available in `response` that was
79 /// specified when initiating the asynchronous read
80 ///
81 /// `Get` should not be called multiple times for the same StreamReadFuture.
82 ///
83 /// @throws ugrpc::client::RpcError on an RPC error
84 /// @throws ugrpc::client::RpcCancelledError on task cancellation
85 bool Get();
86
87 /// @brief Checks if the asynchronous call has completed
88 /// Note, that once user gets result, IsReady should not be called
89 /// @return true if result ready
90 [[nodiscard]] bool IsReady() const noexcept;
91
92 private:
93 impl::FutureImpl impl_;
94 typename RPC::RawStream* stream_;
95};
96
97/// @brief Base class for any RPC
99 protected:
100 /// @cond
101 CallAnyBase(impl::CallParams&& params)
102 : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
103 /// @endcond
104
105 public:
106 /// @returns the `ClientContext` used for this RPC
107 grpc::ClientContext& GetContext();
108
109 /// @returns client name
111
112 /// @returns RPC name
114
115 /// @returns RPC span
116 tracing::Span& GetSpan();
117
118 /// @cond
119 // For internal use only
120 impl::RpcData& GetData(ugrpc::impl::InternalTag);
121 /// @endcond
122
123 protected:
124 impl::RpcData& GetData();
125
126 private:
127 std::unique_ptr<impl::RpcData> data_;
128};
129
130/// @brief Controls a single request -> single response RPC
131///
132/// This class is not thread-safe except for `GetContext`.
133///
134/// The RPC is cancelled on destruction unless `Finish` or `FinishAsync`. In
135/// that case the connection is not closed (it will be reused for new RPCs), and
136/// the server receives `RpcInterruptedError` immediately.
137template <typename Response>
138class [[nodiscard]] UnaryCall final : public CallAnyBase {
139 public:
140 /// @brief Await and read the response
141 ///
142 /// `Finish` should not be called multiple times for the same RPC.
143 ///
144 /// The connection is not closed, it will be reused for new RPCs.
145 ///
146 /// @returns the response on success
147 /// @throws ugrpc::client::RpcError on an RPC error
148 /// @throws ugrpc::client::RpcCancelledError on task cancellation
149 Response Finish();
150
151 /// @brief Asynchronously finish the call
152 ///
153 /// `FinishAsync` should not be called multiple times for the same RPC.
154 ///
155 /// `Finish` and `FinishAsync` should not be called together for the same RPC.
156 ///
157 /// @returns the future for the single response
158 UnaryFuture FinishAsync(Response& response);
159
160 /// @cond
161 // For internal use only
162 template <typename Stub, typename Request>
163 UnaryCall(
164 impl::CallParams&& params, Stub& stub,
165 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
166 const Request& req);
167 /// @endcond
168
169 UnaryCall(UnaryCall&&) noexcept = default;
170 UnaryCall& operator=(UnaryCall&&) noexcept = default;
171 ~UnaryCall() = default;
172
173 private:
174 impl::RawResponseReader<Response> reader_;
175};
176
177/// @brief Controls a single request -> response stream RPC
178///
179/// This class is not thread-safe except for `GetContext`.
180///
181/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
182/// returned `false`). In that case the connection is not closed (it will be
183/// reused for new RPCs), and the server receives `RpcInterruptedError`
184/// immediately. gRPC provides no way to early-close a server-streaming RPC
185/// gracefully.
186///
187/// If any method throws, further methods must not be called on the same stream,
188/// except for `GetContext`.
189template <typename Response>
190class [[nodiscard]] InputStream final : public CallAnyBase {
191 public:
192 /// @brief Await and read the next incoming message
193 ///
194 /// On end-of-input, `Finish` is called automatically.
195 ///
196 /// @param response where to put response on success
197 /// @returns `true` on success, `false` on end-of-input or task cancellation
198 /// @throws ugrpc::client::RpcError on an RPC error
199 [[nodiscard]] bool Read(Response& response);
200
201 /// @cond
202 // For internal use only
203 using RawStream = grpc::ClientAsyncReader<Response>;
204
205 template <typename Stub, typename Request>
206 InputStream(impl::CallParams&& params, Stub& stub,
207 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
208 const Request& req);
209 /// @endcond
210
211 InputStream(InputStream&&) noexcept = default;
212 InputStream& operator=(InputStream&&) noexcept = default;
213 ~InputStream() = default;
214
215 private:
216 std::unique_ptr<impl::RpcData> data_;
217 impl::RawReader<Response> stream_;
218};
219
220/// @brief Controls a request stream -> single response RPC
221///
222/// This class is not thread-safe except for `GetContext`.
223///
224/// The RPC is cancelled on destruction unless `Finish` has been called. In that
225/// case the connection is not closed (it will be reused for new RPCs), and the
226/// server receives `RpcInterruptedError` immediately.
227///
228/// If any method throws, further methods must not be called on the same stream,
229/// except for `GetContext`.
230template <typename Request, typename Response>
231class [[nodiscard]] OutputStream final : public CallAnyBase {
232 public:
233 /// @brief Write the next outgoing message
234 ///
235 /// `Write` doesn't store any references to `request`, so it can be
236 /// deallocated right after the call.
237 ///
238 /// @param request the next message to write
239 /// @return true if the data is going to the wire; false if the write
240 /// operation failed (including due to task cancellation),
241 /// in which case no more writes will be accepted,
242 /// and the error details can be fetched from Finish
243 [[nodiscard]] bool Write(const Request& request);
244
245 /// @brief Write the next outgoing message and check result
246 ///
247 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
248 /// deallocated right after the call.
249 ///
250 /// `WriteAndCheck` verifies result of the write and generates exception
251 /// in case of issues.
252 ///
253 /// @param request the next message to write
254 /// @throws ugrpc::client::RpcError on an RPC error
255 /// @throws ugrpc::client::RpcCancelledError on task cancellation
256 void WriteAndCheck(const Request& request);
257
258 /// @brief Complete the RPC successfully
259 ///
260 /// Should be called once all the data is written. The server will then
261 /// send a single `Response`.
262 ///
263 /// `Finish` should not be called multiple times.
264 ///
265 /// The connection is not closed, it will be reused for new RPCs.
266 ///
267 /// @returns the single `Response` received after finishing the writes
268 /// @throws ugrpc::client::RpcError on an RPC error
269 /// @throws ugrpc::client::RpcCancelledError on task cancellation
270 Response Finish();
271
272 /// @cond
273 // For internal use only
274 using RawStream = grpc::ClientAsyncWriter<Request>;
275
276 template <typename Stub>
277 OutputStream(impl::CallParams&& params, Stub& stub,
278 impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
279 /// @endcond
280
281 OutputStream(OutputStream&&) noexcept = default;
282 OutputStream& operator=(OutputStream&&) noexcept = default;
283 ~OutputStream() = default;
284
285 private:
286 std::unique_ptr<impl::RpcData> data_;
287 std::unique_ptr<Response> final_response_;
288 impl::RawWriter<Request> stream_;
289};
290
291/// @brief Controls a request stream -> response stream RPC
292///
293/// This class allows the following concurrent calls:
294/// - `GetContext`
295/// - Concurrent call of one of (`Read`, `ReadAsync`) with one of (`Write`,
296/// `WritesDone`)
297/// `WriteAndCheck` is not thread-safe
298///
299/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
300/// returned `false`). In that case the connection is not closed (it will be
301/// reused for new RPCs), and the server receives `RpcInterruptedError`
302/// immediately. gRPC provides no way to early-close a server-streaming RPC
303/// gracefully.
304///
305/// `Read` and `AsyncRead` can throw if error status is received from server.
306/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
307/// operations.
308///
309/// `Write` and `WritesDone` methods do not throw, but indicate issues with
310/// the RPC by returning `false`.
311///
312/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
313/// operation the user calls `Read` and vice versa.
314///
315/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
316/// any of these methods anymore.
317/// Instead the user SHOULD call `Read` method until the end of input. If
318/// `Write` or `WritesDone` finishes with negative result, finally `Read`
319/// will throw an exception.
320template <typename Request, typename Response>
321class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
322 public:
323 /// @brief Await and read the next incoming message
324 ///
325 /// On end-of-input, `Finish` is called automatically.
326 ///
327 /// @param response where to put response on success
328 /// @returns `true` on success, `false` on end-of-input or task cancellation
329 /// @throws ugrpc::client::RpcError on an RPC error
330 [[nodiscard]] bool Read(Response& response);
331
332 /// @brief Return future to read next incoming result
333 ///
334 /// @param response where to put response on success
335 /// @return StreamReadFuture future
336 /// @throws ugrpc::client::RpcError on an RPC error
337 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response) noexcept;
338
339 /// @brief Write the next outgoing message
340 ///
341 /// RPC will be performed immediately. No references to `request` are
342 /// saved, so it can be deallocated right after the call.
343 ///
344 /// @param request the next message to write
345 /// @return true if the data is going to the wire; false if the write
346 /// operation failed (including due to task cancellation),
347 /// in which case no more writes will be accepted,
348 /// but Read may still have some data and status code available
349 [[nodiscard]] bool Write(const Request& request);
350
351 /// @brief Write the next outgoing message and check result
352 ///
353 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
354 /// deallocated right after the call.
355 ///
356 /// `WriteAndCheck` verifies result of the write and generates exception
357 /// in case of issues.
358 ///
359 /// @param request the next message to write
360 /// @throws ugrpc::client::RpcError on an RPC error
361 /// @throws ugrpc::client::RpcCancelledError on task cancellation
362 void WriteAndCheck(const Request& request);
363
364 /// @brief Announce end-of-output to the server
365 ///
366 /// Should be called to notify the server and receive the final response(s).
367 ///
368 /// @return true if the data is going to the wire; false if the operation
369 /// failed, but Read may still have some data and status code
370 /// available
371 [[nodiscard]] bool WritesDone();
372
373 /// @cond
374 // For internal use only
375 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
376
377 template <typename Stub>
378 BidirectionalStream(
379 impl::CallParams&& params, Stub& stub,
380 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
381 /// @endcond
382
383 BidirectionalStream(BidirectionalStream&&) noexcept = default;
384 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
385 ~BidirectionalStream() = default;
386
387 private:
388 std::unique_ptr<impl::RpcData> data_;
389 impl::RawReaderWriter<Request, Response> stream_;
390};
391
392// ========================== Implementation follows ==========================
393
394namespace impl {
395void CallMiddlewares(const Middlewares& mws, CallAnyBase& call,
396 utils::function_ref<void()> user_call,
397 const ::google::protobuf::Message* request);
398} // namespace impl
399
400template <typename RPC>
401StreamReadFuture<RPC>::StreamReadFuture(
402 impl::RpcData& data, typename RPC::RawStream& stream) noexcept
403 : impl_(data), stream_(&stream) {}
404
405template <typename RPC>
406StreamReadFuture<RPC>::~StreamReadFuture() noexcept {
407 if (auto* const data = impl_.GetData()) {
408 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
409 const auto wait_status =
410 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
411 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
412 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
413 data->GetStatsScope().OnCancelled();
414 }
415 impl::Finish(*stream_, *data, false);
416 }
417 }
418}
419
420template <typename RPC>
421StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(
422 StreamReadFuture<RPC>&& other) noexcept {
423 if (this == &other) return *this;
424 [[maybe_unused]] auto for_destruction = std::move(*this);
425 impl_ = std::move(other.impl_);
426 stream_ = other.stream_;
427 return *this;
428}
429
430template <typename RPC>
431bool StreamReadFuture<RPC>::Get() {
432 auto* const data = impl_.GetData();
433 UINVARIANT(data, "'Get' must be called only once");
434 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
435 impl_.ClearData();
436 const auto result =
437 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
438 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
439 data->GetStatsScope().OnCancelled();
440 data->GetStatsScope().Flush();
441 }
442 if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
443 // Finish can only be called once all the data is read, otherwise the
444 // underlying gRPC driver hangs.
445 impl::Finish(*stream_, *data, true);
446 }
447 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
448}
449
450template <typename RPC>
451bool StreamReadFuture<RPC>::IsReady() const noexcept {
452 return impl_.IsReady();
453}
454
455template <typename Response>
456template <typename Stub, typename Request>
457UnaryCall<Response>::UnaryCall(
458 impl::CallParams&& params, Stub& stub,
459 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
460 const Request& req)
461 : CallAnyBase(std::move(params)) {
462 impl::CallMiddlewares(
463 GetData().GetMiddlewares(), *this,
464 [&] {
465 reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
466 &GetData().GetQueue());
467 reader_->StartCall();
468 },
469 &req);
470 GetData().SetWritesFinished();
471}
472
473template <typename Response>
474Response UnaryCall<Response>::Finish() {
475 Response response;
476 UnaryFuture future = FinishAsync(response);
477 future.Get();
478 return response;
479}
480
481template <typename Response>
482UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
483 UASSERT(reader_);
484 PrepareFinish(GetData());
485 GetData().EmplaceFinishAsyncMethodInvocation();
486 auto& finish = GetData().GetFinishAsyncMethodInvocation();
487 auto& status = GetData().GetStatus();
488 reader_->Finish(&response, &status, finish.GetTag());
489 return UnaryFuture{GetData()};
490}
491
492template <typename Response>
493template <typename Stub, typename Request>
494InputStream<Response>::InputStream(
495 impl::CallParams&& params, Stub& stub,
496 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
497 const Request& req)
498 : CallAnyBase(std::move(params)) {
499 impl::CallMiddlewares(
500 GetData().GetMiddlewares(), *this,
501 [&] {
502 stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
503 &GetData().GetQueue());
504 impl::StartCall(*stream_, GetData());
505 },
506 &req);
507 GetData().SetWritesFinished();
508}
509
510template <typename Response>
511bool InputStream<Response>::Read(Response& response) {
512 if (impl::Read(*stream_, response, GetData())) {
513 return true;
514 } else {
515 // Finish can only be called once all the data is read, otherwise the
516 // underlying gRPC driver hangs.
517 impl::Finish(*stream_, GetData(), true);
518 return false;
519 }
520}
521
522template <typename Request, typename Response>
523template <typename Stub>
524OutputStream<Request, Response>::OutputStream(
525 impl::CallParams&& params, Stub& stub,
526 impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
527 : CallAnyBase(std::move(params)),
528 final_response_(std::make_unique<Response>()) {
529 impl::CallMiddlewares(
530 GetData().GetMiddlewares(), *this,
531 [&] {
532 // 'final_response_' will be filled upon successful 'Finish' async call
533 stream_ =
534 (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
535 &GetData().GetQueue());
536 impl::StartCall(*stream_, GetData());
537 },
538 nullptr);
539}
540
541template <typename Request, typename Response>
542bool OutputStream<Request, Response>::Write(const Request& request) {
543 // Don't buffer writes, otherwise in an event subscription scenario, events
544 // may never actually be delivered
545 grpc::WriteOptions write_options{};
546
547 return impl::Write(*stream_, request, write_options, GetData());
548}
549
550template <typename Request, typename Response>
551void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
552 // Don't buffer writes, otherwise in an event subscription scenario, events
553 // may never actually be delivered
554 grpc::WriteOptions write_options{};
555
556 if (!impl::Write(*stream_, request, write_options, GetData())) {
557 impl::Finish(*stream_, GetData(), true);
558 }
559}
560
561template <typename Request, typename Response>
562Response OutputStream<Request, Response>::Finish() {
563 // gRPC does not implicitly call `WritesDone` in `Finish`,
564 // contrary to the documentation
565 if (!GetData().AreWritesFinished()) {
566 impl::WritesDone(*stream_, GetData());
567 }
568
569 impl::Finish(*stream_, GetData(), true);
570
571 return std::move(*final_response_);
572}
573
574template <typename Request, typename Response>
575template <typename Stub>
576BidirectionalStream<Request, Response>::BidirectionalStream(
577 impl::CallParams&& params, Stub& stub,
578 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
579 : CallAnyBase(std::move(params)) {
580 impl::CallMiddlewares(
581 GetData().GetMiddlewares(), *this,
582 [&] {
583 stream_ = (stub.*prepare_func)(&GetData().GetContext(),
584 &GetData().GetQueue());
585 impl::StartCall(*stream_, GetData());
586 },
587 nullptr);
588}
589
590template <typename Request, typename Response>
591StreamReadFuture<BidirectionalStream<Request, Response>>
592BidirectionalStream<Request, Response>::ReadAsync(Response& response) noexcept {
593 impl::ReadAsync(*stream_, response, GetData());
594 return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
595 *stream_};
596}
597
598template <typename Request, typename Response>
599bool BidirectionalStream<Request, Response>::Read(Response& response) {
600 auto future = ReadAsync(response);
601 return future.Get();
602}
603
604template <typename Request, typename Response>
605bool BidirectionalStream<Request, Response>::Write(const Request& request) {
606 // Don't buffer writes, optimize for ping-pong-style interaction
607 grpc::WriteOptions write_options{};
608
609 return impl::Write(*stream_, request, write_options, GetData());
610}
611
612template <typename Request, typename Response>
613void BidirectionalStream<Request, Response>::WriteAndCheck(
614 const Request& request) {
615 // Don't buffer writes, optimize for ping-pong-style interaction
616 grpc::WriteOptions write_options{};
617
618 impl::WriteAndCheck(*stream_, request, write_options, GetData());
619}
620
621template <typename Request, typename Response>
622bool BidirectionalStream<Request, Response>::WritesDone() {
623 return impl::WritesDone(*stream_, GetData());
624}
625
626} // namespace ugrpc::client
627
628USERVER_NAMESPACE_END