userver: userver/ugrpc/client/rpc.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <exception>
7#include <memory>
8#include <string_view>
9#include <utility>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/engine/deadline.hpp>
14#include <userver/engine/future_status.hpp>
15#include <userver/utils/assert.hpp>
16
17#include <userver/ugrpc/client/call.hpp>
18#include <userver/ugrpc/client/impl/async_methods.hpp>
19#include <userver/ugrpc/client/impl/call_state.hpp>
20#include <userver/ugrpc/client/impl/middleware_pipeline.hpp>
21#include <userver/ugrpc/client/impl/prepare_call.hpp>
22#include <userver/ugrpc/client/middlewares/fwd.hpp>
23#include <userver/ugrpc/time_utils.hpp>
24
25USERVER_NAMESPACE_BEGIN
26
27namespace ugrpc::client {
28
29namespace impl {
30
31// Contains the implementation of UnaryFinishFuture that is not dependent on template parameters.
32class UnaryFinishFutureImpl {
33public:
34 UnaryFinishFutureImpl(CallState& state, const google::protobuf::Message* response) noexcept;
35
36 UnaryFinishFutureImpl(UnaryFinishFutureImpl&&) noexcept;
37 UnaryFinishFutureImpl& operator=(UnaryFinishFutureImpl&&) noexcept;
38 UnaryFinishFutureImpl(const UnaryFinishFutureImpl&) = delete;
39 UnaryFinishFutureImpl& operator=(const UnaryFinishFutureImpl&) = delete;
40 ~UnaryFinishFutureImpl();
41
42 [[nodiscard]] bool IsReady() const noexcept;
43
44 [[nodiscard]] engine::FutureStatus WaitUntil(engine::Deadline deadline) const noexcept;
45
46 void Get();
47
48 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept;
49
50private:
51 CallState* state_{};
52 const google::protobuf::Message* response_;
53 mutable std::exception_ptr exception_;
54};
55
56/// @brief UnaryFuture for awaiting a single response RPC
57template <typename Response>
58class [[nodiscard]] UnaryFinishFuture {
59public:
60 /// @cond
61 UnaryFinishFuture(impl::CallState& state, std::unique_ptr<Response>&& response) noexcept
62 : response_(std::move(response)), impl_(state, impl::ToBaseMessage(*response_)) {}
63 /// @endcond
64
65 UnaryFinishFuture(UnaryFinishFuture&&) noexcept = default;
66 UnaryFinishFuture& operator=(UnaryFinishFuture&&) noexcept = default;
67 UnaryFinishFuture(const UnaryFinishFuture&) = delete;
68 UnaryFinishFuture& operator=(const UnaryFinishFuture&) = delete;
69
70 /// @brief Checks if the asynchronous call has completed
71 /// Note, that once user gets result, IsReady should not be called
72 /// @return true if result ready
73 [[nodiscard]] bool IsReady() const noexcept { return impl_.IsReady(); }
74
75 /// @brief Await response until the deadline is reached or until the task is cancelled.
76 ///
77 /// Upon completion result is available in `response` when initiating the
78 /// asynchronous operation, e.g. FinishAsync.
79 [[nodiscard]] engine::FutureStatus WaitUntil(engine::Deadline deadline) const noexcept {
80 return impl_.WaitUntil(deadline);
81 }
82
83 /// @brief Await response
84 ///
85 /// Upon completion result is available in `response` when initiating the
86 /// asynchronous operation, e.g. FinishAsync.
87 ///
88 /// Invalidates the future: no further methods can be called on it.
89 ///
90 /// @throws ugrpc::client::RpcError on an RPC error
91 /// @throws ugrpc::client::RpcCancelledError on task cancellation
92 Response Get() {
93 UASSERT(response_);
94 impl_.Get();
95 return std::move(*response_);
96 }
97
98 /// @cond
99 // For internal use only.
100 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept { return impl_.TryGetContextAccessor(); }
101 /// @endcond
102
103private:
104 std::unique_ptr<Response> response_;
105 impl::UnaryFinishFutureImpl impl_;
106};
107
108} // namespace impl
109
110/// @brief StreamReadFuture for waiting a single read response from stream
111template <typename RPC>
112class [[nodiscard]] StreamReadFuture {
113public:
114 /// @cond
115 StreamReadFuture(
116 impl::CallState& state,
117 typename RPC::RawStream& stream,
118 const google::protobuf::Message* recv_message
119 ) noexcept;
120 /// @endcond
121
122 StreamReadFuture(StreamReadFuture&& other) noexcept;
123 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
124 StreamReadFuture(const StreamReadFuture&) = delete;
125 StreamReadFuture& operator=(const StreamReadFuture&) = delete;
126
127 ~StreamReadFuture();
128
129 /// @brief Await response
130 ///
131 /// Upon completion the result is available in `response` that was
132 /// specified when initiating the asynchronous read
133 ///
134 /// `Get` should not be called multiple times for the same StreamReadFuture.
135 ///
136 /// @throws ugrpc::client::RpcError on an RPC error
137 /// @throws ugrpc::client::RpcCancelledError on task cancellation
138 bool Get();
139
140 /// @brief Checks if the asynchronous call has completed
141 /// Note, that once user gets result, IsReady should not be called
142 /// @return true if result ready
143 [[nodiscard]] bool IsReady() const noexcept;
144
145private:
146 impl::CallState* state_{};
147 typename RPC::RawStream* stream_{};
148 const google::protobuf::Message* recv_message_;
149};
150
151namespace impl {
152
153/// @brief Controls a single request -> single response RPC
154///
155/// This class is not thread-safe, it cannot be used from multiple tasks at the same time.
156///
157/// The RPC is cancelled on destruction unless the RPC is already finished. In
158/// that case the connection is not closed (it will be reused for new RPCs), and
159/// the server receives `RpcInterruptedError` immediately.
160template <typename Response>
161class [[nodiscard]] UnaryCall final : public CallAnyBase {
162 // Implementation note. For consistency with other RPC objects, UnaryCall should have been exposed to the user
163 // directly. However, in 90-99% use cases it's more intuitive to treat the RPC as a future (see ResponseFuture).
164 // If we decide to expose more controls, like lazy Finish or ReadInitialMetadata, then they should be added
165 // to UnaryCall, and UnaryCall should be exposed via ResponseFuture::GetCall.
166
167public:
168 /// @brief Asynchronously finish the call
169 ///
170 /// `FinishAsync` should not be called multiple times for the same RPC.
171 ///
172 /// Creates the future inside this `UnaryCall`. It can be retrieved using @ref GetFinishFuture.
173 void FinishAsync();
174
175 /// @brief Returns the future created earlier using @ref FinishAsync.
176 UnaryFinishFuture<Response>& GetFinishFuture();
177
178 /// @overload
179 const UnaryFinishFuture<Response>& GetFinishFuture() const;
180
181 /// @cond
182 // For internal use only
183 template <typename PrepareAsyncMethod, typename Request, typename... PrepareExtra>
184 UnaryCall(
185 impl::CallParams&& params,
186 PrepareAsyncMethod prepare_async_method,
187 const Request& request,
188 PrepareExtra&&... extra
189 );
190 /// @endcond
191
192 UnaryCall(UnaryCall&&) noexcept = default;
193 UnaryCall& operator=(UnaryCall&&) noexcept = default;
194 ~UnaryCall() = default;
195
196private:
197 impl::RawResponseReader<Response> reader_{};
198 std::optional<UnaryFinishFuture<Response>> finish_future_{};
199};
200
201} // namespace impl
202
203/// @brief Controls a single request -> response stream RPC
204///
205/// This class is not thread-safe except for `GetContext`.
206///
207/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
208/// returned `false`). In that case the connection is not closed (it will be
209/// reused for new RPCs), and the server receives `RpcInterruptedError`
210/// immediately. gRPC provides no way to early-close a server-streaming RPC
211/// gracefully.
212template <typename Response>
213class [[nodiscard]] InputStream final : public CallAnyBase {
214public:
215 /// @brief Await and read the next incoming message
216 ///
217 /// On end-of-input, `Finish` is called automatically.
218 ///
219 /// @param response where to put response on success
220 /// @returns `true` on success, `false` on end-of-input, task cancellation,
221 // or if the stream is already closed for reads
222 /// @throws ugrpc::client::RpcError on an RPC error
223 [[nodiscard]] bool Read(Response& response);
224
225 /// @cond
226 // For internal use only
227 using RawStream = grpc::ClientAsyncReader<Response>;
228
229 template <typename PrepareAsyncMethod, typename Request>
230 InputStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method, const Request& request);
231 /// @endcond
232
233 InputStream(InputStream&&) noexcept = default;
234 InputStream& operator=(InputStream&&) noexcept = default;
235 ~InputStream() = default;
236
237private:
238 impl::RawReader<Response> stream_;
239};
240
241/// @brief Controls a request stream -> single response RPC
242///
243/// This class is not thread-safe except for `GetContext`.
244///
245/// The RPC is cancelled on destruction unless `Finish` has been called. In that
246/// case the connection is not closed (it will be reused for new RPCs), and the
247/// server receives `RpcInterruptedError` immediately.
248template <typename Request, typename Response>
249class [[nodiscard]] OutputStream final : public CallAnyBase {
250public:
251 /// @brief Write the next outgoing message
252 ///
253 /// `Write` doesn't store any references to `request`, so it can be
254 /// deallocated right after the call.
255 ///
256 /// @param request the next message to write
257 /// @return true if the data is going to the wire; false if the write
258 /// operation failed (including due to task cancellation,
259 // or if the stream is already closed for writes),
260 /// in which case no more writes will be accepted,
261 /// and the error details can be fetched from Finish
262 [[nodiscard]] bool Write(const Request& request);
263
264 /// @brief Write the next outgoing message and check result
265 ///
266 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
267 /// deallocated right after the call.
268 ///
269 /// `WriteAndCheck` verifies result of the write and generates exception
270 /// in case of issues.
271 ///
272 /// @param request the next message to write
273 /// @throws ugrpc::client::RpcError on an RPC error
274 /// @throws ugrpc::client::RpcCancelledError on task cancellation
275 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
276 void WriteAndCheck(const Request& request);
277
278 /// @brief Complete the RPC successfully
279 ///
280 /// Should be called once all the data is written. The server will then
281 /// send a single `Response`.
282 ///
283 /// `Finish` should not be called multiple times.
284 ///
285 /// The connection is not closed, it will be reused for new RPCs.
286 ///
287 /// @returns the single `Response` received after finishing the writes
288 /// @throws ugrpc::client::RpcError on an RPC error
289 /// @throws ugrpc::client::RpcCancelledError on task cancellation
290 Response Finish();
291
292 /// @cond
293 // For internal use only
294 using RawStream = grpc::ClientAsyncWriter<Request>;
295
296 template <typename PrepareAsyncMethod>
297 OutputStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method);
298 /// @endcond
299
300 OutputStream(OutputStream&&) noexcept = default;
301 OutputStream& operator=(OutputStream&&) noexcept = default;
302 ~OutputStream() = default;
303
304private:
305 std::unique_ptr<Response> response_;
306 impl::RawWriter<Request> stream_;
307};
308
309/// @brief Controls a request stream -> response stream RPC
310///
311/// It is safe to call the following methods from different coroutines:
312///
313/// - `GetContext`;
314/// - one of (`Read`, `ReadAsync`);
315/// - one of (`Write`, `WritesDone`).
316///
317/// `WriteAndCheck` is NOT thread-safe.
318///
319/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
320/// returned `false`). In that case the connection is not closed (it will be
321/// reused for new RPCs), and the server receives `RpcInterruptedError`
322/// immediately. gRPC provides no way to early-close a server-streaming RPC
323/// gracefully.
324///
325/// `Read` and `AsyncRead` can throw if error status is received from server.
326/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
327/// operations.
328///
329/// `Write` and `WritesDone` methods do not throw, but indicate issues with
330/// the RPC by returning `false`.
331///
332/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
333/// operation the user calls `Read` and vice versa.
334///
335/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
336/// any of these methods anymore.
337/// Instead the user SHOULD call `Read` method until the end of input. If
338/// `Write` or `WritesDone` finishes with negative result, finally `Read`
339/// will throw an exception.
340/// ## Usage example:
341///
342/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
343///
344template <typename Request, typename Response>
345class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
346public:
347 /// @brief Await and read the next incoming message
348 ///
349 /// On end-of-input, `Finish` is called automatically.
350 ///
351 /// @param response where to put response on success
352 /// @returns `true` on success, `false` on end-of-input, task cancellation,
353 /// or if the stream is already closed for reads
354 /// @throws ugrpc::client::RpcError on an RPC error
355 [[nodiscard]] bool Read(Response& response);
356
357 /// @brief Return future to read next incoming result
358 ///
359 /// @param response where to put response on success
360 /// @return StreamReadFuture future
361 /// @throws ugrpc::client::RpcError on an RPC error
362 /// @throws ugrpc::client::RpcError if the stream is already closed for reads
363 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response);
364
365 /// @brief Write the next outgoing message
366 ///
367 /// RPC will be performed immediately. No references to `request` are
368 /// saved, so it can be deallocated right after the call.
369 ///
370 /// @param request the next message to write
371 /// @return true if the data is going to the wire; false if the write
372 /// operation failed (including due to task cancellation,
373 // or if the stream is already closed for writes),
374 /// in which case no more writes will be accepted,
375 /// but Read may still have some data and status code available
376 [[nodiscard]] bool Write(const Request& request);
377
378 /// @brief Write the next outgoing message and check result
379 ///
380 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
381 /// deallocated right after the call.
382 ///
383 /// `WriteAndCheck` verifies result of the write and generates exception
384 /// in case of issues.
385 ///
386 /// @param request the next message to write
387 /// @throws ugrpc::client::RpcError on an RPC error
388 /// @throws ugrpc::client::RpcCancelledError on task cancellation
389 /// @throws ugrpc::client::RpcError if the stream is already closed for writes
390 void WriteAndCheck(const Request& request);
391
392 /// @brief Announce end-of-output to the server
393 ///
394 /// Should be called to notify the server and receive the final response(s).
395 ///
396 /// @return true if the data is going to the wire; false if the operation
397 /// failed (including if the stream is already closed for writes),
398 /// but Read may still have some data and status code available
399 [[nodiscard]] bool WritesDone();
400
401 /// @cond
402 // For internal use only
403 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
404
405 template <typename PrepareAsyncMethod>
406 BidirectionalStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method);
407 /// @endcond
408
409 BidirectionalStream(BidirectionalStream&&) noexcept = default;
410 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
411 ~BidirectionalStream() = default;
412
413private:
414 impl::RawReaderWriter<Request, Response> stream_;
415};
416
417template <typename RPC>
418StreamReadFuture<RPC>::StreamReadFuture(
419 impl::CallState& state,
420 typename RPC::RawStream& stream,
421 const google::protobuf::Message* recv_message
422) noexcept
423 : state_(&state), stream_(&stream), recv_message_(recv_message) {}
424
425template <typename RPC>
426StreamReadFuture<RPC>::StreamReadFuture(StreamReadFuture&& other) noexcept
427 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
428 : state_{std::exchange(other.state_, nullptr)}, stream_(other.stream_), recv_message_{other.recv_message_} {}
429
430template <typename RPC>
431StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(StreamReadFuture<RPC>&& other) noexcept {
432 if (this == &other) return *this;
433 [[maybe_unused]] auto for_destruction = std::move(*this);
434 // state_ == nullptr signals that *this is empty. Other fields may remain garbage in `other`.
435 state_ = std::exchange(other.state_, nullptr);
436 stream_ = other.stream_;
437 recv_message_ = other.recv_message_;
438 return *this;
439}
440
441template <typename RPC>
442StreamReadFuture<RPC>::~StreamReadFuture() {
443 if (state_) {
444 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
445 const auto wait_status =
446 impl::WaitAndTryCancelIfNeeded(state_->GetAsyncMethodInvocation(), state_->GetContext());
447 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
448 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
449 state_->GetStatsScope().OnCancelled();
450 }
451 impl::Finish(*stream_, *state_, /*final_response=*/nullptr, /*throw_on_error=*/false);
452 } else {
453 if (recv_message_) {
454 impl::MiddlewarePipeline::PostRecvMessage(*state_, *recv_message_);
455 }
456 }
457 }
458}
459
460template <typename RPC>
461bool StreamReadFuture<RPC>::Get() {
462 UINVARIANT(state_, "'Get' must be called only once");
463 const impl::CallState::AsyncMethodInvocationGuard guard(*state_);
464 auto* const state = std::exchange(state_, nullptr);
465 const auto result = impl::WaitAndTryCancelIfNeeded(state->GetAsyncMethodInvocation(), state->GetContext());
466 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
467 state->GetStatsScope().OnCancelled();
468 state->GetStatsScope().Flush();
469 } else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
470 // Finish can only be called once all the data is read, otherwise the
471 // underlying gRPC driver hangs.
472 impl::Finish(*stream_, *state, /*final_response=*/nullptr, /*throw_on_error=*/true);
473 } else {
474 if (recv_message_) {
475 impl::MiddlewarePipeline::PostRecvMessage(*state, *recv_message_);
476 }
477 }
478 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
479}
480
481template <typename RPC>
482bool StreamReadFuture<RPC>::IsReady() const noexcept {
483 UINVARIANT(state_, "IsReady should be called only before 'Get'");
484 auto& method = state_->GetAsyncMethodInvocation();
485 return method.IsReady();
486}
487
488namespace impl {
489
490template <typename Response>
491template <typename PrepareAsyncMethod, typename Request, typename... PrepareExtra>
492UnaryCall<Response>::UnaryCall(
493 impl::CallParams&& params,
494 PrepareAsyncMethod prepare_async_method,
495 const Request& request,
496 PrepareExtra&&... extra
497)
498 : CallAnyBase(std::move(params), impl::CallKind::kUnaryCall) {
499 impl::MiddlewarePipeline::PreStartCall(GetState());
500 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
501 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
502 }
503
504 reader_ = impl::PrepareCall(
505 GetState().GetStub(),
506 prepare_async_method,
507 &GetState().GetContext(),
508 request,
509 &GetState().GetQueue(),
510 std::forward<PrepareExtra>(extra)...
511 );
512 reader_->StartCall();
513
514 GetState().SetWritesFinished();
515
516 FinishAsync();
517}
518
519template <typename Response>
520void UnaryCall<Response>::FinishAsync() {
521 UASSERT(reader_);
522 auto response = std::make_unique<Response>();
523
524 PrepareFinish(GetState());
525 GetState().EmplaceFinishAsyncMethodInvocation();
526 auto& finish = GetState().GetFinishAsyncMethodInvocation();
527 auto& status = GetState().GetStatus();
528 reader_->Finish(response.get(), &status, finish.GetCompletionTag());
529
530 finish_future_.emplace(GetState(), std::move(response));
531}
532
533template <typename Response>
534UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture() {
535 UASSERT(finish_future_);
536 return *finish_future_;
537}
538
539template <typename Response>
540const UnaryFinishFuture<Response>& UnaryCall<Response>::GetFinishFuture() const {
541 UASSERT(finish_future_);
542 return *finish_future_;
543}
544
545} // namespace impl
546
547template <typename Response>
548template <typename PrepareAsyncMethod, typename Request>
549InputStream<Response>::InputStream(
550 impl::CallParams&& params,
551 PrepareAsyncMethod prepare_async_method,
552 const Request& request
553)
554 : CallAnyBase(std::move(params), impl::CallKind::kInputStream) {
555 impl::MiddlewarePipeline::PreStartCall(GetState());
556 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
557
558 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
559 stream_ = impl::PrepareCall(
560 GetState().GetStub(), prepare_async_method, &GetState().GetContext(), request, &GetState().GetQueue()
561 );
562 impl::StartCall(*stream_, GetState());
563
564 GetState().SetWritesFinished();
565}
566
567template <typename Response>
568bool InputStream<Response>::Read(Response& response) {
569 if (!GetState().IsReadAvailable()) {
570 // If the stream is already finished, we must exit immediately.
571 // If not, even the middlewares may access something that is already dead.
572 return false;
573 }
574
575 if (impl::Read(*stream_, response, GetState())) {
576 impl::MiddlewarePipeline::PostRecvMessage(GetState(), response);
577 return true;
578 } else {
579 // Finish can only be called once all the data is read, otherwise the
580 // underlying gRPC driver hangs.
581 impl::Finish(*stream_, GetState(), /*final_response=*/nullptr, /*throw_on_error=*/true);
582 return false;
583 }
584}
585
586template <typename Request, typename Response>
587template <typename PrepareAsyncMethod>
588OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareAsyncMethod prepare_async_method)
589 : CallAnyBase(std::move(params), impl::CallKind::kOutputStream), response_(std::make_unique<Response>()) {
590 impl::MiddlewarePipeline::PreStartCall(GetState());
591
592 // 'response_' will be filled upon successful 'Finish' async call
593 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
594 stream_ = impl::PrepareCall(
595 GetState().GetStub(), prepare_async_method, &GetState().GetContext(), response_.get(), &GetState().GetQueue()
596 );
597 impl::StartCall(*stream_, GetState());
598}
599
600template <typename Request, typename Response>
601bool OutputStream<Request, Response>::Write(const Request& request) {
602 if (!GetState().IsWriteAvailable()) {
603 // If the stream is already finished, we must exit immediately.
604 // If not, even the middlewares may access something that is already dead.
605 return false;
606 }
607
608 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
609
610 // Don't buffer writes, otherwise in an event subscription scenario, events
611 // may never actually be delivered
612 const grpc::WriteOptions write_options{};
613 return impl::Write(*stream_, request, write_options, GetState());
614}
615
616template <typename Request, typename Response>
617void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
618 if (!GetState().IsWriteAndCheckAvailable()) {
619 // If the stream is already finished, we must exit immediately.
620 // If not, even the middlewares may access something that is already dead.
621 throw RpcError(GetState().GetCallName(), "'WriteAndCheck' called on a finished or closed stream");
622 }
623
624 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
625
626 // Don't buffer writes, otherwise in an event subscription scenario, events
627 // may never actually be delivered
628 const grpc::WriteOptions write_options{};
629 if (!impl::Write(*stream_, request, write_options, GetState())) {
630 // We don't need final_response here, because the RPC is broken anyway.
631 impl::Finish(*stream_, GetState(), /*final_response=*/nullptr, /*throw_on_error=*/true);
632 }
633}
634
635template <typename Request, typename Response>
636Response OutputStream<Request, Response>::Finish() {
637 // gRPC does not implicitly call `WritesDone` in `Finish`,
638 // contrary to the documentation
639 if (GetState().IsWriteAvailable()) {
640 impl::WritesDone(*stream_, GetState());
641 }
642
643 UASSERT(response_);
644 impl::Finish(*stream_, GetState(), impl::ToBaseMessage(*response_), /*throw_on_error=*/true);
645
646 return std::move(*response_);
647}
648
649template <typename Request, typename Response>
650template <typename PrepareAsyncMethod>
651BidirectionalStream<Request, Response>::BidirectionalStream(
652 impl::CallParams&& params,
653 PrepareAsyncMethod prepare_async_method
654)
655 : CallAnyBase(std::move(params), impl::CallKind::kBidirectionalStream) {
656 impl::MiddlewarePipeline::PreStartCall(GetState());
657
658 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
659 stream_ =
660 impl::PrepareCall(GetState().GetStub(), prepare_async_method, &GetState().GetContext(), &GetState().GetQueue());
661 impl::StartCall(*stream_, GetState());
662}
663
664template <typename Request, typename Response>
665StreamReadFuture<BidirectionalStream<Request, Response>> BidirectionalStream<Request, Response>::ReadAsync(
666 Response& response
667) {
668 if (!GetState().IsReadAvailable()) {
669 // If the stream is already finished, we must exit immediately.
670 // If not, even the middlewares may access something that is already dead.
671 throw RpcError(GetState().GetCallName(), "'ReadAsync' called on a finished call");
672 }
673
674 impl::ReadAsync(*stream_, response, GetState());
675 return StreamReadFuture<BidirectionalStream<Request, Response>>{
676 GetState(), *stream_, impl::ToBaseMessage(response)};
677}
678
679template <typename Request, typename Response>
680bool BidirectionalStream<Request, Response>::Read(Response& response) {
681 if (!GetState().IsReadAvailable()) {
682 return false;
683 }
684
685 auto future = ReadAsync(response);
686 return future.Get();
687}
688
689template <typename Request, typename Response>
690bool BidirectionalStream<Request, Response>::Write(const Request& request) {
691 if (!GetState().IsWriteAvailable()) {
692 // If the stream is already finished, we must exit immediately.
693 // If not, even the middlewares may access something that is already dead.
694 return false;
695 }
696
697 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
698
699 // Don't buffer writes, optimize for ping-pong-style interaction
700 const grpc::WriteOptions write_options{};
701 return impl::Write(*stream_, request, write_options, GetState());
702}
703
704template <typename Request, typename Response>
705void BidirectionalStream<Request, Response>::WriteAndCheck(const Request& request) {
706 if (!GetState().IsWriteAndCheckAvailable()) {
707 // If the stream is already finished, we must exit immediately.
708 // If not, even the middlewares may access something that is already dead.
709 throw RpcError(GetState().GetCallName(), "'WriteAndCheck' called on a finished or closed stream");
710 }
711
712 impl::MiddlewarePipeline::PreSendMessage(GetState(), request);
713
714 // Don't buffer writes, optimize for ping-pong-style interaction
715 const grpc::WriteOptions write_options{};
716 impl::WriteAndCheck(*stream_, request, write_options, GetState());
717}
718
719template <typename Request, typename Response>
720bool BidirectionalStream<Request, Response>::WritesDone() {
721 if (!GetState().IsWriteAvailable()) {
722 // If the stream is already finished, we must exit immediately.
723 // If not, even the middlewares may access something that is already dead.
724 return false;
725 }
726
727 return impl::WritesDone(*stream_, GetState());
728}
729
730} // namespace ugrpc::client
731
732USERVER_NAMESPACE_END