userver: userver/ugrpc/client/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <memory>
7#include <string_view>
8#include <utility>
9#include <vector>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/function_ref.hpp>
18
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_methods.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/client/impl/channel_cache.hpp>
23#include <userver/ugrpc/client/middlewares/fwd.hpp>
24#include <userver/ugrpc/impl/deadline_timepoint.hpp>
25#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
27
28USERVER_NAMESPACE_BEGIN
29
30namespace ugrpc::client {
31
32/// @brief UnaryFuture for waiting a single response RPC
33class [[nodiscard]] UnaryFuture {
34 public:
35 /// @cond
36 explicit UnaryFuture(impl::RpcData& data) noexcept;
37 /// @endcond
38
39 UnaryFuture(UnaryFuture&&) noexcept = default;
40 UnaryFuture& operator=(UnaryFuture&&) noexcept;
41 UnaryFuture(const UnaryFuture&) = delete;
42 UnaryFuture& operator=(const UnaryFuture&) = delete;
43
44 ~UnaryFuture() noexcept;
45
46 /// @brief Await response
47 ///
48 /// Upon completion result is available in `response` when initiating the
49 /// asynchronous operation, e.g. FinishAsync.
50 ///
51 /// `Get` should not be called multiple times for the same UnaryFuture.
52 ///
53 /// @throws ugrpc::client::RpcError on an RPC error
54 /// @throws ugrpc::client::RpcCancelledError on task cancellation
55 void Get();
56
57 /// @brief Await response until specified timepoint
58 ///
59 /// Once `kReady` is returned, result is available in `response` when
60 /// initiating the asynchronous operation, e.g. FinishAsync.
61 ///
62 /// In case of 'kReady/kCancelled' answer or exception `Get` should not
63 /// be called anymore.
64 ///
65 /// @throws ugrpc::client::RpcError on an RPC error
66 [[nodiscard]] engine::FutureStatus Get(engine::Deadline deadline);
67
68 /// @brief Checks if the asynchronous call has completed
69 /// Note, that once user gets result, IsReady should not be called
70 /// @return true if result ready
71 [[nodiscard]] bool IsReady() const noexcept;
72
73 /// @cond
74 // For internal use only.
75 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept;
76 /// @endcond
77 private:
78 impl::FutureImpl impl_;
79};
80
81/// @brief StreamReadFuture for waiting a single read response from stream
82template <typename RPC>
83class [[nodiscard]] StreamReadFuture {
84 public:
85 /// @cond
86 explicit StreamReadFuture(impl::RpcData& data,
87 typename RPC::RawStream& stream) noexcept;
88 /// @endcond
89
90 StreamReadFuture(StreamReadFuture&& other) noexcept = default;
91 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
92
93 ~StreamReadFuture() noexcept;
94
95 /// @brief Await response
96 ///
97 /// Upon completion the result is available in `response` that was
98 /// specified when initiating the asynchronous read
99 ///
100 /// `Get` should not be called multiple times for the same StreamReadFuture.
101 ///
102 /// @throws ugrpc::client::RpcError on an RPC error
103 /// @throws ugrpc::client::RpcCancelledError on task cancellation
104 bool Get();
105
106 /// @brief Checks if the asynchronous call has completed
107 /// Note, that once user gets result, IsReady should not be called
108 /// @return true if result ready
109 [[nodiscard]] bool IsReady() const noexcept;
110
111 private:
112 impl::FutureImpl impl_;
113 typename RPC::RawStream* stream_;
114};
115
116/// @brief Base class for any RPC
118 protected:
119 /// @cond
120 CallAnyBase(impl::CallParams&& params)
121 : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
122 /// @endcond
123
124 public:
125 /// @returns the `ClientContext` used for this RPC
126 grpc::ClientContext& GetContext();
127
128 /// @returns client name
130
131 /// @returns RPC name
133
134 /// @returns RPC span
135 tracing::Span& GetSpan();
136
137 /// @cond
138 // For internal use only
139 impl::RpcData& GetData(ugrpc::impl::InternalTag);
140 /// @endcond
141
142 protected:
143 impl::RpcData& GetData();
144
145 private:
146 std::unique_ptr<impl::RpcData> data_;
147};
148
149/// @brief Controls a single request -> single response RPC
150///
151/// This class is not thread-safe except for `GetContext`.
152///
153/// The RPC is cancelled on destruction unless `Finish` or `FinishAsync`. In
154/// that case the connection is not closed (it will be reused for new RPCs), and
155/// the server receives `RpcInterruptedError` immediately.
156template <typename Response>
157class [[nodiscard]] UnaryCall final : public CallAnyBase {
158 public:
159 /// @brief Await and read the response
160 ///
161 /// `Finish` should not be called multiple times for the same RPC.
162 ///
163 /// The connection is not closed, it will be reused for new RPCs.
164 ///
165 /// @returns the response on success
166 /// @throws ugrpc::client::RpcError on an RPC error
167 /// @throws ugrpc::client::RpcCancelledError on task cancellation
168 Response Finish();
169
170 /// @brief Asynchronously finish the call
171 ///
172 /// `FinishAsync` should not be called multiple times for the same RPC.
173 ///
174 /// `Finish` and `FinishAsync` should not be called together for the same RPC.
175 ///
176 /// @returns the future for the single response
177 UnaryFuture FinishAsync(Response& response);
178
179 /// @cond
180 // For internal use only
181 template <typename Stub, typename Request>
182 UnaryCall(
183 impl::CallParams&& params, Stub& stub,
184 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
185 const Request& req);
186 /// @endcond
187
188 UnaryCall(UnaryCall&&) noexcept = default;
189 UnaryCall& operator=(UnaryCall&&) noexcept = default;
190 ~UnaryCall() = default;
191
192 private:
193 impl::RawResponseReader<Response> reader_;
194};
195
196/// @brief Controls a single request -> response stream RPC
197///
198/// This class is not thread-safe except for `GetContext`.
199///
200/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
201/// returned `false`). In that case the connection is not closed (it will be
202/// reused for new RPCs), and the server receives `RpcInterruptedError`
203/// immediately. gRPC provides no way to early-close a server-streaming RPC
204/// gracefully.
205///
206/// If any method throws, further methods must not be called on the same stream,
207/// except for `GetContext`.
208template <typename Response>
209class [[nodiscard]] InputStream final : public CallAnyBase {
210 public:
211 /// @brief Await and read the next incoming message
212 ///
213 /// On end-of-input, `Finish` is called automatically.
214 ///
215 /// @param response where to put response on success
216 /// @returns `true` on success, `false` on end-of-input or task cancellation
217 /// @throws ugrpc::client::RpcError on an RPC error
218 [[nodiscard]] bool Read(Response& response);
219
220 /// @cond
221 // For internal use only
222 using RawStream = grpc::ClientAsyncReader<Response>;
223
224 template <typename Stub, typename Request>
225 InputStream(impl::CallParams&& params, Stub& stub,
226 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
227 const Request& req);
228 /// @endcond
229
230 InputStream(InputStream&&) noexcept = default;
231 InputStream& operator=(InputStream&&) noexcept = default;
232 ~InputStream() = default;
233
234 private:
235 impl::RawReader<Response> stream_;
236};
237
238/// @brief Controls a request stream -> single response RPC
239///
240/// This class is not thread-safe except for `GetContext`.
241///
242/// The RPC is cancelled on destruction unless `Finish` has been called. In that
243/// case the connection is not closed (it will be reused for new RPCs), and the
244/// server receives `RpcInterruptedError` immediately.
245///
246/// If any method throws, further methods must not be called on the same stream,
247/// except for `GetContext`.
248template <typename Request, typename Response>
249class [[nodiscard]] OutputStream final : public CallAnyBase {
250 public:
251 /// @brief Write the next outgoing message
252 ///
253 /// `Write` doesn't store any references to `request`, so it can be
254 /// deallocated right after the call.
255 ///
256 /// @param request the next message to write
257 /// @return true if the data is going to the wire; false if the write
258 /// operation failed (including due to task cancellation),
259 /// in which case no more writes will be accepted,
260 /// and the error details can be fetched from Finish
261 [[nodiscard]] bool Write(const Request& request);
262
263 /// @brief Write the next outgoing message and check result
264 ///
265 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
266 /// deallocated right after the call.
267 ///
268 /// `WriteAndCheck` verifies result of the write and generates exception
269 /// in case of issues.
270 ///
271 /// @param request the next message to write
272 /// @throws ugrpc::client::RpcError on an RPC error
273 /// @throws ugrpc::client::RpcCancelledError on task cancellation
274 void WriteAndCheck(const Request& request);
275
276 /// @brief Complete the RPC successfully
277 ///
278 /// Should be called once all the data is written. The server will then
279 /// send a single `Response`.
280 ///
281 /// `Finish` should not be called multiple times.
282 ///
283 /// The connection is not closed, it will be reused for new RPCs.
284 ///
285 /// @returns the single `Response` received after finishing the writes
286 /// @throws ugrpc::client::RpcError on an RPC error
287 /// @throws ugrpc::client::RpcCancelledError on task cancellation
288 Response Finish();
289
290 /// @cond
291 // For internal use only
292 using RawStream = grpc::ClientAsyncWriter<Request>;
293
294 template <typename Stub>
295 OutputStream(impl::CallParams&& params, Stub& stub,
296 impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
297 /// @endcond
298
299 OutputStream(OutputStream&&) noexcept = default;
300 OutputStream& operator=(OutputStream&&) noexcept = default;
301 ~OutputStream() = default;
302
303 private:
304 std::unique_ptr<Response> final_response_;
305 impl::RawWriter<Request> stream_;
306};
307
308/// @brief Controls a request stream -> response stream RPC
309///
310/// It is safe to call the following methods from different coroutines:
311///
312/// - `GetContext`;
313/// - one of (`Read`, `ReadAsync`);
314/// - one of (`Write`, `WritesDone`).
315///
316/// `WriteAndCheck` is NOT thread-safe.
317///
318/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
319/// returned `false`). In that case the connection is not closed (it will be
320/// reused for new RPCs), and the server receives `RpcInterruptedError`
321/// immediately. gRPC provides no way to early-close a server-streaming RPC
322/// gracefully.
323///
324/// `Read` and `AsyncRead` can throw if error status is received from server.
325/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
326/// operations.
327///
328/// `Write` and `WritesDone` methods do not throw, but indicate issues with
329/// the RPC by returning `false`.
330///
331/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
332/// operation the user calls `Read` and vice versa.
333///
334/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
335/// any of these methods anymore.
336/// Instead the user SHOULD call `Read` method until the end of input. If
337/// `Write` or `WritesDone` finishes with negative result, finally `Read`
338/// will throw an exception.
339/// ## Usage example:
340///
341/// @snippet grpc/tests/src/stream_test.cpp concurrent bidirectional stream
342///
343template <typename Request, typename Response>
344class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
345 public:
346 /// @brief Await and read the next incoming message
347 ///
348 /// On end-of-input, `Finish` is called automatically.
349 ///
350 /// @param response where to put response on success
351 /// @returns `true` on success, `false` on end-of-input or task cancellation
352 /// @throws ugrpc::client::RpcError on an RPC error
353 [[nodiscard]] bool Read(Response& response);
354
355 /// @brief Return future to read next incoming result
356 ///
357 /// @param response where to put response on success
358 /// @return StreamReadFuture future
359 /// @throws ugrpc::client::RpcError on an RPC error
360 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response) noexcept;
361
362 /// @brief Write the next outgoing message
363 ///
364 /// RPC will be performed immediately. No references to `request` are
365 /// saved, so it can be deallocated right after the call.
366 ///
367 /// @param request the next message to write
368 /// @return true if the data is going to the wire; false if the write
369 /// operation failed (including due to task cancellation),
370 /// in which case no more writes will be accepted,
371 /// but Read may still have some data and status code available
372 [[nodiscard]] bool Write(const Request& request);
373
374 /// @brief Write the next outgoing message and check result
375 ///
376 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
377 /// deallocated right after the call.
378 ///
379 /// `WriteAndCheck` verifies result of the write and generates exception
380 /// in case of issues.
381 ///
382 /// @param request the next message to write
383 /// @throws ugrpc::client::RpcError on an RPC error
384 /// @throws ugrpc::client::RpcCancelledError on task cancellation
385 void WriteAndCheck(const Request& request);
386
387 /// @brief Announce end-of-output to the server
388 ///
389 /// Should be called to notify the server and receive the final response(s).
390 ///
391 /// @return true if the data is going to the wire; false if the operation
392 /// failed, but Read may still have some data and status code
393 /// available
394 [[nodiscard]] bool WritesDone();
395
396 /// @cond
397 // For internal use only
398 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
399
400 template <typename Stub>
401 BidirectionalStream(
402 impl::CallParams&& params, Stub& stub,
403 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
404 /// @endcond
405
406 BidirectionalStream(BidirectionalStream&&) noexcept = default;
407 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
408 ~BidirectionalStream() = default;
409
410 private:
411 impl::RawReaderWriter<Request, Response> stream_;
412};
413
414// ========================== Implementation follows ==========================
415
416namespace impl {
417void CallMiddlewares(const Middlewares& mws, CallAnyBase& call,
418 utils::function_ref<void()> user_call,
419 const ::google::protobuf::Message* request);
420} // namespace impl
421
422template <typename RPC>
423StreamReadFuture<RPC>::StreamReadFuture(
424 impl::RpcData& data, typename RPC::RawStream& stream) noexcept
425 : impl_(data), stream_(&stream) {}
426
427template <typename RPC>
428StreamReadFuture<RPC>::~StreamReadFuture() noexcept {
429 if (auto* const data = impl_.GetData()) {
430 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
431 const auto wait_status =
432 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
433 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
434 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
435 data->GetStatsScope().OnCancelled();
436 }
437 impl::Finish(*stream_, *data, false);
438 }
439 }
440}
441
442template <typename RPC>
443StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(
444 StreamReadFuture<RPC>&& other) noexcept {
445 if (this == &other) return *this;
446 [[maybe_unused]] auto for_destruction = std::move(*this);
447 impl_ = std::move(other.impl_);
448 stream_ = other.stream_;
449 return *this;
450}
451
452template <typename RPC>
453bool StreamReadFuture<RPC>::Get() {
454 auto* const data = impl_.GetData();
455 UINVARIANT(data, "'Get' must be called only once");
456 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
457 impl_.ClearData();
458 const auto result =
459 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
460 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
461 data->GetStatsScope().OnCancelled();
462 data->GetStatsScope().Flush();
463 }
464 if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
465 // Finish can only be called once all the data is read, otherwise the
466 // underlying gRPC driver hangs.
467 impl::Finish(*stream_, *data, true);
468 }
469 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
470}
471
472template <typename RPC>
473bool StreamReadFuture<RPC>::IsReady() const noexcept {
474 return impl_.IsReady();
475}
476
477template <typename Response>
478template <typename Stub, typename Request>
479UnaryCall<Response>::UnaryCall(
480 impl::CallParams&& params, Stub& stub,
481 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
482 const Request& req)
483 : CallAnyBase(std::move(params)) {
484 impl::CallMiddlewares(
485 GetData().GetMiddlewares(), *this,
486 [&] {
487 reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
488 &GetData().GetQueue());
489 reader_->StartCall();
490 },
491 &req);
492 GetData().SetWritesFinished();
493}
494
495template <typename Response>
496Response UnaryCall<Response>::Finish() {
497 Response response;
498 UnaryFuture future = FinishAsync(response);
499 future.Get();
500 return response;
501}
502
503template <typename Response>
504UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
505 UASSERT(reader_);
506 PrepareFinish(GetData());
507 GetData().EmplaceFinishAsyncMethodInvocation();
508 auto& finish = GetData().GetFinishAsyncMethodInvocation();
509 auto& status = GetData().GetStatus();
510 reader_->Finish(&response, &status, finish.GetTag());
511 return UnaryFuture{GetData()};
512}
513
514template <typename Response>
515template <typename Stub, typename Request>
516InputStream<Response>::InputStream(
517 impl::CallParams&& params, Stub& stub,
518 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
519 const Request& req)
520 : CallAnyBase(std::move(params)) {
521 impl::CallMiddlewares(
522 GetData().GetMiddlewares(), *this,
523 [&] {
524 stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
525 &GetData().GetQueue());
526 impl::StartCall(*stream_, GetData());
527 },
528 &req);
529 GetData().SetWritesFinished();
530}
531
532template <typename Response>
533bool InputStream<Response>::Read(Response& response) {
534 if (impl::Read(*stream_, response, GetData())) {
535 return true;
536 } else {
537 // Finish can only be called once all the data is read, otherwise the
538 // underlying gRPC driver hangs.
539 impl::Finish(*stream_, GetData(), true);
540 return false;
541 }
542}
543
544template <typename Request, typename Response>
545template <typename Stub>
546OutputStream<Request, Response>::OutputStream(
547 impl::CallParams&& params, Stub& stub,
548 impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
549 : CallAnyBase(std::move(params)),
550 final_response_(std::make_unique<Response>()) {
551 impl::CallMiddlewares(
552 GetData().GetMiddlewares(), *this,
553 [&] {
554 // 'final_response_' will be filled upon successful 'Finish' async call
555 stream_ =
556 (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
557 &GetData().GetQueue());
558 impl::StartCall(*stream_, GetData());
559 },
560 nullptr);
561}
562
563template <typename Request, typename Response>
564bool OutputStream<Request, Response>::Write(const Request& request) {
565 // Don't buffer writes, otherwise in an event subscription scenario, events
566 // may never actually be delivered
567 grpc::WriteOptions write_options{};
568
569 return impl::Write(*stream_, request, write_options, GetData());
570}
571
572template <typename Request, typename Response>
573void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
574 // Don't buffer writes, otherwise in an event subscription scenario, events
575 // may never actually be delivered
576 grpc::WriteOptions write_options{};
577
578 if (!impl::Write(*stream_, request, write_options, GetData())) {
579 impl::Finish(*stream_, GetData(), true);
580 }
581}
582
583template <typename Request, typename Response>
584Response OutputStream<Request, Response>::Finish() {
585 // gRPC does not implicitly call `WritesDone` in `Finish`,
586 // contrary to the documentation
587 if (!GetData().AreWritesFinished()) {
588 impl::WritesDone(*stream_, GetData());
589 }
590
591 impl::Finish(*stream_, GetData(), true);
592
593 return std::move(*final_response_);
594}
595
596template <typename Request, typename Response>
597template <typename Stub>
598BidirectionalStream<Request, Response>::BidirectionalStream(
599 impl::CallParams&& params, Stub& stub,
600 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
601 : CallAnyBase(std::move(params)) {
602 impl::CallMiddlewares(
603 GetData().GetMiddlewares(), *this,
604 [&] {
605 stream_ = (stub.*prepare_func)(&GetData().GetContext(),
606 &GetData().GetQueue());
607 impl::StartCall(*stream_, GetData());
608 },
609 nullptr);
610}
611
612template <typename Request, typename Response>
613StreamReadFuture<BidirectionalStream<Request, Response>>
614BidirectionalStream<Request, Response>::ReadAsync(Response& response) noexcept {
615 impl::ReadAsync(*stream_, response, GetData());
616 return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
617 *stream_};
618}
619
620template <typename Request, typename Response>
621bool BidirectionalStream<Request, Response>::Read(Response& response) {
622 auto future = ReadAsync(response);
623 return future.Get();
624}
625
626template <typename Request, typename Response>
627bool BidirectionalStream<Request, Response>::Write(const Request& request) {
628 // Don't buffer writes, optimize for ping-pong-style interaction
629 grpc::WriteOptions write_options{};
630
631 return impl::Write(*stream_, request, write_options, GetData());
632}
633
634template <typename Request, typename Response>
635void BidirectionalStream<Request, Response>::WriteAndCheck(
636 const Request& request) {
637 // Don't buffer writes, optimize for ping-pong-style interaction
638 grpc::WriteOptions write_options{};
639
640 impl::WriteAndCheck(*stream_, request, write_options, GetData());
641}
642
643template <typename Request, typename Response>
644bool BidirectionalStream<Request, Response>::WritesDone() {
645 return impl::WritesDone(*stream_, GetData());
646}
647
648} // namespace ugrpc::client
649
650USERVER_NAMESPACE_END