userver: userver/ugrpc/client/rpc.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/client/rpc.hpp
4/// @brief Classes representing an outgoing RPC
5
6#include <exception>
7#include <functional>
8#include <memory>
9#include <string_view>
10#include <utility>
11
12#include <grpcpp/impl/codegen/proto_utils.h>
13
14#include <userver/engine/deadline.hpp>
15#include <userver/engine/future_status.hpp>
16#include <userver/utils/assert.hpp>
17
18#include <userver/ugrpc/client/call.hpp>
19#include <userver/ugrpc/client/impl/async_methods.hpp>
20#include <userver/ugrpc/client/middlewares/fwd.hpp>
21#include <userver/ugrpc/deadline_timepoint.hpp>
22
23USERVER_NAMESPACE_BEGIN
24
25namespace ugrpc::client {
26
27namespace impl {
28
29struct MiddlewarePipeline {
30 static void PreStartCall(impl::RpcData& data);
31
32 static void PreSendMessage(impl::RpcData& data, const google::protobuf::Message& message);
33 static void PostRecvMessage(impl::RpcData& data, const google::protobuf::Message& message);
34
35 static void PostFinish(impl::RpcData& data, const grpc::Status& status);
36};
37
38/// @brief UnaryFuture for waiting a single response RPC
39class [[nodiscard]] UnaryFuture {
40public:
41 /// @cond
42 UnaryFuture(
43 impl::RpcData& data,
44 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish
45 ) noexcept;
46 /// @endcond
47
48 UnaryFuture(UnaryFuture&&) noexcept;
49 UnaryFuture& operator=(UnaryFuture&&) noexcept;
50 UnaryFuture(const UnaryFuture&) = delete;
51 UnaryFuture& operator=(const UnaryFuture&) = delete;
52
53 ~UnaryFuture();
54
55 /// @brief Checks if the asynchronous call has completed
56 /// Note, that once user gets result, IsReady should not be called
57 /// @return true if result ready
58 [[nodiscard]] bool IsReady() const noexcept;
59
60 /// @brief Await response until the deadline is reached or until the task is cancelled.
61 ///
62 /// Upon completion result is available in `response` when initiating the
63 /// asynchronous operation, e.g. FinishAsync.
64 [[nodiscard]] engine::FutureStatus WaitUntil(engine::Deadline deadline) const noexcept;
65
66 /// @brief Await response
67 ///
68 /// Upon completion result is available in `response` when initiating the
69 /// asynchronous operation, e.g. FinishAsync.
70 ///
71 /// `Get` should not be called multiple times for the same UnaryFuture.
72 ///
73 /// @throws ugrpc::client::RpcError on an RPC error
74 /// @throws ugrpc::client::RpcCancelledError on task cancellation
75 void Get();
76
77 /// @cond
78 // For internal use only.
79 engine::impl::ContextAccessor* TryGetContextAccessor() noexcept;
80 /// @endcond
81
82private:
83 impl::RpcData* data_{};
84 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish_;
85 mutable std::exception_ptr exception_;
86};
87
88} // namespace impl
89
90/// @brief StreamReadFuture for waiting a single read response from stream
91template <typename RPC>
92class [[nodiscard]] StreamReadFuture {
93public:
94 /// @cond
95 explicit StreamReadFuture(
96 impl::RpcData& data,
97 typename RPC::RawStream& stream,
98 std::function<void(impl::RpcData& data)> post_recv_message,
99 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish
100 ) noexcept;
101 /// @endcond
102
103 StreamReadFuture(StreamReadFuture&& other) noexcept;
104 StreamReadFuture& operator=(StreamReadFuture&& other) noexcept;
105 StreamReadFuture(const StreamReadFuture&) = delete;
106 StreamReadFuture& operator=(const StreamReadFuture&) = delete;
107
108 ~StreamReadFuture();
109
110 /// @brief Await response
111 ///
112 /// Upon completion the result is available in `response` that was
113 /// specified when initiating the asynchronous read
114 ///
115 /// `Get` should not be called multiple times for the same StreamReadFuture.
116 ///
117 /// @throws ugrpc::client::RpcError on an RPC error
118 /// @throws ugrpc::client::RpcCancelledError on task cancellation
119 bool Get();
120
121 /// @brief Checks if the asynchronous call has completed
122 /// Note, that once user gets result, IsReady should not be called
123 /// @return true if result ready
124 [[nodiscard]] bool IsReady() const noexcept;
125
126private:
127 impl::RpcData* data_{};
128 typename RPC::RawStream* stream_{};
129 std::function<void(impl::RpcData& data)> post_recv_message_;
130 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish_;
131};
132
133namespace impl {
134
135/// @brief Controls a single request -> single response RPC
136///
137/// This class is not thread-safe except for `GetContext`.
138///
139/// The RPC is cancelled on destruction unless `Finish` or `FinishAsync`. In
140/// that case the connection is not closed (it will be reused for new RPCs), and
141/// the server receives `RpcInterruptedError` immediately.
142template <typename Response>
143class [[nodiscard]] UnaryCall final : public CallAnyBase {
144public:
145 using ResponseType = Response;
146
147 /// @brief Await and read the response
148 ///
149 /// `Finish` should not be called multiple times for the same RPC.
150 ///
151 /// The connection is not closed, it will be reused for new RPCs.
152 ///
153 /// @returns the response on success
154 /// @throws ugrpc::client::RpcError on an RPC error
155 /// @throws ugrpc::client::RpcCancelledError on task cancellation
156 Response Finish();
157
158 /// @brief Asynchronously finish the call
159 ///
160 /// `FinishAsync` should not be called multiple times for the same RPC.
161 ///
162 /// `Finish` and `FinishAsync` should not be called together for the same RPC.
163 ///
164 /// @returns the future for the single response
165 UnaryFuture FinishAsync(Response& response);
166
167 /// @cond
168 // For internal use only
169 template <typename PrepareAsyncCall, typename Request>
170 UnaryCall(impl::CallParams&& params, PrepareAsyncCall prepare_async_call, const Request& req);
171 /// @endcond
172
173 UnaryCall(UnaryCall&&) noexcept = default;
174 UnaryCall& operator=(UnaryCall&&) noexcept = default;
175 ~UnaryCall() = default;
176
177private:
178 impl::RawResponseReader<Response> reader_;
179};
180
181} // namespace impl
182
183/// @brief Controls a single request -> response stream RPC
184///
185/// This class is not thread-safe except for `GetContext`.
186///
187/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
188/// returned `false`). In that case the connection is not closed (it will be
189/// reused for new RPCs), and the server receives `RpcInterruptedError`
190/// immediately. gRPC provides no way to early-close a server-streaming RPC
191/// gracefully.
192///
193/// If any method throws, further methods must not be called on the same stream,
194/// except for `GetContext`.
195template <typename Response>
196class [[nodiscard]] InputStream final : public CallAnyBase {
197public:
198 /// @brief Await and read the next incoming message
199 ///
200 /// On end-of-input, `Finish` is called automatically.
201 ///
202 /// @param response where to put response on success
203 /// @returns `true` on success, `false` on end-of-input or task cancellation
204 /// @throws ugrpc::client::RpcError on an RPC error
205 [[nodiscard]] bool Read(Response& response);
206
207 /// @cond
208 // For internal use only
209 using RawStream = grpc::ClientAsyncReader<Response>;
210
211 template <typename PrepareAsyncCall, typename Request>
212 InputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call, const Request& req);
213 /// @endcond
214
215 InputStream(InputStream&&) noexcept = default;
216 InputStream& operator=(InputStream&&) noexcept = default;
217 ~InputStream() = default;
218
219private:
220 impl::RawReader<Response> stream_;
221};
222
223/// @brief Controls a request stream -> single response RPC
224///
225/// This class is not thread-safe except for `GetContext`.
226///
227/// The RPC is cancelled on destruction unless `Finish` has been called. In that
228/// case the connection is not closed (it will be reused for new RPCs), and the
229/// server receives `RpcInterruptedError` immediately.
230///
231/// If any method throws, further methods must not be called on the same stream,
232/// except for `GetContext`.
233template <typename Request, typename Response>
234class [[nodiscard]] OutputStream final : public CallAnyBase {
235public:
236 /// @brief Write the next outgoing message
237 ///
238 /// `Write` doesn't store any references to `request`, so it can be
239 /// deallocated right after the call.
240 ///
241 /// @param request the next message to write
242 /// @return true if the data is going to the wire; false if the write
243 /// operation failed (including due to task cancellation),
244 /// in which case no more writes will be accepted,
245 /// and the error details can be fetched from Finish
246 [[nodiscard]] bool Write(const Request& request);
247
248 /// @brief Write the next outgoing message and check result
249 ///
250 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
251 /// deallocated right after the call.
252 ///
253 /// `WriteAndCheck` verifies result of the write and generates exception
254 /// in case of issues.
255 ///
256 /// @param request the next message to write
257 /// @throws ugrpc::client::RpcError on an RPC error
258 /// @throws ugrpc::client::RpcCancelledError on task cancellation
259 void WriteAndCheck(const Request& request);
260
261 /// @brief Complete the RPC successfully
262 ///
263 /// Should be called once all the data is written. The server will then
264 /// send a single `Response`.
265 ///
266 /// `Finish` should not be called multiple times.
267 ///
268 /// The connection is not closed, it will be reused for new RPCs.
269 ///
270 /// @returns the single `Response` received after finishing the writes
271 /// @throws ugrpc::client::RpcError on an RPC error
272 /// @throws ugrpc::client::RpcCancelledError on task cancellation
273 Response Finish();
274
275 /// @cond
276 // For internal use only
277 using RawStream = grpc::ClientAsyncWriter<Request>;
278
279 template <typename PrepareAsyncCall>
280 OutputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call);
281 /// @endcond
282
283 OutputStream(OutputStream&&) noexcept = default;
284 OutputStream& operator=(OutputStream&&) noexcept = default;
285 ~OutputStream() = default;
286
287private:
288 std::unique_ptr<Response> final_response_;
289 impl::RawWriter<Request> stream_;
290};
291
292/// @brief Controls a request stream -> response stream RPC
293///
294/// It is safe to call the following methods from different coroutines:
295///
296/// - `GetContext`;
297/// - one of (`Read`, `ReadAsync`);
298/// - one of (`Write`, `WritesDone`).
299///
300/// `WriteAndCheck` is NOT thread-safe.
301///
302/// The RPC is cancelled on destruction unless the stream is closed (`Read` has
303/// returned `false`). In that case the connection is not closed (it will be
304/// reused for new RPCs), and the server receives `RpcInterruptedError`
305/// immediately. gRPC provides no way to early-close a server-streaming RPC
306/// gracefully.
307///
308/// `Read` and `AsyncRead` can throw if error status is received from server.
309/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these
310/// operations.
311///
312/// `Write` and `WritesDone` methods do not throw, but indicate issues with
313/// the RPC by returning `false`.
314///
315/// `WriteAndCheck` is intended for ping-pong scenarios, when after write
316/// operation the user calls `Read` and vice versa.
317///
318/// If `Write` or `WritesDone` returns negative result, the user MUST NOT call
319/// any of these methods anymore.
320/// Instead the user SHOULD call `Read` method until the end of input. If
321/// `Write` or `WritesDone` finishes with negative result, finally `Read`
322/// will throw an exception.
323/// ## Usage example:
324///
325/// @snippet grpc/tests/stream_test.cpp concurrent bidirectional stream
326///
327template <typename Request, typename Response>
328class [[nodiscard]] BidirectionalStream final : public CallAnyBase {
329public:
330 /// @brief Await and read the next incoming message
331 ///
332 /// On end-of-input, `Finish` is called automatically.
333 ///
334 /// @param response where to put response on success
335 /// @returns `true` on success, `false` on end-of-input or task cancellation
336 /// @throws ugrpc::client::RpcError on an RPC error
337 [[nodiscard]] bool Read(Response& response);
338
339 /// @brief Return future to read next incoming result
340 ///
341 /// @param response where to put response on success
342 /// @return StreamReadFuture future
343 /// @throws ugrpc::client::RpcError on an RPC error
344 StreamReadFuture<BidirectionalStream> ReadAsync(Response& response) noexcept;
345
346 /// @brief Write the next outgoing message
347 ///
348 /// RPC will be performed immediately. No references to `request` are
349 /// saved, so it can be deallocated right after the call.
350 ///
351 /// @param request the next message to write
352 /// @return true if the data is going to the wire; false if the write
353 /// operation failed (including due to task cancellation),
354 /// in which case no more writes will be accepted,
355 /// but Read may still have some data and status code available
356 [[nodiscard]] bool Write(const Request& request);
357
358 /// @brief Write the next outgoing message and check result
359 ///
360 /// `WriteAndCheck` doesn't store any references to `request`, so it can be
361 /// deallocated right after the call.
362 ///
363 /// `WriteAndCheck` verifies result of the write and generates exception
364 /// in case of issues.
365 ///
366 /// @param request the next message to write
367 /// @throws ugrpc::client::RpcError on an RPC error
368 /// @throws ugrpc::client::RpcCancelledError on task cancellation
369 void WriteAndCheck(const Request& request);
370
371 /// @brief Announce end-of-output to the server
372 ///
373 /// Should be called to notify the server and receive the final response(s).
374 ///
375 /// @return true if the data is going to the wire; false if the operation
376 /// failed, but Read may still have some data and status code
377 /// available
378 [[nodiscard]] bool WritesDone();
379
380 /// @cond
381 // For internal use only
382 using RawStream = grpc::ClientAsyncReaderWriter<Request, Response>;
383
384 template <typename PrepareAsyncCall>
385 BidirectionalStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call);
386 /// @endcond
387
388 BidirectionalStream(BidirectionalStream&&) noexcept = default;
389 BidirectionalStream& operator=(BidirectionalStream&&) noexcept = default;
390 ~BidirectionalStream() = default;
391
392private:
393 impl::RawReaderWriter<Request, Response> stream_;
394};
395
396template <typename RPC>
397StreamReadFuture<RPC>::StreamReadFuture(
398 impl::RpcData& data,
399 typename RPC::RawStream& stream,
400 std::function<void(impl::RpcData& data)> post_recv_message,
401 std::function<void(impl::RpcData& data, const grpc::Status& status)> post_finish
402) noexcept
403 : data_(&data),
404 stream_(&stream),
405 post_recv_message_(std::move(post_recv_message)),
406 post_finish_(std::move(post_finish)) {}
407
408template <typename RPC>
409StreamReadFuture<RPC>::StreamReadFuture(StreamReadFuture&& other) noexcept
410 : data_{std::exchange(other.data_, nullptr)},
411 stream_{other.stream_},
412 post_recv_message_{std::move(other.post_recv_message_)},
413 post_finish_{std::move(other.post_finish_)} {}
414
415template <typename RPC>
416StreamReadFuture<RPC>& StreamReadFuture<RPC>::operator=(StreamReadFuture<RPC>&& other) noexcept {
417 if (this == &other) return *this;
418 [[maybe_unused]] auto for_destruction = std::move(*this);
419 data_ = std::exchange(other.data_, nullptr);
420 stream_ = other.stream_;
421 post_recv_message_ = std::move(other.post_recv_message_);
422 post_finish_ = std::move(other.post_finish_);
423 return *this;
424}
425
426template <typename RPC>
427StreamReadFuture<RPC>::~StreamReadFuture() {
428 if (data_) {
429 impl::RpcData::AsyncMethodInvocationGuard guard(*data_);
430 const auto wait_status = impl::Wait(data_->GetAsyncMethodInvocation(), data_->GetContext());
431 if (wait_status != impl::AsyncMethodInvocation::WaitStatus::kOk) {
432 if (wait_status == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
433 data_->GetStatsScope().OnCancelled();
434 }
435 impl::Finish(*stream_, *data_, post_finish_, false);
436 } else {
437 post_recv_message_(*data_);
438 }
439 }
440}
441
442template <typename RPC>
443bool StreamReadFuture<RPC>::Get() {
444 UINVARIANT(data_, "'Get' must be called only once");
445 impl::RpcData::AsyncMethodInvocationGuard guard(*data_);
446 auto* const data = std::exchange(data_, nullptr);
447 const auto result = impl::Wait(data->GetAsyncMethodInvocation(), data->GetContext());
448 if (result == impl::AsyncMethodInvocation::WaitStatus::kCancelled) {
449 data->GetStatsScope().OnCancelled();
450 data->GetStatsScope().Flush();
451 } else if (result == impl::AsyncMethodInvocation::WaitStatus::kError) {
452 // Finish can only be called once all the data is read, otherwise the
453 // underlying gRPC driver hangs.
454 impl::Finish(*stream_, *data, post_finish_, true);
455 } else {
456 post_recv_message_(*data);
457 }
458 return result == impl::AsyncMethodInvocation::WaitStatus::kOk;
459}
460
461template <typename RPC>
462bool StreamReadFuture<RPC>::IsReady() const noexcept {
463 UINVARIANT(data_, "IsReady should be called only before 'Get'");
464 auto& method = data_->GetAsyncMethodInvocation();
465 return method.IsReady();
466}
467
468namespace impl {
469
470template <typename Response>
471template <typename PrepareAsyncCall, typename Request>
472UnaryCall<Response>::UnaryCall(impl::CallParams&& params, PrepareAsyncCall prepare_async_call, const Request& req)
473 : CallAnyBase(std::move(params), impl::CallKind::kUnaryCall) {
474 impl::MiddlewarePipeline::PreStartCall(GetData());
475 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
476 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
477 }
478 reader_ = prepare_async_call(GetData().GetStub(), &GetData().GetContext(), req, &GetData().GetQueue());
479 reader_->StartCall();
480
481 GetData().SetWritesFinished();
482}
483
484template <typename Response>
485Response UnaryCall<Response>::Finish() {
486 Response response;
487 UnaryFuture future = FinishAsync(response);
488 future.Get();
489 return response;
490}
491
492template <typename Response>
493UnaryFuture UnaryCall<Response>::FinishAsync(Response& response) {
494 UASSERT(reader_);
495 PrepareFinish(GetData());
496 GetData().EmplaceFinishAsyncMethodInvocation();
497 auto& finish = GetData().GetFinishAsyncMethodInvocation();
498 auto& status = GetData().GetStatus();
499 reader_->Finish(&response, &status, finish.GetTag());
500 auto post_finish = [&response](impl::RpcData& data, const grpc::Status& status) {
501 if (status.ok()) { // response is not filled on bad status
502 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
503 impl::MiddlewarePipeline::PostRecvMessage(data, response);
504 } else {
505 (void)response; // unused by now
506 }
507 }
508 impl::MiddlewarePipeline::PostFinish(data, status);
509 };
510 return UnaryFuture{GetData(), post_finish};
511}
512
513} // namespace impl
514
515template <typename Response>
516template <typename PrepareAsyncCall, typename Request>
517InputStream<Response>::InputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call, const Request& req)
518 : CallAnyBase(std::move(params), impl::CallKind::kInputStream) {
519 impl::MiddlewarePipeline::PreStartCall(GetData());
520 impl::MiddlewarePipeline::PreSendMessage(GetData(), req);
521
522 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
523 stream_ = prepare_async_call(GetData().GetStub(), &GetData().GetContext(), req, &GetData().GetQueue());
524 impl::StartCall(*stream_, GetData());
525
526 GetData().SetWritesFinished();
527}
528
529template <typename Response>
530bool InputStream<Response>::Read(Response& response) {
531 if (impl::Read(*stream_, response, GetData())) {
532 impl::MiddlewarePipeline::PostRecvMessage(GetData(), response);
533 return true;
534 } else {
535 // Finish can only be called once all the data is read, otherwise the
536 // underlying gRPC driver hangs.
537 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
538 impl::MiddlewarePipeline::PostFinish(data, status);
539 };
540 impl::Finish(*stream_, GetData(), post_finish, true);
541 return false;
542 }
543}
544
545template <typename Request, typename Response>
546template <typename PrepareAsyncCall>
547OutputStream<Request, Response>::OutputStream(impl::CallParams&& params, PrepareAsyncCall prepare_async_call)
548 : CallAnyBase(std::move(params), impl::CallKind::kOutputStream), final_response_(std::make_unique<Response>()) {
549 impl::MiddlewarePipeline::PreStartCall(GetData());
550
551 // 'final_response_' will be filled upon successful 'Finish' async call
552 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
553 stream_ =
554 prepare_async_call(GetData().GetStub(), &GetData().GetContext(), final_response_.get(), &GetData().GetQueue());
555 impl::StartCall(*stream_, GetData());
556}
557
558template <typename Request, typename Response>
559bool OutputStream<Request, Response>::Write(const Request& request) {
560 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
561
562 // Don't buffer writes, otherwise in an event subscription scenario, events
563 // may never actually be delivered
564 grpc::WriteOptions write_options{};
565 return impl::Write(*stream_, request, write_options, GetData());
566}
567
568template <typename Request, typename Response>
569void OutputStream<Request, Response>::WriteAndCheck(const Request& request) {
570 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
571
572 // Don't buffer writes, otherwise in an event subscription scenario, events
573 // may never actually be delivered
574 grpc::WriteOptions write_options{};
575 if (!impl::Write(*stream_, request, write_options, GetData())) {
576 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
577 impl::MiddlewarePipeline::PostFinish(data, status);
578 };
579 impl::Finish(*stream_, GetData(), post_finish, true);
580 }
581}
582
583template <typename Request, typename Response>
584Response OutputStream<Request, Response>::Finish() {
585 // gRPC does not implicitly call `WritesDone` in `Finish`,
586 // contrary to the documentation
587 if (!GetData().AreWritesFinished()) {
588 impl::WritesDone(*stream_, GetData());
589 }
590
591 auto post_finish = [this](impl::RpcData& data, const grpc::Status& status) {
592 if (status.ok()) { // response is not filled on bad status
593 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
594 UASSERT(final_response_);
595 impl::MiddlewarePipeline::PostRecvMessage(data, *final_response_);
596 } else {
597 // unused by now
598 }
599 }
600 impl::MiddlewarePipeline::PostFinish(data, status);
601 };
602 impl::Finish(*stream_, GetData(), post_finish, true);
603
604 return std::move(*final_response_);
605}
606
607template <typename Request, typename Response>
608template <typename PrepareAsyncCall>
609BidirectionalStream<Request, Response>::BidirectionalStream(
610 impl::CallParams&& params,
611 PrepareAsyncCall prepare_async_call
612)
613 : CallAnyBase(std::move(params), impl::CallKind::kBidirectionalStream) {
614 impl::MiddlewarePipeline::PreStartCall(GetData());
615
616 // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
617 stream_ = prepare_async_call(GetData().GetStub(), &GetData().GetContext(), &GetData().GetQueue());
618 impl::StartCall(*stream_, GetData());
619}
620
621template <typename Request, typename Response>
622StreamReadFuture<BidirectionalStream<Request, Response>> BidirectionalStream<Request, Response>::ReadAsync(
623 Response& response
624) noexcept {
625 impl::ReadAsync(*stream_, response, GetData());
626 auto post_recv_message = [&response](impl::RpcData& data) {
627 impl::MiddlewarePipeline::PostRecvMessage(data, response);
628 };
629 auto post_finish = [](impl::RpcData& data, const grpc::Status& status) {
630 impl::MiddlewarePipeline::PostFinish(data, status);
631 };
632 return StreamReadFuture<BidirectionalStream<Request, Response>>{
633 GetData(), *stream_, post_recv_message, post_finish};
634}
635
636template <typename Request, typename Response>
637bool BidirectionalStream<Request, Response>::Read(Response& response) {
638 auto future = ReadAsync(response);
639 return future.Get();
640}
641
642template <typename Request, typename Response>
643bool BidirectionalStream<Request, Response>::Write(const Request& request) {
644 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
645
646 // Don't buffer writes, optimize for ping-pong-style interaction
647 grpc::WriteOptions write_options{};
648 return impl::Write(*stream_, request, write_options, GetData());
649}
650
651template <typename Request, typename Response>
652void BidirectionalStream<Request, Response>::WriteAndCheck(const Request& request) {
653 impl::MiddlewarePipeline::PreSendMessage(GetData(), request);
654
655 // Don't buffer writes, optimize for ping-pong-style interaction
656 grpc::WriteOptions write_options{};
657 impl::WriteAndCheck(*stream_, request, write_options, GetData());
658}
659
660template <typename Request, typename Response>
661bool BidirectionalStream<Request, Response>::WritesDone() {
662 return impl::WritesDone(*stream_, GetData());
663}
664
665} // namespace ugrpc::client
666
667USERVER_NAMESPACE_END