6#include <google/protobuf/message.h>
8#include <userver/utils/assert.hpp>
10#include <userver/ugrpc/impl/deadline_timepoint.hpp>
11#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
12#include <userver/ugrpc/impl/span.hpp>
13#include <userver/ugrpc/server/call.hpp>
14#include <userver/ugrpc/server/exceptions.hpp>
15#include <userver/ugrpc/server/impl/async_methods.hpp>
16#include <userver/ugrpc/server/stream.hpp>
18USERVER_NAMESPACE_BEGIN
// Declaration fragment of UnaryCall<Response>.
// NOTE(review): the class head, base list and access specifiers are not among
// these lines — confirm against the full header.
25template <
typename Response>
// Finish overloads for an lvalue and an rvalue response. The out-of-line
// lvalue definition sends the response with grpc::Status::OK.
34 void Finish(Response& response);
42 void Finish(Response&& response);
// Built from the call parameters and a reference to the raw response writer.
53 UnaryCall(impl::CallParams&& call_params, impl::RawResponseWriter<Response>& stream);
// Move construction and move assignment are deleted.
55 UnaryCall(UnaryCall&&) =
delete;
56 UnaryCall& operator=(UnaryCall&&) =
delete;
// Raw response writer, held by reference.
62 impl::RawResponseWriter<Response>& stream_;
// Starts false; Finish(Response&) asserts it is still false before finishing.
63 bool is_finished_{
false};
// Declaration fragment of InputStream<Request, Response>.
// NOTE(review): the class head, base list and access specifiers are not among
// these lines — confirm against the full header.
74template <
typename Request,
typename Response>
// Reads a request into `request`. Declared `override`, so it implements a
// virtual declared in a base class that is not visible here.
80 bool Read(Request& request)
override;
// Finish overloads for an lvalue and an rvalue response. The out-of-line
// lvalue definition sends the response with grpc::Status::OK.
88 void Finish(Response& response);
96 void Finish(Response&& response);
// Built from the call parameters and a reference to the raw reader.
107 InputStream(impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream);
// Move construction and move assignment are deleted.
109 InputStream(InputStream&&) =
delete;
110 InputStream& operator=(InputStream&&) =
delete;
// Destructor overrides a virtual destructor of the base.
111 ~InputStream()
override;
// States stored in state_: Read requires kOpen and moves to kReadsDone;
// the finishing methods set kFinished.
116 enum class State { kOpen, kReadsDone, kFinished };
// Raw reader, held by reference.
118 impl::RawReader<Request, Response>& stream_;
// A freshly constructed stream is in kOpen.
119 State state_{State::kOpen};
// Declaration fragment of OutputStream<Response>.
// NOTE(review): the class head, base list, access specifiers and several
// member declarations are not among these lines — confirm against the full header.
130template <
typename Response>
// Write overloads for an lvalue and an rvalue response. Both are `override`,
// so they implement virtuals declared in a base class that is not visible here.
136 void Write(Response& response)
override;
141 void Write(Response&& response)
override;
// Built from the call parameters and a reference to the raw writer.
179 OutputStream(impl::CallParams&& call_params, impl::RawWriter<Response>& stream);
// Move construction and move assignment are deleted.
181 OutputStream(OutputStream&&) =
delete;
182 OutputStream& operator=(OutputStream&&) =
delete;
// Destructor overrides a virtual destructor of the base.
183 ~OutputStream()
override;
// States stored in state_. The finishing methods set kFinished; state_ is also
// passed to impl::SendInitialMetadataIfNew from Write.
188 enum class State { kNew, kOpen, kFinished };
// Raw writer, held by reference.
190 impl::RawWriter<Response>& stream_;
// A freshly constructed stream is in kNew.
191 State state_{State::kNew};
// Declaration fragment of BidirectionalStream<Request, Response>.
// NOTE(review): the class head, base list, access specifiers and several
// member declarations are not among these lines — confirm against the full header.
206template <
typename Request,
typename Response>
// Read and both Write overloads are `override`, so they implement virtuals
// declared in a base class that is not visible here.
213 bool Read(Request& request)
override;
218 void Write(Response& response)
override;
223 void Write(Response&& response)
override;
// Built from the call parameters and a reference to the raw reader-writer.
261 BidirectionalStream(impl::CallParams&& call_params, impl::RawReaderWriter<Request, Response>& stream);
// Destructor overrides a virtual destructor of the base.
265 ~BidirectionalStream()
override;
// Raw reader-writer, held by reference.
270 impl::RawReaderWriter<Request, Response>& stream_;
// Starts false; Read asserts it is false and sets it to true.
271 bool are_reads_done_{
false};
// Starts false; Write, Finish and WriteAndFinish assert it is still false.
272 bool is_finished_{
false};
// Out-of-line head of the UnaryCall constructor: takes the call parameters by
// rvalue reference and the raw response writer by reference.
// NOTE(review): the member-initializer list and body are not visible here;
// presumably `stream` is bound to stream_ — confirm.
275template <
typename Response>
276UnaryCall<Response>::
UnaryCall(impl::CallParams&& call_params, impl::RawResponseWriter<Response>& stream)
// Destructor: cancels the call via impl::CancelWithError and logs completion
// with impl::kUnknownErrorStatus.
// NOTE(review): presumably this runs only when the call was never finished
// (an is_finished_ guard); that condition is not visible on these lines — confirm.
279template <
typename Response>
280UnaryCall<Response>::~UnaryCall() {
282 impl::CancelWithError(stream_, GetCallName());
283 LogFinish(impl::kUnknownErrorStatus);
// Rvalue overload of Finish.
// NOTE(review): its body is not visible here; presumably it forwards to the
// lvalue overload below — confirm.
287template <
typename Response>
288void UnaryCall<Response>::
Finish(Response&& response) {
// Lvalue overload of Finish: completes the call with `response` and
// grpc::Status::OK.
292template <
typename Response>
293void UnaryCall<Response>::
Finish(Response& response) {
// Finishing the same call twice is an invariant violation.
294 UINVARIANT(!is_finished_,
"'Finish' called on a finished call");
// The response is passed to the response hook before it is sent.
295 ApplyResponseHook(&response);
// Log first, then send the response with the OK status.
301 LogFinish(grpc::Status::OK);
302 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
// Record the explicit finish in statistics and reflect the status on the span.
303 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
304 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), grpc::Status::OK);
// Error-path completion of a call.
// NOTE(review): the signature line is not visible; presumably this is
// UnaryCall<Response>::FinishWithError taking a grpc::Status named `status` — confirm.
307template <
typename Response>
// Send `status` via impl::FinishWithError, then record its code in statistics
// and reflect it on the span.
312 impl::FinishWithError(stream_, status, GetCallName());
313 GetStatistics().OnExplicitFinish(status.error_code());
314 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
317template <
typename Response>
// Out-of-line head of the InputStream constructor: takes the call parameters
// by rvalue reference and the raw reader by reference.
// NOTE(review): the member-initializer list and body are not visible here;
// presumably `stream` is bound to stream_ — confirm.
322template <
typename Request,
typename Response>
323InputStream<Request, Response>::
InputStream(impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
// Destructor: if the stream never reached kFinished, cancels it via
// impl::CancelWithError and logs completion with impl::kUnknownErrorStatus.
326template <
typename Request,
typename Response>
327InputStream<Request, Response>::~InputStream() {
// Nothing to do when a finishing method already set kFinished.
328 if (state_ != State::kFinished) {
329 impl::CancelWithError(stream_, GetCallName());
330 LogFinish(impl::kUnknownErrorStatus);
// Reads a request from the stream into `request`.
// NOTE(review): the return statements and the branch separating the two
// outcomes are not visible here; presumably true is returned after a successful
// read and false once reads are done — confirm.
334template <
typename Request,
typename Response>
335bool InputStream<Request, Response>::
Read(Request& request) {
// Reading is only allowed while state_ is kOpen.
336 UINVARIANT(state_ == State::kOpen,
"'Read' called while the stream is half-closed for reads");
337 if (impl::Read(stream_, request)) {
// NOTE(review): the request hook is applied unconditionally here, whereas the
// bidirectional stream's Read wraps the same call in an
// `if constexpr (std::is_base_of_v<google::protobuf::Message, Request>)` check —
// confirm whether the difference is intentional.
338 ApplyRequestHook(&request);
// Presumably on the failed-read branch: remember that no more requests will arrive.
341 state_ = State::kReadsDone;
// Rvalue overload of Finish.
// NOTE(review): its body is not visible here; presumably it forwards to the
// lvalue overload below — confirm.
346template <
typename Request,
typename Response>
347void InputStream<Request, Response>::
Finish(Response&& response) {
// Lvalue overload of Finish: completes the stream with `response` and
// grpc::Status::OK.
351template <
typename Request,
typename Response>
352void InputStream<Request, Response>::
Finish(Response& response) {
// Finishing the same stream twice is an invariant violation.
353 UINVARIANT(state_ != State::kFinished,
"'Finish' called on a finished stream");
// The response is passed to the response hook before it is sent.
354 ApplyResponseHook(&response);
// The state is switched to kFinished before the response is sent.
358 state_ = State::kFinished;
360 const auto& status = grpc::Status::OK;
// Send the response, then record the status code in statistics and on the span.
363 impl::Finish(stream_, response, status, GetCallName());
364 GetStatistics().OnExplicitFinish(status.error_code());
365 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Error-path completion of the input stream.
// NOTE(review): the signature line and any precondition check are not visible;
// presumably this is InputStream<Request, Response>::FinishWithError taking a
// grpc::Status named `status` — confirm.
368template <
typename Request,
typename Response>
// The state is switched to kFinished before the status is sent.
372 state_ = State::kFinished;
// Send `status` via impl::FinishWithError, then record its code in statistics
// and reflect it on the span.
374 impl::FinishWithError(stream_, status, GetCallName());
375 GetStatistics().OnExplicitFinish(status.error_code());
376 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Reports whether the input stream has reached State::kFinished.
// NOTE(review): the signature line is not visible; presumably this is
// InputStream<Request, Response>::IsFinished() const — confirm.
379template <
typename Request,
typename Response>
381 return state_ == State::kFinished;
// Out-of-line head of the OutputStream constructor: takes the call parameters
// by rvalue reference and the raw writer by reference.
// NOTE(review): the member-initializer list and body are not visible here;
// presumably `stream` is bound to stream_ — confirm.
384template <
typename Response>
385OutputStream<Response>::
OutputStream(impl::CallParams&& call_params, impl::RawWriter<Response>& stream)
// Destructor: if the stream never reached kFinished, cancels it via
// impl::Cancel and logs completion with impl::kUnknownErrorStatus.
// Note that this uses impl::Cancel, while the UnaryCall and InputStream
// destructors use impl::CancelWithError.
388template <
typename Response>
389OutputStream<Response>::~OutputStream() {
// Nothing to do when a finishing method already set kFinished.
390 if (state_ != State::kFinished) {
391 impl::Cancel(stream_, GetCallName());
392 LogFinish(impl::kUnknownErrorStatus);
// Rvalue overload of Write.
// NOTE(review): its body is not visible here; presumably it forwards to the
// lvalue overload below — confirm.
396template <
typename Response>
397void OutputStream<Response>::
Write(Response&& response) {
// Lvalue overload of Write: sends one response message on the stream.
401template <
typename Response>
402void OutputStream<Response>::
Write(Response& response) {
// Writing after the stream has been finished is an invariant violation.
403 UINVARIANT(state_ != State::kFinished,
"'Write' called on a finished stream");
// The response is passed to the response hook before it is sent.
404 ApplyResponseHook(&response);
// state_ is handed to the helper; presumably it sends initial metadata on the
// first write and moves state_ from kNew to kOpen — confirm in the helper.
408 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
// Default-constructed write options are used for the write.
412 grpc::WriteOptions write_options{};
414 impl::Write(stream_, response, write_options, GetCallName());
// Completes the output stream with grpc::Status::OK and no final message.
// NOTE(review): the signature line is not visible; the assertion message
// indicates this is OutputStream<Response>::Finish — confirm.
417template <
typename Response>
// Finishing the same stream twice is an invariant violation.
419 UINVARIANT(state_ != State::kFinished,
"'Finish' called on a finished stream");
// The state is switched to kFinished before the status is sent.
420 state_ = State::kFinished;
422 const auto& status = grpc::Status::OK;
// Send the status, then record the OK code in statistics and on the span.
424 impl::Finish(stream_, status, GetCallName());
425 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
426 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Error-path completion of the output stream.
// NOTE(review): the signature line and any precondition check are not visible;
// presumably this is OutputStream<Response>::FinishWithError taking a
// grpc::Status named `status` — confirm.
429template <
typename Response>
// The state is switched to kFinished before the status is sent.
433 state_ = State::kFinished;
// Here the status is sent through impl::Finish (not impl::FinishWithError),
// then recorded in statistics and on the span.
435 impl::Finish(stream_, status, GetCallName());
436 GetStatistics().OnExplicitFinish(status.error_code());
437 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Forwarding overload: calls WriteAndFinish with `response` as an lvalue.
// NOTE(review): the signature line is not visible; presumably this is the
// rvalue overload OutputStream<Response>::WriteAndFinish(Response&&) — confirm.
440template <
typename Response>
442 WriteAndFinish(response);
// Sends a final response and completes the stream with grpc::Status::OK in a
// single impl::WriteAndFinish call.
// NOTE(review): the signature line is not visible; the assertion message
// indicates this is WriteAndFinish — confirm.
445template <
typename Response>
// Finishing the same stream twice is an invariant violation.
447 UINVARIANT(state_ != State::kFinished,
"'WriteAndFinish' called on a finished stream");
// The response is passed to the response hook before it is sent.
448 ApplyResponseHook(&response);
// The state is switched to kFinished before the message is sent.
452 state_ = State::kFinished;
// Default-constructed write options are used for the final write.
456 grpc::WriteOptions write_options{};
458 const auto& status = grpc::Status::OK;
// Send the message together with the status, then record the OK code in
// statistics and on the span.
461 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
462 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
463 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Reports whether the output stream has reached State::kFinished.
// NOTE(review): the signature line is not visible; presumably this is
// OutputStream<Response>::IsFinished() const — confirm.
466template <
typename Response>
468 return state_ == State::kFinished;
// Parameter list of an out-of-line definition taking the call parameters by
// rvalue reference and the raw reader-writer by reference.
// NOTE(review): the declarator line, initializer list and body are not visible;
// the parameters match the BidirectionalStream constructor declaration — confirm.
471template <
typename Request,
typename Response>
473 impl::CallParams&& call_params,
474 impl::RawReaderWriter<Request, Response>& stream
// Cancels the stream via impl::Cancel and logs completion with
// impl::kUnknownErrorStatus.
// NOTE(review): the declarator line and any guard are not visible; presumably
// this is the BidirectionalStream destructor and runs only when is_finished_
// is false — confirm.
479template <
typename Request,
typename Response>
482 impl::Cancel(stream_, GetCallName());
483 LogFinish(impl::kUnknownErrorStatus);
// Reads a request from the bidirectional stream into `request`.
// NOTE(review): the signature line, the return statements and the branch
// separating the two outcomes are not visible; the assertion message indicates
// this is Read — confirm.
487template <
typename Request,
typename Response>
// Reading after reads were marked done is an invariant violation.
489 UINVARIANT(!are_reads_done_,
"'Read' called while the stream is half-closed for reads");
490 if (impl::Read(stream_, request)) {
// The request hook is applied only when Request derives from
// google::protobuf::Message; the check is resolved at compile time.
491 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
492 ApplyRequestHook(&request);
// Presumably on the failed-read branch: remember that no more requests will arrive.
496 are_reads_done_ =
true;
// NOTE(review): only the template header of this definition is visible;
// presumably it is the rvalue Write overload — confirm.
501template <
typename Request,
typename Response>
// Sends one response message on the bidirectional stream.
// NOTE(review): the signature line is not visible; the assertion message
// indicates this is Write — confirm.
506template <
typename Request,
typename Response>
// Writing after the stream has been finished is an invariant violation.
508 UINVARIANT(!is_finished_,
"'Write' called on a finished stream");
// The response hook is applied only when Response derives from
// google::protobuf::Message; the check is resolved at compile time.
509 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
510 ApplyResponseHook(&response);
// Default-constructed write options are used for the write.
514 grpc::WriteOptions write_options{};
517 impl::Write(stream_, response, write_options, GetCallName());
// Completes the bidirectional stream with grpc::Status::OK and no final message.
// NOTE(review): the signature line and the update of is_finished_ are not
// visible; the assertion message indicates this is Finish — confirm.
524template <
typename Request,
typename Response>
// Finishing the same stream twice is an invariant violation.
526 UINVARIANT(!is_finished_,
"'Finish' called on a finished stream");
529 const auto& status = grpc::Status::OK;
// Send the status, then record the OK code in statistics and on the span.
531 impl::Finish(stream_, status, GetCallName());
532 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
533 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Error-path completion of the bidirectional stream.
// NOTE(review): the signature line and any state handling are not visible;
// presumably this is FinishWithError taking a grpc::Status named `status` — confirm.
536template <
typename Request,
typename Response>
// The status is sent through impl::Finish, then recorded in statistics and on
// the span.
542 impl::Finish(stream_, status, GetCallName());
543 GetStatistics().OnExplicitFinish(status.error_code());
544 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
// Forwarding overload: calls WriteAndFinish with `response` as an lvalue.
// NOTE(review): the signature line is not visible; presumably this is the
// rvalue overload WriteAndFinish(Response&&) — confirm.
547template <
typename Request,
typename Response>
549 WriteAndFinish(response);
// Sends a final response and completes the stream with grpc::Status::OK in a
// single impl::WriteAndFinish call.
// NOTE(review): the signature line and the update of is_finished_ are not
// visible; the assertion message indicates this is WriteAndFinish — confirm.
552template <
typename Request,
typename Response>
// Finishing the same stream twice is an invariant violation.
554 UINVARIANT(!is_finished_,
"'WriteAndFinish' called on a finished stream");
// The response hook is applied only when Response derives from
// google::protobuf::Message; the check is resolved at compile time.
555 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
556 ApplyResponseHook(&response);
// Default-constructed write options are used for the final write.
564 grpc::WriteOptions write_options{};
566 const auto& status = grpc::Status::OK;
// Send the message together with the status, then record the status code in
// statistics and on the span.
569 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
570 GetStatistics().OnExplicitFinish(status.error_code());
571 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
574template <
typename Request,
typename Response>