userver: userver/ugrpc/client/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <memory>
7#include <string_view>
8#include <utility>
9#include <vector>
10
11#include <grpcpp/impl/codegen/proto_utils.h>
12
13#include <userver/dynamic_config/snapshot.hpp>
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17#include <userver/utils/function_ref.hpp>
18
19#include <userver/ugrpc/client/exceptions.hpp>
20#include <userver/ugrpc/client/impl/async_methods.hpp>
21#include <userver/ugrpc/client/impl/call_params.hpp>
22#include <userver/ugrpc/client/impl/channel_cache.hpp>
23#include <userver/ugrpc/client/middlewares/fwd.hpp>
24#include <userver/ugrpc/impl/deadline_timepoint.hpp>
25#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
27
28USERVER_NAMESPACE_BEGIN
29
30namespace ugrpc::client {
31
32namespace impl {
33
34struct MiddlewarePipeline {
35 static void PreStartCall(impl::RpcData& data);
36
37 static void PreSendMessage(impl::RpcData& data, const google::protobuf::Message& message);
38 static void PostRecvMessage(impl::RpcData& data, const google::protobuf::Message& message);
39
40 static void PostFinish(impl::RpcData& data, const grpc::Status& status);
41};
42
43} // namespace impl
44
45/// @brief UnaryFuture for waiting a single response RPC
46class [[nodiscard]] UnaryFuture {
47public:
48 /// @cond
49 explicit UnaryFuture(
50 impl::RpcData& data,
51 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish
52 ) noexcept;
53 /// @endcond
54
55 UnaryFuture(UnaryFuture&&) noexcept = default;
56 UnaryFuture& operator=(UnaryFuture&&) noexcept;
57 UnaryFuture(const UnaryFuture&) = delete;
58 UnaryFuture& operator=(const UnaryFuture&) = delete;
59
60 ~UnaryFuture() noexcept;
61
62 /// @brief Await response
63 ///
64 /// Upon completion result is available in `response` when initiating the
65 /// asynchronous operation, e.g. FinishAsync.
66 ///
67 /// `Get` should not be called multiple times for the same UnaryFuture.
68 ///
69 /// @throws ugrpc::client::RpcError on an RPC error
70 /// @throws ugrpc::client::RpcCancelledError on task cancellation
71 void Get();
72
73 /// @brief Await response until specified timepoint
74 ///
75 /// Once `kReady` is returned, result is available in `response` when
76 /// initiating the asynchronous operation, e.g. FinishAsync.
77 ///
78 /// In case of 'kReady/kCancelled' answer or exception `Get` should not
79 /// be called anymore.
80 ///
81 /// @throws ugrpc::client::RpcError on an RPC error
82 [[nodiscard]] engine::FutureStatus Get(engine::Deadline deadline);
83
84 /// @brief Checks if the asynchronous call has completed
85 /// Note, that once user gets result, IsReady should not be called
86 /// @return true if result ready
87 [[nodiscard]] bool IsReady() const noexcept;
88
89 /// @cond
90 // For internal use only.
91 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept;
92 /// @endcond
93
94private:
95 impl::FutureImpl impl_;
96 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish_;
97};
98
99/// @brief StreamReadFuture for waiting a single read response from stream
100template <typename RPC>
101class [[nodiscard]] StreamReadFuture {
102public:
103 /// @cond
104 explicit StreamReadFuture(
105 impl::RpcData& data,
106 typename RPC::RawStream& stream,
107 std::function<void(impl::RpcData& data)> post_recv_message,
108 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish
109 ) noexcept;
110 /// @endcond
111
112 StreamReadFuture(StreamReadFuture&& other) noexcept = default;
113 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
114
115 ~StreamReadFuture() noexcept;
116
117 /// @brief Await response
118 ///
119 /// Upon completion the result is available in `response` that was
120 /// specified when initiating the asynchronous read
121 ///
122 /// `Get` should not be called multiple times for the same StreamReadFuture.
123 ///
124 /// @throws ugrpc::client::RpcError on an RPC error
125 /// @throws ugrpc::client::RpcCancelledError on task cancellation
126 bool Get();
127
128 /// @brief Checks if the asynchronous call has completed
129 /// Note, that once user gets result, IsReady should not be called
130 /// @return true if result ready
131 [[nodiscard]] bool IsReady() const noexcept;
132
133private:
134 impl::FutureImpl impl_;
135 typename RPC::RawStream* stream_;
136 std::function<void(impl::RpcData& data)> post_recv_message_;
137 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish_;
138};
139
140/// @brief Base class for any RPC
142protected:
143 /// @cond
144 CallAnyBase(impl::CallParams&& params, CallKind call_kind)
145 : data_(std::make_unique<impl::RpcData>(std::move(params), call_kind)) {}
146 /// @endcond
147
148public:
149 /// @returns the `ClientContext` used for this RPC
150 grpc::ClientContext& GetContext();
151
152 /// @returns client name
154
155 /// @returns RPC name
157
158 /// @returns RPC span
159 tracing::Span& GetSpan();
160
161protected:
162 impl::RpcData& GetData();
163
164private:
165 std::unique_ptr<impl::RpcData> data_;
166};
167
168/// @brief Controls a single request -> single response RPC
169///
170/// This class is not thread-safe except for `GetContext`.
171///
172/// The RPC is cancelled on destruction unless `Finish` or `FinishAsync`. In
173/// that case the connection is not closed (it will be reused for new RPCs), and
174/// the server receives `RpcInterruptedError` immediately.
175template <typename Response>
176class [[nodiscard]] UnaryCall final : public CallAnyBase {
177public:
178 using ResponseType = Response;
179
180 /// @brief Await and read the response
181 ///
182 /// `Finish` should not be called multiple times for the same RPC.
183 ///
184 /// The connection is not closed, it will be reused for new RPCs.
185 ///
186 /// @returns the response on success
187 /// @throws ugrpc::client::RpcError on an RPC error
188 /// @throws ugrpc::client::RpcCancelledError on task cancellation
189 Response Finish();
190
191 /// @brief Asynchronously finish the call
192 ///
193 /// `FinishAsync` should not be called multiple times for the same RPC.
194 ///
195 /// `Finish` and `FinishAsync` should not be called together for the same RPC.
196 ///
197 /// @returns the future for the single response
198 UnaryFuture FinishAsync(Response& response);
199
200 /// @cond
201 // For internal use only
202 template <typename PrepareFunc, typename Request>
203 UnaryCall(impl::CallParams&& params, PrepareFunc prepare_func, const Request& req);
204 /// @endcond
205
206 UnaryCall(UnaryCall&&) noexcept = default;
207 UnaryCall& operator=(UnaryCall&&) noexcept = default;
208 ~UnaryCall() = default;
209
210private:
211 impl::RawResponseReader<Response> reader_;
212};
213
214/// @brief Controls a single request -> response stream RPC
215///
216/// This class is not thread-safe except for `GetContext`.
217///
218/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
219/// returned `false`). In that case the connection is not closed (it will be
220/// reused for new RPCs), and the server receives `RpcInterruptedError`
221/// immediately. gRPC provides no way to early-close a server-streaming RPC
222/// gracefully.
223///
224/// If any method throws, further methods must not be called on the same stream,
225/// except for `GetContext`.
226template <typename Response>
227class [[nodiscard]] InputStream final : public CallAnyBase {
228public:
229 /// @brief Await and read the next incoming message
230 ///
231 /// On end-of-input, `Finish` is called automatically.
232 ///
233 /// @param response where to put response on success
234 /// @returns `true` on success, `false` on end-of-input or task cancellation
235 /// @throws ugrpc::client::RpcError on an RPC error
236 [[nodiscard]] bool Read(Response& response);
237
238 /// @cond
239 // For internal use only
240 using RawStream = grpc::ClientAsyncReader<Response>;
241
242 template <typename PrepareFunc, typename Request>
243 InputStream(impl::CallParams&& params, PrepareFunc prepare_func, const Request& req);
244 /// @endcond
245
246 InputStream(InputStream&&) noexcept = default;
247 InputStream& operator=(InputStream&&) noexcept = default;
248 ~InputStream() = default;
249
250private:
251 impl::RawReader<Response> stream_;
252};
253
254/// @brief Controls a request stream -> single response RPC
255///
256/// This class is not thread-safe except for `GetContext`.
257///
258/// The RPC is cancelled on destruction unless `Finish` has been called. In that
259/// case the connection is not closed (it will be reused for new RPCs), and the
260/// server receives `RpcInterruptedError` immediately.
261///
262/// If any method throws, further methods must not be called on the same stream,
263/// except for `GetContext`.
264template <typename Request, typename Response>
265class [[nodiscard]] OutputStream final : public CallAnyBase {
266public:
267 /// @brief Write the next outgoing message
268 ///
269 /// `Write` doesn't store any references to `request`, so it can be
270 /// deallocated right after the call.
271 ///
272 /// @param request the next message to write
273 /// @return true if the data is going to the wire; false if the write
274 /// operation failed (including due to task cancellation),
275 /// in which case no more writes will be accepted,
276 /// and the error details can be fetched from Finish
277 [[nodiscard]] bool Write(const Request& request);
278
279 /// @brief Write the next outgoing message and check result
280 ///
281 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
282 /// deallocated right after the call.
283 ///
284 /// `WriteAndCheck` verifies result of the write and generates exception
285 /// in case of issues.
286 ///
287 /// @param request the next message to write
288 /// @throws ugrpc::client::RpcError on an RPC error
289 /// @throws ugrpc::client::RpcCancelledError on task cancellation
290 void WriteAndCheck(const Request& request);
291
292 /// @brief Complete the RPC successfully
293 ///
294 /// Should be called once all the data is written. The server will then
295 /// send a single `Response`.
296 ///
297 /// `Finish` should not be called multiple times.
298 ///
299 /// The connection is not closed, it will be reused for new RPCs.
300 ///
301 /// @returns the single `Response` received after finishing the writes
302 /// @throws ugrpc::client::RpcError on an RPC error
303 /// @throws ugrpc::client::RpcCancelledError on task cancellation
304 Response Finish();
305
306 /// @cond
307 // For internal use only
308 using RawStream = grpc::ClientAsyncWriter<Request>;
309
310 template <typename PrepareFunc>
311 OutputStream(impl::CallParams&& params, PrepareFunc prepare_func);
312 /// @endcond
313
314 OutputStream(OutputStream&&) noexcept = default;
315 OutputStream& operator=(OutputStream&&) noexcept = default;
316 ~OutputStream() = default;
317
318private:
319 std::unique_ptr<Response> final_response_;
320 impl::RawWriter<Request> stream_;
321};
322
323/// @brief Controls a request stream -> response stream RPC
324///
325/// It is safe to call the following methods from different coroutines:
326///
327/// - `GetContext`;
328/// - one of (`Read`, `ReadAsync`);
329/// - one of (`Write`, `WritesDone`).
330///
331/// `WriteAndCheck` is NOT thread-safe.
332///
333/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
334/// returned `false`). In that case the connection is not closed (it will be
335/// reused for new RPCs), and the server receives `RpcInterruptedError`
336/// immediately. gRPC provides no way to early-close a server-streaming RPC
337/// gracefully.
338///
339/// `Read` and `AsyncRead` can throw if error status is received from server.
340/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
341/// operations.
342///
343/// `Write` and `WritesDone` methods do not throw, but indicate issues with
344/// the RPC by returning `false`.
345///
346/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
347/// operation the user calls `Read` and vice versa.
348///
349/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
350/// any of these methods anymore.
351/// Instead the user SHOULD call `Read` method until the end of input. If
352/// `Write` or `WritesDone` finishes with negative result, finally `Read`
353/// will throw an exception.
354/// ## Usage example:
355///
356/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
357///
358template <typename Request, typename Response>
359class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
360public:
361 /// @brief Await and read the next incoming message
362 ///
363 /// On end-of-input, `Finish` is called automatically.
364 ///
365 /// @param response where to put response on success
366 /// @returns `true` on success, `false` on end-of-input or task cancellation
367 /// @throws ugrpc::client::RpcError on an RPC error
368 [[nodiscard]] bool Read(Response& response);
369
370 /// @brief Return future to read next incoming result
371 ///
372 /// @param response where to put response on success
373 /// @return StreamReadFuture future
374 /// @throws ugrpc::client::RpcError on an RPC error
375 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response) noexcept;
376
377 /// @brief Write the next outgoing message
378 ///
379 /// RPC will be performed immediately. No references to `request` are
380 /// saved, so it can be deallocated right after the call.
381 ///
382 /// @param request the next message to write
383 /// @return true if the data is going to the wire; false if the write
384 /// operation failed (including due to task cancellation),
385 /// in which case no more writes will be accepted,
386 /// but Read may still have some data and status code available
387 [[nodiscard]] bool Write(const Request& request);
388
389 /// @brief Write the next outgoing message and check result
390 ///
391 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
392 /// deallocated right after the call.
393 ///
394 /// `WriteAndCheck` verifies result of the write and generates exception
395 /// in case of issues.
396 ///
397 /// @param request the next message to write
398 /// @throws ugrpc::client::RpcError on an RPC error
399 /// @throws ugrpc::client::RpcCancelledError on task cancellation
400 void WriteAndCheck(const Request& request);
401
402 /// @brief Announce end-of-output to the server
403 ///
404 /// Should be called to notify the server and receive the final response(s).
405 ///
406 /// @return true if the data is going to the wire; false if the operation
407 /// failed, but Read may still have some data and status code
408 /// available
409 [[nodiscard]] bool WritesDone();
410
411 /// @cond
412 // For internal use only
413 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
414
415 template <typename PrepareFunc>
416 BidirectionalStream(impl::CallParams&& params, PrepareFunc prepare_func);
417 /// @endcond
418
419 BidirectionalStream(BidirectionalStream&&) noexcept = default;
420 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
421 ~BidirectionalStream() = default;
422
423private:
424 impl::RawReaderWriter<Request, Response> stream_;
425};
426
427// ========================== Implementation follows ==========================
428
429template <typename RPC>
430StreamReadFuture<RPC>::StreamReadFuture(
431 impl::RpcData& data,
432 typename RPC::RawStream& stream,
433 std::function<void(impl::RpcData& data)> post_recv_message,
434 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish
435) noexcept
436 : impl_(data),
437 stream_(&stream),
438 post_recv_message_(std::move(post_recv_message)),
439 post_finish_(std::move(post_finish)) {}
440
441template <typename RPC>
442StreamReadFuture<RPC>::~StreamReadFuture() noexcept {
443 if (auto* const data = impl_.GetData()) {
444 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
445 const auto wait_status = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
446 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
447 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
448 data->GetStatsScope().OnCancelled();
449 }
450 impl::Finish(*stream_, *data, post_finish_, false);
451 } else {
452 post_recv_message_(*data);
453 }
454 }
455}
456
457template <typename RPC>
458StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(StreamReadFuture<RPC>&& other) noexcept {
459 if (this == &other) return *this;
460 [[maybe_unused]] auto for_destruction = std::move(*this);
461 impl_ = std::move(other.impl_);
462 stream_ = other.stream_;
463 post_recv_message_ = std::move(other.post_recv_message_);
464 post_finish_ = std::move(other.post_finish_);
465 return *this;
466}
467
468template <typename RPC>
469bool StreamReadFuture<RPC>::Get() {
470 auto* const data = impl_.GetData();
471 UINVARIANT(data, "'Get' must be called only once");
472 impl::RpcData::AsyncMethodInvocationGuard guard(*data);
473 impl_.ClearData();
474 const auto result = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
475 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
476 data->GetStatsScope().OnCancelled();
477 data->GetStatsScope().Flush();
478 } else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
479 // Finish can only be called once all the data is read, otherwise the
480 // underlying gRPC driver hangs.
481 impl::Finish(*stream_, *data, post_finish_, true);
482 } else {
483 post_recv_message_(*data);
484 }
485 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
486}
487
488template <typename RPC>
489bool StreamReadFuture<RPC>::IsReady() const noexcept {
490 auto* const data = impl_.GetData();
491 UINVARIANT(data, "IsReady should be called only before 'Get'");
492 auto& method = data->GetAsyncMethodInvocation();
493 return method.IsReady();
494}
495
496template <typename Response>
497template <typename PrepareFunc, typename Request>
498UnaryCall<Response>::UnaryCall(impl::CallParams&& params, PrepareFunc prepare_func, const Request& req)
499 : CallAnyBase(std::move(params), CallKind::kUnaryCall) {
500 impl::MiddlewarePipeline::PreStartCall(GetData());
501 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
502 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
503 }
504
505 reader_ = prepare_func(&GetData().GetContext(), req, &GetData().GetQueue());
506 reader_->StartCall();
507
508 GetData().SetWritesFinished();
509}
510
511template <typename Response>
512Response UnaryCall<Response>::Finish() {
513 Response response;
514 UnaryFuture future = FinishAsync(response);
515 future.Get();
516 return response;
517}
518
519template <typename Response>
520UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
521 UASSERT(reader_);
522 PrepareFinish(GetData());
523 GetData().EmplaceFinishAsyncMethodInvocation();
524 auto& finish = GetData().GetFinishAsyncMethodInvocation();
525 auto& status = GetData().GetStatus();
526 reader_->Finish(&response, &status, finish.GetTag());
527 auto post_finish = [&response](impl::RpcData& data, const grpc::Status& status) {
528 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
529 impl::MiddlewarePipeline::PostRecvMessage(data, response);
530 } else {
531 (void)response; // unused by now
532 }
533 impl::MiddlewarePipeline::PostFinish(data, status);
534 };
535 return UnaryFuture{GetData(), post_finish};
536}
537
538template <typename Response>
539template <typename PrepareFunc, typename Request>
540InputStream<Response>::InputStream(impl::CallParams&& params, PrepareFunc prepare_func, const Request& req)
541 : CallAnyBase(std::move(params), CallKind::kInputStream) {
542 impl::MiddlewarePipeline::PreStartCall(GetData());
543 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
544
545 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
546 stream_ = prepare_func(&GetData().GetContext(), req, &GetData().GetQueue());
547 impl::StartCall(*stream_, GetData());
548
549 GetData().SetWritesFinished();
550}
551
552template <typename Response>
553bool InputStream<Response>::Read(Response& response) {
554 if (impl::Read(*stream_, response, GetData())) {
555 impl::MiddlewarePipeline::PostRecvMessage(GetData(), response);
556 return true;
557 } else {
558 // Finish can only be called once all the data is read, otherwise the
559 // underlying gRPC driver hangs.
560 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
561 impl::MiddlewarePipeline::PostFinish(data, status);
562 };
563 impl::Finish(*stream_, GetData(), post_finish, true);
564 return false;
565 }
566}
567
568template <typename Request, typename Response>
569template <typename PrepareFunc>
570OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareFunc prepare_func)
571 : CallAnyBase(std::move(params), CallKind::kOutputStream), final_response_(std::make_unique<Response>()) {
572 impl::MiddlewarePipeline::PreStartCall(GetData());
573
574 // 'final_response_' will be filled upon successful 'Finish' async call
575 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
576 stream_ = prepare_func(&GetData().GetContext(), final_response_.get(), &GetData().GetQueue());
577 impl::StartCall(*stream_, GetData());
578}
579
580template <typename Request, typename Response>
581bool OutputStream<Request, Response>::Write(const Request& request) {
582 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
583
584 // Don't buffer writes, otherwise in an event subscription scenario, events
585 // may never actually be delivered
586 grpc::WriteOptions write_options{};
587 return impl::Write(*stream_, request, write_options, GetData());
588}
589
590template <typename Request, typename Response>
591void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
592 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
593
594 // Don't buffer writes, otherwise in an event subscription scenario, events
595 // may never actually be delivered
596 grpc::WriteOptions write_options{};
597 if (!impl::Write(*stream_, request, write_options, GetData())) {
598 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
599 impl::MiddlewarePipeline::PostFinish(data, status);
600 };
601 impl::Finish(*stream_, GetData(), post_finish, true);
602 }
603}
604
605template <typename Request, typename Response>
606Response OutputStream<Request, Response>::Finish() {
607 // gRPC does not implicitly call `WritesDone` in `Finish`,
608 // contrary to the documentation
609 if (!GetData().AreWritesFinished()) {
610 impl::WritesDone(*stream_, GetData());
611 }
612
613 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
614 impl::MiddlewarePipeline::PostFinish(data, status);
615 };
616 impl::Finish(*stream_, GetData(), post_finish, true);
617
618 return std::move(*final_response_);
619}
620
621template <typename Request, typename Response>
622template <typename PrepareFunc>
623BidirectionalStream<Request, Response>::BidirectionalStream(impl::CallParams&& params, PrepareFunc prepare_func)
624 : CallAnyBase(std::move(params), CallKind::kBidirectionalStream) {
625 impl::MiddlewarePipeline::PreStartCall(GetData());
626
627 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
628 stream_ = prepare_func(&GetData().GetContext(), &GetData().GetQueue());
629 impl::StartCall(*stream_, GetData());
630}
631
632template <typename Request, typename Response>
633StreamReadFuture<BidirectionalStream<Request, Response>> BidirectionalStream<Request, Response>::ReadAsync(
634 Response& response
635) noexcept {
636 impl::ReadAsync(*stream_, response, GetData());
637 auto post_recv_message = [&response](impl::RpcData& data) {
638 impl::MiddlewarePipeline::PostRecvMessage(data, response);
639 };
640 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
641 impl::MiddlewarePipeline::PostFinish(data, status);
642 };
643 return StreamReadFuture<BidirectionalStream<Request, Response>>{
644 GetData(), *stream_, post_recv_message, post_finish};
645}
646
647template <typename Request, typename Response>
648bool BidirectionalStream<Request, Response>::Read(Response& response) {
649 auto future = ReadAsync(response);
650 return future.Get();
651}
652
653template <typename Request, typename Response>
654bool BidirectionalStream<Request, Response>::Write(const Request& request) {
655 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
656
657 // Don't buffer writes, optimize for ping-pong-style interaction
658 grpc::WriteOptions write_options{};
659 return impl::Write(*stream_, request, write_options, GetData());
660}
661
662template <typename Request, typename Response>
663void BidirectionalStream<Request, Response>::WriteAndCheck(const Request& request) {
664 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
665
666 // Don't buffer writes, optimize for ping-pong-style interaction
667 grpc::WriteOptions write_options{};
668 impl::WriteAndCheck(*stream_, request, write_options, GetData());
669}
670
671template <typename Request, typename Response>
672bool BidirectionalStream<Request, Response>::WritesDone() {
673 return impl::WritesDone(*stream_, GetData());
674}
675
676} // namespace ugrpc::client
677
678USERVER_NAMESPACE_END