userver: userver/ugrpc/client/rpc.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <exception>
7#include <memory>
8#include <string_view>
9#include <utility>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/future_status.hpp>
15#include <userver/utils/assert.hpp>
16
17#include <userver/ugrpc/client/call.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_state.hpp>
20#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
21#include <userver/ugrpc/client/impl/prepare_call.hpp>
22#include <userver/ugrpc/client/middlewares/fwd.hpp>
23#include <userver/ugrpc/time_utils.hpp>
24
25USERVER_NAMESPACE_BEGIN
26
27namespace ugrpc::client {
28
29namespace impl {
30
31// Contains the implementation of UnaryFinishFuture that is not dependent on template parameters.
32class UnaryFinishFutureImpl {
33public:
34 UnaryFinishFutureImpl(CallState& state, const google::protobuf::Message* response) noexcept;
35
36 UnaryFinishFutureImpl(UnaryFinishFutureImpl&&) noexcept;
37 UnaryFinishFutureImpl& operator=(UnaryFinishFutureImpl&&) noexcept;
38 UnaryFinishFutureImpl(const UnaryFinishFutureImpl&) = delete;
39 UnaryFinishFutureImpl& operator=(const UnaryFinishFutureImpl&) = delete;
40 ~UnaryFinishFutureImpl();
41
42 [[nodiscard]] bool IsReady() const noexcept;
43
44 [[nodiscard]] engine::FutureStatus WaitUntil(engine::Deadline deadline) const noexcept;
45
46 void Get();
47
48 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept;
49
50private:
51 CallState* state_{};
52 const google::protobuf::Message* response_;
53 mutable std::exception_ptr exception_;
54};
55
56/// @brief UnaryFuture for awaiting a single response RPC
57template <typename Response>
58class [[nodiscard]] UnaryFinishFuture {
59public:
60 /// @cond
61 UnaryFinishFuture(CallState& state, std::unique_ptr<Response>&& response) noexcept
62 : response_(std::move(response)), impl_(state, impl::ToBaseMessage(*response_)) {}
63 /// @endcond
64
65 UnaryFinishFuture(UnaryFinishFuture&&) noexcept = default;
66 UnaryFinishFuture& operator=(UnaryFinishFuture&&) noexcept = default;
67 UnaryFinishFuture(const UnaryFinishFuture&) = delete;
68 UnaryFinishFuture& operator=(const UnaryFinishFuture&) = delete;
69
70 /// @brief Checks if the asynchronous call has completed
71 /// Note, that once user gets result, IsReady should not be called
72 /// @return true if result ready
73 [[nodiscard]] bool IsReady() const noexcept { return impl_.IsReady(); }
74
75 /// @brief Await response until the deadline is reached or until the task is cancelled.
76 ///
77 /// Upon completion result is available in `response` when initiating the
78 /// asynchronous operation, e.g. FinishAsync.
79 [[nodiscard]] engine::FutureStatus WaitUntil(engine::Deadline deadline) const noexcept {
80 return impl_.WaitUntil(deadline);
81 }
82
83 /// @brief Await response
84 ///
85 /// Upon completion result is available in `response` when initiating the
86 /// asynchronous operation, e.g. FinishAsync.
87 ///
88 /// Invalidates the future: no further methods can be called on it.
89 ///
90 /// @throws ugrpc::client::RpcError on an RPC error
91 /// @throws ugrpc::client::RpcCancelledError on task cancellation
92 Response Get() {
93 UASSERT(response_);
94 impl_.Get();
95 return std::move(*response_);
96 }
97
98 /// @cond
99 // For internal use only.
100 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept { return impl_.TryGetContextAccessor(); }
101 /// @endcond
102
103private:
104 std::unique_ptr<Response> response_;
105 UnaryFinishFutureImpl impl_;
106};
107
108} // namespace impl
109
110/// @brief StreamReadFuture for waiting a single read response from stream
111template <typename RPC>
112class [[nodiscard]] StreamReadFuture {
113public:
114 /// @cond
115 StreamReadFuture(
116 impl::CallState& state,
117 typename RPC::RawStream& stream,
118 const google::protobuf::Message* recv_message
119 ) noexcept;
120 /// @endcond
121
122 StreamReadFuture(StreamReadFuture&& other) noexcept;
123 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
124 StreamReadFuture(const StreamReadFuture&) = delete;
125 StreamReadFuture& operator=(const StreamReadFuture&) = delete;
126
127 ~StreamReadFuture();
128
129 /// @brief Await response
130 ///
131 /// Upon completion the result is available in `response` that was
132 /// specified when initiating the asynchronous read
133 ///
134 /// `Get` should not be called multiple times for the same StreamReadFuture.
135 ///
136 /// @throws ugrpc::client::RpcError on an RPC error
137 /// @throws ugrpc::client::RpcCancelledError on task cancellation
138 bool Get();
139
140 /// @brief Checks if the asynchronous call has completed
141 /// Note, that once user gets result, IsReady should not be called
142 /// @return true if result ready
143 [[nodiscard]] bool IsReady() const noexcept;
144
145private:
146 impl::CallState* state_{};
147 typename RPC::RawStream* stream_{};
148 const google::protobuf::Message* recv_message_;
149};
150
151namespace impl {
152
153/// @brief Controls a single request -> single response RPC
154///
155/// This class is not thread-safe, it cannot be used from multiple tasks at the same time.
156///
157/// The RPC is cancelled on destruction unless the RPC is already finished. In
158/// that case the connection is not closed (it will be reused for new RPCs), and
159/// the server receives `RpcInterruptedError` immediately.
160template <typename Response>
161class [[nodiscard]] UnaryCall final : public CallAnyBase {
162 // Implementation note. For consistency with other RPC objects, UnaryCall should have been exposed to the user
163 // directly. However, in 90-99% use cases it's more intuitive to treat the RPC as a future (see ResponseFuture).
164 // If we decide to expose more controls, like lazy Finish or ReadInitialMetadata, then they should be added
165 // to UnaryCall, and UnaryCall should be exposed via ResponseFuture::GetCall.
166
167public:
168 /// @brief Asynchronously finish the call
169 ///
170 /// `FinishAsync` should not be called multiple times for the same RPC.
171 ///
172 /// Creates the future inside this `UnaryCall`. It can be retrieved using @ref GetFinishFuture.
173 void FinishAsync();
174
175 /// @brief Returns the future created earlier using @ref FinishAsync.
176 UnaryFinishFuture<Response>& GetFinishFuture();
177
178 /// @overload
179 const UnaryFinishFuture<Response>& GetFinishFuture() const;
180
181 /// @cond
182 // For internal use only
183 template <typename Stub, typename Request>
184 UnaryCall(
185 CallParams&& params,
186 PrepareUnaryCallProxy<Stub, Request, Response> prepare_unary_call,
187 const Request& request
188 );
189 /// @endcond
190
191 UnaryCall(UnaryCall&&) noexcept = default;
192 UnaryCall& operator=(UnaryCall&&) noexcept = default;
193 ~UnaryCall() = default;
194
195private:
196 RawResponseReader<Response> reader_{};
197 std::optional<UnaryFinishFuture<Response>> finish_future_{};
198};
199
200} // namespace impl
201
202/// @brief Controls a single request -> response stream RPC
203///
204/// This class is not thread-safe except for `GetContext`.
205///
206/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
207/// returned `false`). In that case the connection is not closed (it will be
208/// reused for new RPCs), and the server receives `RpcInterruptedError`
209/// immediately. gRPC provides no way to early-close a server-streaming RPC
210/// gracefully.
211template <typename Response>
212class [[nodiscard]] InputStream final : public CallAnyBase {
213public:
214 /// @brief Await and read the next incoming message
215 ///
216 /// On end-of-input, `Finish` is called automatically.
217 ///
218 /// @param response where to put response on success
219 /// @returns `true` on success, `false` on end-of-input, task cancellation,
220 // or if the stream is already closed for reads
221 /// @throws ugrpc::client::RpcError on an RPC error
222 [[nodiscard]] bool Read(Response& response);
223
224 /// @cond
225 // For internal use only
226 using RawStream = grpc::ClientAsyncReader<Response>;
227
228 template <typename Stub, typename Request>
229 InputStream(
230 impl::CallParams&& params,
231 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
232 const Request& request
233 );
234 /// @endcond
235
236 InputStream(InputStream&&) noexcept = default;
237 InputStream& operator=(InputStream&&) noexcept = default;
238 ~InputStream() = default;
239
240private:
241 impl::RawReader<Response> stream_;
242};
243
244/// @brief Controls a request stream -> single response RPC
245///
246/// This class is not thread-safe except for `GetContext`.
247///
248/// The RPC is cancelled on destruction unless `Finish` has been called. In that
249/// case the connection is not closed (it will be reused for new RPCs), and the
250/// server receives `RpcInterruptedError` immediately.
251template <typename Request, typename Response>
252class [[nodiscard]] OutputStream final : public CallAnyBase {
253public:
254 /// @brief Write the next outgoing message
255 ///
256 /// `Write` doesn't store any references to `request`, so it can be
257 /// deallocated right after the call.
258 ///
259 /// @param request the next message to write
260 /// @return true if the data is going to the wire; false if the write
261 /// operation failed (including due to task cancellation,
262 // or if the stream is already closed for writes),
263 /// in which case no more writes will be accepted,
264 /// and the error details can be fetched from Finish
265 [[nodiscard]] bool Write(const Request& request);
266
267 /// @brief Write the next outgoing message and check result
268 ///
269 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
270 /// deallocated right after the call.
271 ///
272 /// `WriteAndCheck` verifies result of the write and generates exception
273 /// in case of issues.
274 ///
275 /// @param request the next message to write
276 /// @throws ugrpc::client::RpcError on an RPC error
277 /// @throws ugrpc::client::RpcCancelledError on task cancellation
278 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
279 void WriteAndCheck(const Request& request);
280
281 /// @brief Complete the RPC successfully
282 ///
283 /// Should be called once all the data is written. The server will then
284 /// send a single `Response`.
285 ///
286 /// `Finish` should not be called multiple times.
287 ///
288 /// The connection is not closed, it will be reused for new RPCs.
289 ///
290 /// @returns the single `Response` received after finishing the writes
291 /// @throws ugrpc::client::RpcError on an RPC error
292 /// @throws ugrpc::client::RpcCancelledError on task cancellation
293 Response Finish();
294
295 /// @cond
296 // For internal use only
297 using RawStream = grpc::ClientAsyncWriter<Request>;
298
299 template <typename Stub>
300 OutputStream(
301 impl::CallParams&& params,
302 impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method
303 );
304 /// @endcond
305
306 OutputStream(OutputStream&&) noexcept = default;
307 OutputStream& operator=(OutputStream&&) noexcept = default;
308 ~OutputStream() = default;
309
310private:
311 std::unique_ptr<Response> response_;
312 impl::RawWriter<Request> stream_;
313};
314
315/// @brief Controls a request stream -> response stream RPC
316///
317/// It is safe to call the following methods from different coroutines:
318///
319/// - `GetContext`;
320/// - one of (`Read`, `ReadAsync`);
321/// - one of (`Write`, `WritesDone`).
322///
323/// `WriteAndCheck` is NOT thread-safe.
324///
325/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
326/// returned `false`). In that case the connection is not closed (it will be
327/// reused for new RPCs), and the server receives `RpcInterruptedError`
328/// immediately. gRPC provides no way to early-close a server-streaming RPC
329/// gracefully.
330///
331/// `Read` and `AsyncRead` can throw if error status is received from server.
332/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
333/// operations.
334///
335/// `Write` and `WritesDone` methods do not throw, but indicate issues with
336/// the RPC by returning `false`.
337///
338/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
339/// operation the user calls `Read` and vice versa.
340///
341/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
342/// any of these methods anymore.
343/// Instead the user SHOULD call `Read` method until the end of input. If
344/// `Write` or `WritesDone` finishes with negative result, finally `Read`
345/// will throw an exception.
346/// ## Usage example:
347///
348/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
349///
350template <typename Request, typename Response>
351class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
352public:
353 /// @brief Await and read the next incoming message
354 ///
355 /// On end-of-input, `Finish` is called automatically.
356 ///
357 /// @param response where to put response on success
358 /// @returns `true` on success, `false` on end-of-input, task cancellation,
359 /// or if the stream is already closed for reads
360 /// @throws ugrpc::client::RpcError on an RPC error
361 [[nodiscard]] bool Read(Response& response);
362
363 /// @brief Return future to read next incoming result
364 ///
365 /// @param response where to put response on success
366 /// @return StreamReadFuture future
367 /// @throws ugrpc::client::RpcError on an RPC error
368 /// @throws ugrpc::client::RpcError if the stream is already closed for reads
369 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response);
370
371 /// @brief Write the next outgoing message
372 ///
373 /// RPC will be performed immediately. No references to `request` are
374 /// saved, so it can be deallocated right after the call.
375 ///
376 /// @param request the next message to write
377 /// @return true if the data is going to the wire; false if the write
378 /// operation failed (including due to task cancellation,
379 // or if the stream is already closed for writes),
380 /// in which case no more writes will be accepted,
381 /// but Read may still have some data and status code available
382 [[nodiscard]] bool Write(const Request& request);
383
384 /// @brief Write the next outgoing message and check result
385 ///
386 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
387 /// deallocated right after the call.
388 ///
389 /// `WriteAndCheck` verifies result of the write and generates exception
390 /// in case of issues.
391 ///
392 /// @param request the next message to write
393 /// @throws ugrpc::client::RpcError on an RPC error
394 /// @throws ugrpc::client::RpcCancelledError on task cancellation
395 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
396 void WriteAndCheck(const Request& request);
397
398 /// @brief Announce end-of-output to the server
399 ///
400 /// Should be called to notify the server and receive the final response(s).
401 ///
402 /// @return true if the data is going to the wire; false if the operation
403 /// failed (including if the stream is already closed for writes),
404 /// but Read may still have some data and status code available
405 [[nodiscard]] bool WritesDone();
406
407 /// @cond
408 // For internal use only
409 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
410
411 template <typename Stub>
412 BidirectionalStream(
413 impl::CallParams&& params,
414 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
415 );
416 /// @endcond
417
418 BidirectionalStream(BidirectionalStream&&) noexcept = default;
419 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
420 ~BidirectionalStream() = default;
421
422private:
423 impl::RawReaderWriter<Request, Response> stream_;
424};
425
426template <typename RPC>
427StreamReadFuture<RPC>::StreamReadFuture(
428 impl::CallState& state,
429 typename RPC::RawStream& stream,
430 const google::protobuf::Message* recv_message
431) noexcept
432 : state_(&state), stream_(&stream), recv_message_(recv_message) {}
433
434template <typename RPC>
435StreamReadFuture<RPC>::StreamReadFuture(StreamReadFuture&& other) noexcept
436 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
437 : state_{std::exchange(other.state_, nullptr)}, stream_(other.stream_), recv_message_{other.recv_message_} {}
438
439template <typename RPC>
440StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(StreamReadFuture<RPC>&& other) noexcept {
441 if (this == &other) return *this;
442 [[maybe_unused]] auto for_destruction = std::move(*this);
443 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
444 state_ = std::exchange(other.state_, nullptr);
445 stream_ = other.stream_;
446 recv_message_ = other.recv_message_;
447 return *this;
448}
449
450template <typename RPC>
451StreamReadFuture<RPC>::~StreamReadFuture() {
452 if (state_) {
453 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
454 const auto wait_status =
455 impl::WaitAndTryCancelIfNeeded(state_->GetAsyncMethodInvocation(), state_->GetContext());
456 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
457 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
458 state_->GetStatsScope().OnCancelled();
459 }
460 impl::Finish(*stream_, *state_, /*final_response=*/nullptr, /*throw_on_error=*/false);
461 } else {
462 if (recv_message_) {
463 impl::MiddlewarePipeline::PostRecvMessage(*state_, *recv_message_);
464 }
465 }
466 }
467}
468
469template <typename RPC>
470bool StreamReadFuture<RPC>::Get() {
471 UINVARIANT(state_, "'Get' must be called only once");
472 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
473 auto* const state = std::exchange(state_, nullptr);
474 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetContext());
475 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
476 state->GetStatsScope().OnCancelled();
477 state->GetStatsScope().Flush();
478 } else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
479 // Finish can only be called once all the data is read, otherwise the
480 // underlying gRPC driver hangs.
481 impl::Finish(*stream_, *state, /*final_response=*/nullptr, /*throw_on_error=*/true);
482 } else {
483 if (recv_message_) {
484 impl::MiddlewarePipeline::PostRecvMessage(*state, *recv_message_);
485 }
486 }
487 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
488}
489
490template <typename RPC>
491bool StreamReadFuture<RPC>::IsReady() const noexcept {
492 UINVARIANT(state_, "IsReady should be called only before 'Get'");
493 auto& method = state_->GetAsyncMethodInvocation();
494 return method.IsReady();
495}
496
497namespace impl {
498
499template <typename Response>
500template <typename Stub, typename Request>
501UnaryCall<Response>::UnaryCall(
502 CallParams&& params,
503 PrepareUnaryCallProxy<Stub, Request, Response> prepare_unary_call,
504 const Request& request
505)
506 : CallAnyBase(std::move(params), CallKind::kUnaryCall) {
507 MiddlewarePipeline::PreStartCall(GetState());
508 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
509 MiddlewarePipeline::PreSendMessage(GetState(), request);
510 }
511
512 reader_ =
513 prepare_unary_call.PrepareCall(GetState().GetStub(), &GetState().GetContext(), request, &GetState().GetQueue());
514 reader_->StartCall();
515
516 GetState().SetWritesFinished();
517
518 FinishAsync();
519}
520
521template <typename Response>
522void UnaryCall<Response>::FinishAsync() {
523 UASSERT(reader_);
524 auto response = std::make_unique<Response>();
525
526 PrepareFinish(GetState());
527 GetState().EmplaceFinishAsyncMethodInvocation();
528 auto& finish = GetState().GetFinishAsyncMethodInvocation();
529 auto& status = GetState().GetStatus();
530 reader_->Finish(response.get(), &status, finish.GetCompletionTag());
531
532 finish_future_.emplace(GetState(), std::move(response));
533}
534
535template <typename Response>
536UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture() {
537 UASSERT(finish_future_);
538 return *finish_future_;
539}
540
541template <typename Response>
542const UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture() const {
543 UASSERT(finish_future_);
544 return *finish_future_;
545}
546
547} // namespace impl
548
549template <typename Response>
550template <typename Stub, typename Request>
551InputStream<Response>::InputStream(
552 impl::CallParams&& params,
553 impl::PrepareServerStreamingCall<Stub, Request, Response> prepare_async_method,
554 const Request& request
555)
556 : CallAnyBase(std::move(params), impl::CallKind::kInputStream) {
557 impl::MiddlewarePipeline::PreStartCall(GetState());
558 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
559
560 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
561 stream_ = impl::PrepareCall(
562 prepare_async_method, GetState().GetStub(), &GetState().GetContext(), request, &GetState().GetQueue()
563 );
564 impl::StartCall(*stream_, GetState());
565
566 GetState().SetWritesFinished();
567}
568
569template <typename Response>
570bool InputStream<Response>::Read(Response& response) {
571 if (!GetState().IsReadAvailable()) {
572 // If the stream is already finished, we must exit immediately.
573 // If not, even the middlewares may access something that is already dead.
574 return false;
575 }
576
577 if (impl::Read(*stream_, response, GetState())) {
578 impl::MiddlewarePipeline::PostRecvMessage(GetState(), response);
579 return true;
580 } else {
581 // Finish can only be called once all the data is read, otherwise the
582 // underlying gRPC driver hangs.
583 impl::Finish(*stream_, GetState(), /*final_response=*/nullptr, /*throw_on_error=*/true);
584 return false;
585 }
586}
587
588template <typename Request, typename Response>
589template <typename Stub>
590OutputStream<Request, Response>::OutputStream(
591 impl::CallParams&& params,
592 impl::PrepareClientStreamingCall<Stub, Request, Response> prepare_async_method
593)
594 : CallAnyBase(std::move(params), impl::CallKind::kOutputStream), response_(std::make_unique<Response>()) {
595 impl::MiddlewarePipeline::PreStartCall(GetState());
596
597 // 'response_' will be filled upon successful 'Finish' async call
598 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
599 stream_ = impl::PrepareCall(
600 prepare_async_method, GetState().GetStub(), &GetState().GetContext(), response_.get(), &GetState().GetQueue()
601 );
602 impl::StartCall(*stream_, GetState());
603}
604
605template <typename Request, typename Response>
606bool OutputStream<Request, Response>::Write(const Request& request) {
607 if (!GetState().IsWriteAvailable()) {
608 // If the stream is already finished, we must exit immediately.
609 // If not, even the middlewares may access something that is already dead.
610 return false;
611 }
612
613 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
614
615 // Don't buffer writes, otherwise in an event subscription scenario, events
616 // may never actually be delivered
617 const grpc::WriteOptions write_options{};
618 return impl::Write(*stream_, request, write_options, GetState());
619}
620
621template <typename Request, typename Response>
622void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
623 if (!GetState().IsWriteAndCheckAvailable()) {
624 // If the stream is already finished, we must exit immediately.
625 // If not, even the middlewares may access something that is already dead.
626 throw RpcError(GetState().GetCallName(), "'WriteAndCheck' called on a finished or closed stream");
627 }
628
629 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
630
631 // Don't buffer writes, otherwise in an event subscription scenario, events
632 // may never actually be delivered
633 const grpc::WriteOptions write_options{};
634 if (!impl::Write(*stream_, request, write_options, GetState())) {
635 // We don't need final_response here, because the RPC is broken anyway.
636 impl::Finish(*stream_, GetState(), /*final_response=*/nullptr, /*throw_on_error=*/true);
637 }
638}
639
640template <typename Request, typename Response>
641Response OutputStream<Request, Response>::Finish() {
642 // gRPC does not implicitly call `WritesDone` in `Finish`,
643 // contrary to the documentation
644 if (GetState().IsWriteAvailable()) {
645 impl::WritesDone(*stream_, GetState());
646 }
647
648 UASSERT(response_);
649 impl::Finish(*stream_, GetState(), impl::ToBaseMessage(*response_), /*throw_on_error=*/true);
650
651 return std::move(*response_);
652}
653
654template <typename Request, typename Response>
655template <typename Stub>
656BidirectionalStream<Request, Response>::BidirectionalStream(
657 impl::CallParams&& params,
658 impl::PrepareBidiStreamingCall<Stub, Request, Response> prepare_async_method
659)
660 : CallAnyBase(std::move(params), impl::CallKind::kBidirectionalStream) {
661 impl::MiddlewarePipeline::PreStartCall(GetState());
662
663 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
664 stream_ =
665 impl::PrepareCall(prepare_async_method, GetState().GetStub(), &GetState().GetContext(), &GetState().GetQueue());
666 impl::StartCall(*stream_, GetState());
667}
668
669template <typename Request, typename Response>
670StreamReadFuture<BidirectionalStream<Request, Response>> BidirectionalStream<Request, Response>::ReadAsync(
671 Response& response
672) {
673 if (!GetState().IsReadAvailable()) {
674 // If the stream is already finished, we must exit immediately.
675 // If not, even the middlewares may access something that is already dead.
676 throw RpcError(GetState().GetCallName(), "'ReadAsync' called on a finished call");
677 }
678
679 impl::ReadAsync(*stream_, response, GetState());
680 return StreamReadFuture<BidirectionalStream<Request, Response>>{
681 GetState(), *stream_, impl::ToBaseMessage(response)};
682}
683
684template <typename Request, typename Response>
685bool BidirectionalStream<Request, Response>::Read(Response& response) {
686 if (!GetState().IsReadAvailable()) {
687 return false;
688 }
689
690 auto future = ReadAsync(response);
691 return future.Get();
692}
693
694template <typename Request, typename Response>
695bool BidirectionalStream<Request, Response>::Write(const Request& request) {
696 if (!GetState().IsWriteAvailable()) {
697 // If the stream is already finished, we must exit immediately.
698 // If not, even the middlewares may access something that is already dead.
699 return false;
700 }
701
702 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
703
704 // Don't buffer writes, optimize for ping-pong-style interaction
705 const grpc::WriteOptions write_options{};
706 return impl::Write(*stream_, request, write_options, GetState());
707}
708
709template <typename Request, typename Response>
710void BidirectionalStream<Request, Response>::WriteAndCheck(const Request& request) {
711 if (!GetState().IsWriteAndCheckAvailable()) {
712 // If the stream is already finished, we must exit immediately.
713 // If not, even the middlewares may access something that is already dead.
714 throw RpcError(GetState().GetCallName(), "'WriteAndCheck' called on a finished or closed stream");
715 }
716
717 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
718
719 // Don't buffer writes, optimize for ping-pong-style interaction
720 const grpc::WriteOptions write_options{};
721 impl::WriteAndCheck(*stream_, request, write_options, GetState());
722}
723
724template <typename Request, typename Response>
725bool BidirectionalStream<Request, Response>::WritesDone() {
726 if (!GetState().IsWriteAvailable()) {
727 // If the stream is already finished, we must exit immediately.
728 // If not, even the middlewares may access something that is already dead.
729 return false;
730 }
731
732 return impl::WritesDone(*stream_, GetState());
733}
734
735} // namespace ugrpc::client
736
737USERVER_NAMESPACE_END