userver: userver/ugrpc/client/rpc.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <memory>
7#include <string_view>
8#include <utility>
9#include <vector>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/utils/assert.hpp>
15#include <userver/utils/function_ref.hpp>
16
17#include <userver/ugrpc/client/exceptions.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_params.hpp>
20#include <userver/ugrpc/client/impl/channel_cache.hpp>
21#include <userver/ugrpc/client/middlewares/fwd.hpp>
22#include <userver/ugrpc/impl/deadline_timepoint.hpp>
23#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
24#include <userver/ugrpc/impl/statistics_scope.hpp>
25
26USERVER_NAMESPACE_BEGIN
27
28namespace ugrpc::client {
29
30/// @brief UnaryFuture for waiting a single response RPC
31class [[nodiscard]] UnaryFuture {
32 public:
33 /// @cond
34 explicit UnaryFuture(impl::RpcData& data) noexcept;
35 /// @endcond
36
37 UnaryFuture(UnaryFuture&&) noexcept = default;
38 UnaryFuture& operator=(UnaryFuture&&) noexcept;
39
40 ~UnaryFuture() noexcept;
41
42 /// @brief Await response
43 ///
44 /// Upon completion result is available in `response` when initiating the
45 /// asynchronous operation, e.g. FinishAsync.
46 ///
47 /// `Get` should not be called multiple times for the same UnaryFuture.
48 ///
49 /// @throws ugrpc::client::RpcError on an RPC error
50 /// @throws ugrpc::client::RpcCancelledError on task cancellation
51 void Get();
52
53 /// @brief Checks if the asynchronous call has completed
54 /// Note, that once user gets result, IsReady should not be called
55 /// @return true if result ready
56 [[nodiscard]] bool IsReady() const noexcept;
57
58 private:
59 impl::FutureImpl impl_;
60};
61
62/// @brief StreamReadFuture for waiting a single read response from stream
63template <typename RPC>
64class [[nodiscard]] StreamReadFuture {
65 public:
66 /// @cond
67 explicit StreamReadFuture(impl::RpcData& data,
68 typename RPC::RawStream& stream) noexcept;
69 /// @endcond
70
71 StreamReadFuture(StreamReadFuture&& other) noexcept = default;
72 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
73
74 ~StreamReadFuture() noexcept;
75
76 /// @brief Await response
77 ///
78 /// Upon completion the result is available in `response` that was
79 /// specified when initiating the asynchronous read
80 ///
81 /// `Get` should not be called multiple times for the same StreamReadFuture.
82 ///
83 /// @throws ugrpc::client::RpcError on an RPC error
84 /// @throws ugrpc::client::RpcCancelledError on task cancellation
85 bool Get();
86
87 /// @brief Checks if the asynchronous call has completed
88 /// Note, that once user gets result, IsReady should not be called
89 /// @return true if result ready
90 [[nodiscard]] bool IsReady() const noexcept;
91
92 private:
93 impl::FutureImpl impl_;
94 typename RPC::RawStream* stream_;
95};
96
97/// @brief Base class for any RPC
99 protected:
100 /// @cond
101 CallAnyBase(impl::CallParams&& params)
102 : data_(std::make_unique<impl::RpcData>(std::move(params))) {}
103 /// @endcond
104
105 public:
106 /// @returns the `ClientContext` used for this RPC
107 grpc::ClientContext& GetContext();
108
109 /// @returns client name
111
112 /// @returns RPC name
114
115 /// @returns RPC span
116 tracing::Span& GetSpan();
117
118 /// @cond
119 // For internal use only
120 impl::RpcData& GetData(ugrpc::impl::InternalTag);
121 /// @endcond
122
123 protected:
124 impl::RpcData& GetData();
125
126 private:
127 std::unique_ptr<impl::RpcData> data_;
128};
129
130/// @brief Controls a single request -> single response RPC
131///
132/// This class is not thread-safe except for `GetContext`.
133///
134/// The RPC is cancelled on destruction unless `Finish` or `FinishAsync`. In
135/// that case the connection is not closed (it will be reused for new RPCs), and
136/// the server receives `RpcInterruptedError` immediately.
137template <typename Response>
138class [[nodiscard]] UnaryCall final : public CallAnyBase {
139 public:
140 /// @brief Await and read the response
141 ///
142 /// `Finish` should not be called multiple times for the same RPC.
143 ///
144 /// The connection is not closed, it will be reused for new RPCs.
145 ///
146 /// @returns the response on success
147 /// @throws ugrpc::client::RpcError on an RPC error
148 /// @throws ugrpc::client::RpcCancelledError on task cancellation
149 Response Finish();
150
151 /// @brief Asynchronously finish the call
152 ///
153 /// `FinishAsync` should not be called multiple times for the same RPC.
154 ///
155 /// `Finish` and `FinishAsync` should not be called together for the same RPC.
156 ///
157 /// @returns the future for the single response
158 UnaryFuture FinishAsync(Response& response);
159
160 /// @cond
161 // For internal use only
162 template <typename Stub, typename Request>
163 UnaryCall(
164 impl::CallParams&& params, Stub& stub,
165 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
166 const Request& req);
167 /// @endcond
168
169 UnaryCall(UnaryCall&&) noexcept = default;
170 UnaryCall& operator=(UnaryCall&&) noexcept = default;
171 ~UnaryCall() = default;
172
173 private:
174 impl::RawResponseReader<Response> reader_;
175};
176
177/// @brief Controls a single request -> response stream RPC
178///
179/// This class is not thread-safe except for `GetContext`.
180///
181/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
182/// returned `false`). In that case the connection is not closed (it will be
183/// reused for new RPCs), and the server receives `RpcInterruptedError`
184/// immediately. gRPC provides no way to early-close a server-streaming RPC
185/// gracefully.
186///
187/// If any method throws, further methods must not be called on the same stream,
188/// except for `GetContext`.
189template <typename Response>
190class [[nodiscard]] InputStream final : public CallAnyBase {
191 public:
192 /// @brief Await and read the next incoming message
193 ///
194 /// On end-of-input, `Finish` is called automatically.
195 ///
196 /// @param response where to put response on success
197 /// @returns `true` on success, `false` on end-of-input or task cancellation
198 /// @throws ugrpc::client::RpcError on an RPC error
199 [[nodiscard]] bool Read(Response& response);
200
201 /// @cond
202 // For internal use only
203 using RawStream = grpc::ClientAsyncReader<Response>;
204
205 template <typename Stub, typename Request>
206 InputStream(impl::CallParams&& params, Stub& stub,
207 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
208 const Request& req);
209 /// @endcond
210
211 InputStream(InputStream&&) noexcept = default;
212 InputStream& operator=(InputStream&&) noexcept = default;
213 ~InputStream() = default;
214
215 private:
216 std::unique_ptr<impl::RpcData> data_;
217 impl::RawReader<Response> stream_;
218};
219
220/// @brief Controls a request stream -> single response RPC
221///
222/// This class is not thread-safe except for `GetContext`.
223///
224/// The RPC is cancelled on destruction unless `Finish` has been called. In that
225/// case the connection is not closed (it will be reused for new RPCs), and the
226/// server receives `RpcInterruptedError` immediately.
227///
228/// If any method throws, further methods must not be called on the same stream,
229/// except for `GetContext`.
230template <typename Request, typename Response>
231class [[nodiscard]] OutputStream final : public CallAnyBase {
232 public:
233 /// @brief Write the next outgoing message
234 ///
235 /// `Write` doesn't store any references to `request`, so it can be
236 /// deallocated right after the call.
237 ///
238 /// @param request the next message to write
239 /// @return true if the data is going to the wire; false if the write
240 /// operation failed (including due to task cancellation),
241 /// in which case no more writes will be accepted,
242 /// and the error details can be fetched from Finish
243 [[nodiscard]] bool Write(const Request& request);
244
245 /// @brief Write the next outgoing message and check result
246 ///
247 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
248 /// deallocated right after the call.
249 ///
250 /// `WriteAndCheck` verifies result of the write and generates exception
251 /// in case of issues.
252 ///
253 /// @param request the next message to write
254 /// @throws ugrpc::client::RpcError on an RPC error
255 /// @throws ugrpc::client::RpcCancelledError on task cancellation
256 void WriteAndCheck(const Request& request);
257
258 /// @brief Complete the RPC successfully
259 ///
260 /// Should be called once all the data is written. The server will then
261 /// send a single `Response`.
262 ///
263 /// `Finish` should not be called multiple times.
264 ///
265 /// The connection is not closed, it will be reused for new RPCs.
266 ///
267 /// @returns the single `Response` received after finishing the writes
268 /// @throws ugrpc::client::RpcError on an RPC error
269 /// @throws ugrpc::client::RpcCancelledError on task cancellation
270 Response Finish();
271
272 /// @cond
273 // For internal use only
274 using RawStream = grpc::ClientAsyncWriter<Request>;
275
276 template <typename Stub>
277 OutputStream(impl::CallParams&& params, Stub& stub,
278 impl::RawWriterPreparer<Stub, Request, Response> prepare_func);
279 /// @endcond
280
281 OutputStream(OutputStream&&) noexcept = default;
282 OutputStream& operator=(OutputStream&&) noexcept = default;
283 ~OutputStream() = default;
284
285 private:
286 std::unique_ptr<impl::RpcData> data_;
287 std::unique_ptr<Response> final_response_;
288 impl::RawWriter<Request> stream_;
289};
290
291/// @brief Controls a request stream -> response stream RPC
292///
293/// This class allows the following concurrent calls:
294/// - `GetContext`
295/// - Concurrent call of one of (`Read`, `ReadAsync`) with one of (`Write`,
296/// `WritesDone`)
297/// `WriteAndCheck` is not thread-safe
298///
299/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
300/// returned `false`). In that case the connection is not closed (it will be
301/// reused for new RPCs), and the server receives `RpcInterruptedError`
302/// immediately. gRPC provides no way to early-close a server-streaming RPC
303/// gracefully.
304///
305/// `Read` and `AsyncRead` can throw if error status is received from server.
306/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
307/// operations.
308///
309/// `Write` and `WritesDone` methods do not throw, but indicate issues with
310/// the RPC by returning `false`.
311///
312/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
313/// operation the user calls `Read` and vice versa.
314///
315/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
316/// any of these methods anymore.
317/// Instead the user SHOULD call `Read` method until the end of input. If
318/// `Write` or `WritesDone` finishes with negative result, finally `Read`
319/// will throw an exception.
320template <typename Request, typename Response>
321class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
322 public:
323 /// @brief Await and read the next incoming message
324 ///
325 /// On end-of-input, `Finish` is called automatically.
326 ///
327 /// @param response where to put response on success
328 /// @returns `true` on success, `false` on end-of-input or task cancellation
329 /// @throws ugrpc::client::RpcError on an RPC error
330 [[nodiscard]] bool Read(Response& response);
331
332 /// @brief Return future to read next incoming result
333 ///
334 /// @param response where to put response on success
335 /// @return StreamReadFuture future
336 /// @throws ugrpc::client::RpcError on an RPC error
337 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response) noexcept;
338
339 /// @brief Write the next outgoing message
340 ///
341 /// RPC will be performed immediately. No references to `request` are
342 /// saved, so it can be deallocated right after the call.
343 ///
344 /// @param request the next message to write
345 /// @return true if the data is going to the wire; false if the write
346 /// operation failed (including due to task cancellation),
347 /// in which case no more writes will be accepted,
348 /// but Read may still have some data and status code available
349 [[nodiscard]] bool Write(const Request& request);
350
351 /// @brief Write the next outgoing message and check result
352 ///
353 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
354 /// deallocated right after the call.
355 ///
356 /// `WriteAndCheck` verifies result of the write and generates exception
357 /// in case of issues.
358 ///
359 /// @param request the next message to write
360 /// @throws ugrpc::client::RpcError on an RPC error
361 /// @throws ugrpc::client::RpcCancelledError on task cancellation
362 void WriteAndCheck(const Request& request);
363
364 /// @brief Announce end-of-output to the server
365 ///
366 /// Should be called to notify the server and receive the final response(s).
367 ///
368 /// @return true if the data is going to the wire; false if the operation
369 /// failed, but Read may still have some data and status code
370 /// available
371 [[nodiscard]] bool WritesDone();
372
373 /// @cond
374 // For internal use only
375 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
376
377 template <typename Stub>
378 BidirectionalStream(
379 impl::CallParams&& params, Stub& stub,
380 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func);
381 /// @endcond
382
383 BidirectionalStream(BidirectionalStream&&) noexcept = default;
384 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
385 ~BidirectionalStream() = default;
386
387 private:
388 std::unique_ptr<impl::RpcData> data_;
389 impl::RawReaderWriter<Request, Response> stream_;
390};
391
392// ========================== Implementation follows ==========================
393
394namespace impl {
395void CallMiddlewares(const Middlewares& mws, CallAnyBase& call,
396 utils::function_ref<void()> user_call,
397 const ::google::protobuf::Message* request);
398} // namespace impl
399
400template <typename RPC>
401StreamReadFuture<RPC>::StreamReadFuture(
402 impl::RpcData& data, typename RPC::RawStream& stream) noexcept
403 : impl_(data), stream_(&stream) {}
404
405template <typename RPC>
406StreamReadFuture<RPC>::~StreamReadFuture() noexcept {
407 if (auto* const data = impl_.GetData()) {
408 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
409 const auto wait_status =
410 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
411 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
412 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
413 data->GetStatsScope().OnCancelled();
414 }
415 impl::Finish(*stream_, *data, false);
416 }
417 }
418}
419
420template <typename RPC>
421StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(
422 StreamReadFuture<RPC>&& other) noexcept {
423 if (this == &other) return *this;
424 [[maybe_unused]] auto for_destruction = std::move(*this);
425 impl_ = std::move(other.impl_);
426 stream_ = other.stream_;
427 return *this;
428}
429
430template <typename RPC>
431bool StreamReadFuture<RPC>::Get() {
432 auto* const data = impl_.GetData();
433 UINVARIANT(data, "'Get' must be called only once");
434 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
435 impl_.ClearData();
436 const auto result =
437 impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
438 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
439 data->GetStatsScope().OnCancelled();
440 data->GetStatsScope().Flush();
441 }
442 if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
443 // Finish can only be called once all the data is read, otherwise the
444 // underlying gRPC driver hangs.
445 impl::Finish(*stream_, *data, true);
446 }
447 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
448}
449
450template <typename RPC>
451bool StreamReadFuture<RPC>::IsReady() const noexcept {
452 return impl_.IsReady();
453}
454
455template <typename Response>
456template <typename Stub, typename Request>
457UnaryCall<Response>::UnaryCall(
458 impl::CallParams&& params, Stub& stub,
459 impl::RawResponseReaderPreparer<Stub, Request, Response> prepare_func,
460 const Request& req)
461 : CallAnyBase(std::move(params)) {
462 impl::CallMiddlewares(
463 GetData().GetMiddlewares(), *this,
464 [&] {
465 reader_ = (stub.*prepare_func)(&GetData().GetContext(), req,
466 &GetData().GetQueue());
467 reader_->StartCall();
468 },
469 &req);
470 GetData().SetWritesFinished();
471}
472
473template <typename Response>
474Response UnaryCall<Response>::Finish() {
475 Response response;
476 UnaryFuture future = FinishAsync(response);
477 future.Get();
478 return response;
479}
480
481template <typename Response>
482UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
483 UASSERT(reader_);
484 PrepareFinish(GetData());
485 GetData().EmplaceFinishAsyncMethodInvocation();
486 auto& finish = GetData().GetFinishAsyncMethodInvocation();
487 auto& status = GetData().GetStatus();
488 reader_->Finish(&response, &status, finish.GetTag());
489 return UnaryFuture{GetData()};
490}
491
492template <typename Response>
493template <typename Stub, typename Request>
494InputStream<Response>::InputStream(
495 impl::CallParams&& params, Stub& stub,
496 impl::RawReaderPreparer<Stub, Request, Response> prepare_func,
497 const Request& req)
498 : CallAnyBase(std::move(params)) {
499 impl::CallMiddlewares(
500 GetData().GetMiddlewares(), *this,
501 [&] {
502 stream_ = (stub.*prepare_func)(&GetData().GetContext(), req,
503 &GetData().GetQueue());
504 impl::StartCall(*stream_, GetData());
505 },
506 &req);
507 GetData().SetWritesFinished();
508}
509
510template <typename Response>
511bool InputStream<Response>::Read(Response& response) {
512 if (impl::Read(*stream_, response, GetData())) {
513 return true;
514 } else {
515 // Finish can only be called once all the data is read, otherwise the
516 // underlying gRPC driver hangs.
517 impl::Finish(*stream_, GetData(), true);
518 return false;
519 }
520}
521
522template <typename Request, typename Response>
523template <typename Stub>
524OutputStream<Request, Response>::OutputStream(
525 impl::CallParams&& params, Stub& stub,
526 impl::RawWriterPreparer<Stub, Request, Response> prepare_func)
527 : CallAnyBase(std::move(params)),
528 final_response_(std::make_unique<Response>()) {
529 impl::CallMiddlewares(
530 GetData().GetMiddlewares(), *this,
531 [&] {
532 // 'final_response_' will be filled upon successful 'Finish' async call
533 stream_ =
534 (stub.*prepare_func)(&GetData().GetContext(), final_response_.get(),
535 &GetData().GetQueue());
536 impl::StartCall(*stream_, GetData());
537 },
538 nullptr);
539}
540
541template <typename Request, typename Response>
542bool OutputStream<Request, Response>::Write(const Request& request) {
543 // Don't buffer writes, otherwise in an event subscription scenario, events
544 // may never actually be delivered
545 grpc::WriteOptions write_options{};
546
547 return impl::Write(*stream_, request, write_options, GetData());
548}
549
550template <typename Request, typename Response>
551void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
552 // Don't buffer writes, otherwise in an event subscription scenario, events
553 // may never actually be delivered
554 grpc::WriteOptions write_options{};
555
556 if (!impl::Write(*stream_, request, write_options, GetData())) {
557 impl::Finish(*stream_, GetData(), true);
558 }
559}
560
561template <typename Request, typename Response>
562Response OutputStream<Request, Response>::Finish() {
563 // gRPC does not implicitly call `WritesDone` in `Finish`,
564 // contrary to the documentation
565 if (!GetData().AreWritesFinished()) {
566 impl::WritesDone(*stream_, GetData());
567 }
568
569 impl::Finish(*stream_, GetData(), true);
570
571 return std::move(*final_response_);
572}
573
574template <typename Request, typename Response>
575template <typename Stub>
576BidirectionalStream<Request, Response>::BidirectionalStream(
577 impl::CallParams&& params, Stub& stub,
578 impl::RawReaderWriterPreparer<Stub, Request, Response> prepare_func)
579 : CallAnyBase(std::move(params)) {
580 impl::CallMiddlewares(
581 GetData().GetMiddlewares(), *this,
582 [&] {
583 stream_ = (stub.*prepare_func)(&GetData().GetContext(),
584 &GetData().GetQueue());
585 impl::StartCall(*stream_, GetData());
586 },
587 nullptr);
588}
589
590template <typename Request, typename Response>
591StreamReadFuture<BidirectionalStream<Request, Response>>
592BidirectionalStream<Request, Response>::ReadAsync(Response& response) noexcept {
593 impl::ReadAsync(*stream_, response, GetData());
594 return StreamReadFuture<BidirectionalStream<Request, Response>>{GetData(),
595 *stream_};
596}
597
598template <typename Request, typename Response>
599bool BidirectionalStream<Request, Response>::Read(Response& response) {
600 auto future = ReadAsync(response);
601 return future.Get();
602}
603
604template <typename Request, typename Response>
605bool BidirectionalStream<Request, Response>::Write(const Request& request) {
606 // Don't buffer writes, optimize for ping-pong-style interaction
607 grpc::WriteOptions write_options{};
608
609 return impl::Write(*stream_, request, write_options, GetData());
610}
611
612template <typename Request, typename Response>
613void BidirectionalStream<Request, Response>::WriteAndCheck(
614 const Request& request) {
615 // Don't buffer writes, optimize for ping-pong-style interaction
616 grpc::WriteOptions write_options{};
617
618 impl::WriteAndCheck(*stream_, request, write_options, GetData());
619}
620
621template <typename Request, typename Response>
622bool BidirectionalStream<Request, Response>::WritesDone() {
623 return impl::WritesDone(*stream_, GetData());
624}
625
626} // namespace ugrpc::client
627
628USERVER_NAMESPACE_END