userver: userver/ugrpc/client/rpc.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <memory>
7#include <string_view>
8#include <utility>
9#include <vector>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/function_ref.hpp>
18
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_methods.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/client/impl/channel_cache.hpp>
23#include <userver/ugrpc/client/middlewares/fwd.hpp>
24#include <userver/ugrpc/impl/deadline_timepoint.hpp>
25#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
27
28USERVER_NAMESPACE_BEGIN
29
30namespace ugrpc::client {
31
32/// @brief UnaryFuture for waiting a single response RPC
33class [[nodiscard]] UnaryFuture {
34 public:
35 /// @cond
36 explicit UnaryFuture(impl::RpcData& data) noexcept;
37 /// @endcond
38
39 UnaryFuture(UnaryFuture&&) noexcept = default;
40 UnaryFuture& operator=(UnaryFuture&&) noexcept;
41 UnaryFuture(const UnaryFuture&) = delete;
42 UnaryFuture& operator=(const UnaryFuture&) = delete;
43
44 ~UnaryFuture() noexcept;
45
46 /// @brief Await response
47 ///
48 /// Upon completion result is available in `response` when initiating the
49 /// asynchronous operation, e.g. FinishAsync.
50 ///
51 /// `Get` should not be called multiple times for the same UnaryFuture.
52 ///
53 /// @throws ugrpc::client::RpcError on an RPC error
54 /// @throws ugrpc::client::RpcCancelledError on task cancellation
55 void Get();
56
57 /// @brief Await response until specified timepoint
58 ///
59 /// Once `kReady` is returned, result is available in `response` when
60 /// initiating the asynchronous operation, e.g. FinishAsync.
61 ///
62 /// In case of 'kReady/kCancelled' answer or exception `Get` should not
63 /// be called anymore.
64 ///
65 /// @throws ugrpc::client::RpcError on an RPC error
66 [[nodiscard]] engine::FutureStatus Get(engine::Deadline deadline);
67
68 /// @brief Checks if the asynchronous call has completed
69 /// Note, that once user gets result, IsReady should not be called
70 /// @return true if result ready
71 [[nodiscard]] bool IsReady() const noexcept;
72
73 /// @cond
74 // For internal use only.
75 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept;
76 /// @endcond
77 private:
78 impl::FutureImpl impl_;
79};
80
81/// @brief StreamReadFuture for waiting a single read response from stream
82template <typename RPC>
83class [[nodiscard]] StreamReadFuture {
84 public:
85 /// @cond
86 explicit StreamReadFuture(impl::RpcData& data,
87 typename RPC::RawStream& stream) noexcept;
88 /// @endcond
89
90 StreamReadFuture(StreamReadFuture&& other) noexcept = default;
91 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
92
93 ~StreamReadFuture() noexcept;
94
95 /// @brief Await response
96 ///
97 /// Upon completion the result is available in `response` that was
98 /// specified when initiating the asynchronous read
99 ///
100 /// `Get` should not be called multiple times for the same StreamReadFuture.
101 ///
102 /// @throws ugrpc::client::RpcError on an RPC error
103 /// @throws ugrpc::client::RpcCancelledError on task cancellation
104 bool Get();
105
106 /// @brief Checks if the asynchronous call has completed
107 /// Note, that once user gets result, IsReady should not be called
108 /// @return true if result ready
109 [[nodiscard]] bool IsReady() const noexcept;
110
111 private:
112 impl::FutureImpl impl_;
113 typename RPC::RawStream* stream_;
114};
115
116/// @brief Base class for any RPC
118 protected:
119 /// @cond
120 CallAnyBase(impl::CallParams&& params)
121 : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
122 /// @endcond
123
124 public:
125 /// @returns the `ClientContext` used for this RPC
126 grpc::ClientContext& GetContext();
127
128 /// @returns client name
130
131 /// @returns RPC name
133
134 /// @returns RPC span
135 tracing::Span& GetSpan();
136
137 /// @cond
138 // For internal use only
139 impl::RpcData& GetData(ugrpc::impl::InternalTag);
140 /// @endcond
141
142 protected:
143 impl::RpcData& GetData();
144
145 private:
146 std::unique_ptr<impl::RpcData> data_;
147};
148
149/// @brief Controls a single request -> single response RPC
150///
151/// This class is not thread-safe except for `GetContext`.
152///
153/// The RPC is cancelled on destruction unless `Finish` or `FinishAsync`. In
154/// that case the connection is not closed (it will be reused for new RPCs), and
155/// the server receives `RpcInterruptedError` immediately.
156template <typename Response>
157class [[nodiscard]] UnaryCall final : public CallAnyBase {
158 public:
159 /// @brief Await and read the response
160 ///
161 /// `Finish` should not be called multiple times for the same RPC.
162 ///
163 /// The connection is not closed, it will be reused for new RPCs.
164 ///
165 /// @returns the response on success
166 /// @throws ugrpc::client::RpcError on an RPC error
167 /// @throws ugrpc::client::RpcCancelledError on task cancellation
168 Response Finish();
169
170 /// @brief Asynchronously finish the call
171 ///
172 /// `FinishAsync` should not be called multiple times for the same RPC.
173 ///
174 /// `Finish` and `FinishAsync` should not be called together for the same RPC.
175 ///
176 /// @returns the future for the single response
177 UnaryFuture FinishAsync(Response& response);
178
179 /// @cond
180 // For internal use only
181 template <typename Stub, typename Request>
182 UnaryCall(
183 impl::CallParams&& params, Stub& stub,
184 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
185 const Request& req);
186 /// @endcond
187
188 UnaryCall(UnaryCall&&) noexcept = default;
189 UnaryCall& operator=(UnaryCall&&) noexcept = default;
190 ~UnaryCall() = default;
191
192 private:
193 impl::RawResponseReader<Response> reader_;
194};
195
196/// @brief Controls a single request -> response stream RPC
197///
198/// This class is not thread-safe except for `GetContext`.
199///
200/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
201/// returned `false`). In that case the connection is not closed (it will be
202/// reused for new RPCs), and the server receives `RpcInterruptedError`
203/// immediately. gRPC provides no way to early-close a server-streaming RPC
204/// gracefully.
205///
206/// If any method throws, further methods must not be called on the same stream,
207/// except for `GetContext`.
208template <typename Response>
209class [[nodiscard]] InputStream final : public CallAnyBase {
210 public:
211 /// @brief Await and read the next incoming message
212 ///
213 /// On end-of-input, `Finish` is called automatically.
214 ///
215 /// @param response where to put response on success
216 /// @returns `true` on success, `false` on end-of-input or task cancellation
217 /// @throws ugrpc::client::RpcError on an RPC error
218 [[nodiscard]] bool Read(Response& response);
219
220 /// @cond
221 // For internal use only
222 using RawStream = grpc::ClientAsyncReader<Response>;
223
224 template <typename Stub, typename Request>
225 InputStream(impl::CallParams&& params, Stub& stub,
226 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
227 const Request& req);
228 /// @endcond
229
230 InputStream(InputStream&&) noexcept = default;
231 InputStream& operator=(InputStream&&) noexcept = default;
232 ~InputStream() = default;
233
234 private:
235 impl::RawReader<Response> stream_;
236};
237
238/// @brief Controls a request stream -> single response RPC
239///
240/// This class is not thread-safe except for `GetContext`.
241///
242/// The RPC is cancelled on destruction unless `Finish` has been called. In that
243/// case the connection is not closed (it will be reused for new RPCs), and the
244/// server receives `RpcInterruptedError` immediately.
245///
246/// If any method throws, further methods must not be called on the same stream,
247/// except for `GetContext`.
248template <typename Request, typename Response>
249class [[nodiscard]] OutputStream final : public CallAnyBase {
250 public:
251 /// @brief Write the next outgoing message
252 ///
253 /// `Write` doesn't store any references to `request`, so it can be
254 /// deallocated right after the call.
255 ///
256 /// @param request the next message to write
257 /// @return true if the data is going to the wire; false if the write
258 /// operation failed (including due to task cancellation),
259 /// in which case no more writes will be accepted,
260 /// and the error details can be fetched from Finish
261 [[nodiscard]] bool Write(const Request& request);
262
263 /// @brief Write the next outgoing message and check result
264 ///
265 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
266 /// deallocated right after the call.
267 ///
268 /// `WriteAndCheck` verifies result of the write and generates exception
269 /// in case of issues.
270 ///
271 /// @param request the next message to write
272 /// @throws ugrpc::client::RpcError on an RPC error
273 /// @throws ugrpc::client::RpcCancelledError on task cancellation
274 void WriteAndCheck(const Request& request);
275
276 /// @brief Complete the RPC successfully
277 ///
278 /// Should be called once all the data is written. The server will then
279 /// send a single `Response`.
280 ///
281 /// `Finish` should not be called multiple times.
282 ///
283 /// The connection is not closed, it will be reused for new RPCs.
284 ///
285 /// @returns the single `Response` received after finishing the writes
286 /// @throws ugrpc::client::RpcError on an RPC error
287 /// @throws ugrpc::client::RpcCancelledError on task cancellation
288 Response Finish();
289
290 /// @cond
291 // For internal use only
292 using RawStream = grpc::ClientAsyncWriter<Request>;
293
294 template <typename Stub>
295 OutputStream(impl::CallParams&& params, Stub& stub,
296 impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
297 /// @endcond
298
299 OutputStream(OutputStream&&) noexcept = default;
300 OutputStream& operator=(OutputStream&&) noexcept = default;
301 ~OutputStream() = default;
302
303 private:
304 std::unique_ptr<Response> final_response_;
305 impl::RawWriter<Request> stream_;
306};
307
308/// @brief Controls a request stream -> response stream RPC
309///
310/// It is safe to call the following methods from different coroutines:
311///
312/// - `GetContext`;
313/// - one of (`Read`, `ReadAsync`);
314/// - one of (`Write`, `WritesDone`).
315///
316/// `WriteAndCheck` is NOT thread-safe.
317///
318/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
319/// returned `false`). In that case the connection is not closed (it will be
320/// reused for new RPCs), and the server receives `RpcInterruptedError`
321/// immediately. gRPC provides no way to early-close a server-streaming RPC
322/// gracefully.
323///
324/// `Read` and `AsyncRead` can throw if error status is received from server.
325/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
326/// operations.
327///
328/// `Write` and `WritesDone` methods do not throw, but indicate issues with
329/// the RPC by returning `false`.
330///
331/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
332/// operation the user calls `Read` and vice versa.
333///
334/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
335/// any of these methods anymore.
336/// Instead the user SHOULD call `Read` method until the end of input. If
337/// `Write` or `WritesDone` finishes with negative result, finally `Read`
338/// will throw an exception.
339/// ## Usage example:
340///
341/// @snippet grpc/tests/src/stream_test.cpp concurrent bidirectional stream
342///
343template <typename Request, typename Response>
344class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
345 public:
346 /// @brief Await and read the next incoming message
347 ///
348 /// On end-of-input, `Finish` is called automatically.
349 ///
350 /// @param response where to put response on success
351 /// @returns `true` on success, `false` on end-of-input or task cancellation
352 /// @throws ugrpc::client::RpcError on an RPC error
353 [[nodiscard]] bool Read(Response& response);
354
355 /// @brief Return future to read next incoming result
356 ///
357 /// @param response where to put response on success
358 /// @return StreamReadFuture future
359 /// @throws ugrpc::client::RpcError on an RPC error
360 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response) noexcept;
361
362 /// @brief Write the next outgoing message
363 ///
364 /// RPC will be performed immediately. No references to `request` are
365 /// saved, so it can be deallocated right after the call.
366 ///
367 /// @param request the next message to write
368 /// @return true if the data is going to the wire; false if the write
369 /// operation failed (including due to task cancellation),
370 /// in which case no more writes will be accepted,
371 /// but Read may still have some data and status code available
372 [[nodiscard]] bool Write(const Request& request);
373
374 /// @brief Write the next outgoing message and check result
375 ///
376 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
377 /// deallocated right after the call.
378 ///
379 /// `WriteAndCheck` verifies result of the write and generates exception
380 /// in case of issues.
381 ///
382 /// @param request the next message to write
383 /// @throws ugrpc::client::RpcError on an RPC error
384 /// @throws ugrpc::client::RpcCancelledError on task cancellation
385 void WriteAndCheck(const Request& request);
386
387 /// @brief Announce end-of-output to the server
388 ///
389 /// Should be called to notify the server and receive the final response(s).
390 ///
391 /// @return true if the data is going to the wire; false if the operation
392 /// failed, but Read may still have some data and status code
393 /// available
394 [[nodiscard]] bool WritesDone();
395
396 /// @cond
397 // For internal use only
398 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
399
400 template <typename Stub>
401 BidirectionalStream(
402 impl::CallParams&& params, Stub& stub,
403 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
404 /// @endcond
405
406 BidirectionalStream(BidirectionalStream&&) noexcept = default;
407 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
408 ~BidirectionalStream() = default;
409
410 private:
411 impl::RawReaderWriter<Request, Response> stream_;
412};
413
414// ========================== Implementation follows ==========================
415
416namespace impl {
417void CallMiddlewares(const Middlewares& mws, CallAnyBase& call,
418 utils::function_ref<void()> user_call,
419 const ::google::protobuf::Message* request);
420} // namespace impl
421
422template <typename RPC>
423StreamReadFuture<RPC>::StreamReadFuture(
424 impl::RpcData& data, typename RPC::RawStream& stream) noexcept
425 : impl_(data), stream_(&stream) {}
426
427template <typename RPC>
428StreamReadFuture<RPC>::~StreamReadFuture() noexcept {
429 if (auto* const data = impl_.GetData()) {
430 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
431 const auto wait_status =
432 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
433 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
434 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
435 data->GetStatsScope().OnCancelled();
436 }
437 impl::Finish(*stream_, *data, false);
438 }
439 }
440}
441
442template <typename RPC>
443StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(
444 StreamReadFuture<RPC>&& other) noexcept {
445 if (this == &other) return *this;
446 [[maybe_unused]] auto for_destruction = std::move(*this);
447 impl_ = std::move(other.impl_);
448 stream_ = other.stream_;
449 return *this;
450}
451
452template <typename RPC>
453bool StreamReadFuture<RPC>::Get() {
454 auto* const data = impl_.GetData();
455 UINVARIANT(data, "'Get' must be called only once");
456 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
457 impl_.ClearData();
458 const auto result =
459 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
460 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
461 data->GetStatsScope().OnCancelled();
462 data->GetStatsScope().Flush();
463 }
464 if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
465 // Finish can only be called once all the data is read, otherwise the
466 // underlying gRPC driver hangs.
467 impl::Finish(*stream_, *data, true);
468 }
469 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
470}
471
472template <typename RPC>
473bool StreamReadFuture<RPC>::IsReady() const noexcept {
474 return impl_.IsReady();
475}
476
477template <typename Response>
478template <typename Stub, typename Request>
479UnaryCall<Response>::UnaryCall(
480 impl::CallParams&& params, Stub& stub,
481 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
482 const Request& req)
483 : CallAnyBase(std::move(params)) {
484 impl::CallMiddlewares(
485 GetData().GetMiddlewares(), *this,
486 [&] {
487 reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
488 &GetData().GetQueue());
489 reader_->StartCall();
490 },
491 &req);
492 GetData().SetWritesFinished();
493}
494
495template <typename Response>
496Response UnaryCall<Response>::Finish() {
497 Response response;
498 UnaryFuture future = FinishAsync(response);
499 future.Get();
500 return response;
501}
502
503template <typename Response>
504UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
505 UASSERT(reader_);
506 PrepareFinish(GetData());
507 GetData().EmplaceFinishAsyncMethodInvocation();
508 auto& finish = GetData().GetFinishAsyncMethodInvocation();
509 auto& status = GetData().GetStatus();
510 reader_->Finish(&response, &status, finish.GetTag());
511 return UnaryFuture{GetData()};
512}
513
514template <typename Response>
515template <typename Stub, typename Request>
516InputStream<Response>::InputStream(
517 impl::CallParams&& params, Stub& stub,
518 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
519 const Request& req)
520 : CallAnyBase(std::move(params)) {
521 impl::CallMiddlewares(
522 GetData().GetMiddlewares(), *this,
523 [&] {
524 stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
525 &GetData().GetQueue());
526 impl::StartCall(*stream_, GetData());
527 },
528 &req);
529 GetData().SetWritesFinished();
530}
531
532template <typename Response>
533bool InputStream<Response>::Read(Response& response) {
534 if (impl::Read(*stream_, response, GetData())) {
535 return true;
536 } else {
537 // Finish can only be called once all the data is read, otherwise the
538 // underlying gRPC driver hangs.
539 impl::Finish(*stream_, GetData(), true);
540 return false;
541 }
542}
543
544template <typename Request, typename Response>
545template <typename Stub>
546OutputStream<Request, Response>::OutputStream(
547 impl::CallParams&& params, Stub& stub,
548 impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
549 : CallAnyBase(std::move(params)),
550 final_response_(std::make_unique<Response>()) {
551 impl::CallMiddlewares(
552 GetData().GetMiddlewares(), *this,
553 [&] {
554 // 'final_response_' will be filled upon successful 'Finish' async call
555 stream_ =
556 (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
557 &GetData().GetQueue());
558 impl::StartCall(*stream_, GetData());
559 },
560 nullptr);
561}
562
563template <typename Request, typename Response>
564bool OutputStream<Request, Response>::Write(const Request& request) {
565 // Don't buffer writes, otherwise in an event subscription scenario, events
566 // may never actually be delivered
567 grpc::WriteOptions write_options{};
568
569 return impl::Write(*stream_, request, write_options, GetData());
570}
571
572template <typename Request, typename Response>
573void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
574 // Don't buffer writes, otherwise in an event subscription scenario, events
575 // may never actually be delivered
576 grpc::WriteOptions write_options{};
577
578 if (!impl::Write(*stream_, request, write_options, GetData())) {
579 impl::Finish(*stream_, GetData(), true);
580 }
581}
582
583template <typename Request, typename Response>
584Response OutputStream<Request, Response>::Finish() {
585 // gRPC does not implicitly call `WritesDone` in `Finish`,
586 // contrary to the documentation
587 if (!GetData().AreWritesFinished()) {
588 impl::WritesDone(*stream_, GetData());
589 }
590
591 impl::Finish(*stream_, GetData(), true);
592
593 return std::move(*final_response_);
594}
595
596template <typename Request, typename Response>
597template <typename Stub>
598BidirectionalStream<Request, Response>::BidirectionalStream(
599 impl::CallParams&& params, Stub& stub,
600 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
601 : CallAnyBase(std::move(params)) {
602 impl::CallMiddlewares(
603 GetData().GetMiddlewares(), *this,
604 [&] {
605 stream_ = (stub.*prepare_func)(&GetData().GetContext(),
606 &GetData().GetQueue());
607 impl::StartCall(*stream_, GetData());
608 },
609 nullptr);
610}
611
612template <typename Request, typename Response>
613StreamReadFuture<BidirectionalStream<Request, Response>>
614BidirectionalStream<Request, Response>::ReadAsync(Response& response) noexcept {
615 impl::ReadAsync(*stream_, response, GetData());
616 return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
617 *stream_};
618}
619
620template <typename Request, typename Response>
621bool BidirectionalStream<Request, Response>::Read(Response& response) {
622 auto future = ReadAsync(response);
623 return future.Get();
624}
625
626template <typename Request, typename Response>
627bool BidirectionalStream<Request, Response>::Write(const Request& request) {
628 // Don't buffer writes, optimize for ping-pong-style interaction
629 grpc::WriteOptions write_options{};
630
631 return impl::Write(*stream_, request, write_options, GetData());
632}
633
634template <typename Request, typename Response>
635void BidirectionalStream<Request, Response>::WriteAndCheck(
636 const Request& request) {
637 // Don't buffer writes, optimize for ping-pong-style interaction
638 grpc::WriteOptions write_options{};
639
640 impl::WriteAndCheck(*stream_, request, write_options, GetData());
641}
642
643template <typename Request, typename Response>
644bool BidirectionalStream<Request, Response>::WritesDone() {
645 return impl::WritesDone(*stream_, GetData());
646}
647
648} // namespace ugrpc::client
649
650USERVER_NAMESPACE_END