6#include <google/protobuf/message.h>
7#include <grpcpp/impl/codegen/proto_utils.h>
8#include <grpcpp/server_context.h>
10#include <userver/utils/assert.hpp>
12#include <userver/ugrpc/impl/deadline_timepoint.hpp>
13#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
14#include <userver/ugrpc/impl/span.hpp>
15#include <userver/ugrpc/impl/statistics_scope.hpp>
16#include <userver/ugrpc/server/exceptions.hpp>
17#include <userver/ugrpc/server/impl/async_methods.hpp>
18#include <userver/ugrpc/server/impl/call_params.hpp>
19#include <userver/ugrpc/server/middlewares/fwd.hpp>
21USERVER_NAMESPACE_BEGIN
/// @brief Produces a log message string describing a call.
///
/// Only the declaration is visible here; the definition lives elsewhere.
///
/// @param metadata key/value pairs (as `grpc::string_ref`) passed for the call
/// @param peer textual identifier of the remote peer
/// @param start_time time point associated with the start of the call
/// @param call_name name of the RPC
/// @param code gRPC status code of the call
/// @return the formatted message
// NOTE(review): the output format and the exact role of each argument are
// inferred from the parameter names - confirm against the definition.
27std::string FormatLogMessage(
28 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
29 std::string_view peer, std::chrono::system_clock::time_point start_time,
30 std::string_view call_name, grpc::StatusCode code);
57 grpc::ServerContext&
GetContext() {
return params_.context; }
98 return params_.storage_context;
109 CallAnyBase(utils::impl::InternalTag, impl::CallParams&& params,
111 : params_(std::move(params)), call_kind_(call_kind) {}
/// @brief Overload of `GetStatistics` that takes an
/// `ugrpc::impl::InternalTag`; returns a reference to the call's
/// `RpcStatisticsScope`.
// NOTE(review): presumably the tag restricts this overload to library
// internals and it returns the same scope as the no-argument overload -
// the definition is not in view, confirm.
114 ugrpc::impl::RpcStatisticsScope& GetStatistics(ugrpc::impl::InternalTag);
/// @brief Runs the middleware pipeline for this call using the given
/// middleware call context. Declaration only; defined elsewhere.
/// @param md_call_context context object handed to the pipeline
// NOTE(review): presumably this is where `middleware_call_context_` gets
// pointed at `md_call_context` - confirm against the definition.
117 void RunMiddlewarePipeline(utils::impl::InternalTag,
118 MiddlewareCallContext& md_call_context);
122 ugrpc::impl::RpcStatisticsScope& GetStatistics() {
123 return params_.statistics;
126 logging::LoggerRef AccessTskvLogger() {
return params_.access_tskv_logger; }
/// @brief Logs the completion of the call with its final status.
///
/// In the visible code it is invoked with `grpc::Status::OK` from
/// `UnaryCall::Finish`, and with `impl::kUnknownErrorStatus` from the
/// destructors of calls that were not finished explicitly.
/// @param status final status of the call
// NOTE(review): what exactly is logged (and to which logger) is defined
// elsewhere - confirm against the definition.
128 void LogFinish(grpc::Status status)
const;
/// @brief Hook applied to an incoming request message.
///
/// The visible `Read` implementations call it right after `impl::Read`
/// reports a successfully read request.
/// @param request the message that was just read
// NOTE(review): the hook's effect is defined elsewhere; presumably it
// dispatches to middlewares - confirm.
130 void ApplyRequestHook(google::protobuf::Message* request);
/// @brief Hook applied to an outgoing response message.
///
/// The visible `Finish` / `Write` / `WriteAndFinish` implementations call
/// it on the response immediately before handing the response to the
/// corresponding `impl::` send function.
/// @param response the message about to be sent
// NOTE(review): the hook's effect is defined elsewhere; presumably it
// dispatches to middlewares - confirm.
132 void ApplyResponseHook(google::protobuf::Message* response);
// Per-call parameters, moved in by the constructor. The visible accessors
// read `context`, `storage_context`, `statistics` and `access_tskv_logger`
// from it.
135 impl::CallParams params_;
// Pointer to the middleware call context; starts out null.
// NOTE(review): presumably non-owning and assigned while the middleware
// pipeline runs - confirm against RunMiddlewarePipeline's definition.
137 MiddlewareCallContext* middleware_call_context_{
nullptr};
143template <
typename Response>
// Completes the unary call by sending `response` with an OK status.
// The visible part of the lvalue overload's definition asserts (UINVARIANT)
// that the call has not been finished yet, applies the response hook, logs
// the finish, sends the response and updates statistics and the span.
// NOTE(review): the body of the rvalue overload is not in view; presumably
// it forwards to the lvalue overload - confirm.
152 void Finish(Response& response);
160 void Finish(Response&& response);
// Constructs the call from its parameters (forwarded to CallAnyBase in the
// visible part of the definition) and the raw response writer used to
// reply.
// NOTE(review): `stream` is presumably bound to the `stream_` reference
// member, so it must outlive this object - confirm.
171 UnaryCall(impl::CallParams&& call_params,
172 impl::RawResponseWriter<Response>& stream);
// Move construction and move assignment are explicitly disabled; declaring
// them also suppresses the implicit copy operations.
174 UnaryCall(UnaryCall&&) =
delete;
175 UnaryCall& operator=(UnaryCall&&) =
delete;
// Raw response writer this call replies through (held by reference).
181 impl::RawResponseWriter<Response>& stream_;
// Initially false; Finish() asserts it is still false before proceeding.
182 bool is_finished_{
false};
193template <
typename Request,
typename Response>
// Reads the next request from the stream into `request`. In the visible
// part of the definition, a successful `impl::Read` is followed by the
// request hook, and the state is switched to `State::kReadsDone` on the
// other path. The result must not be ignored ([[nodiscard]]).
// NOTE(review): the return statements are not in view; presumably `true`
// means a message was read and `false` means the client is done - confirm.
199 [[nodiscard]]
bool Read(Request& request);
// Completes the stream by sending `response` with an OK status. The visible
// part of the lvalue overload's definition moves the state to
// `State::kFinished`, applies the response hook, sends the response and
// updates statistics and the span.
// NOTE(review): the body of the rvalue overload is not in view; presumably
// it forwards to the lvalue overload - confirm.
207 void Finish(Response& response);
215 void Finish(Response&& response);
227 impl::RawReader<Request, Response>& stream);
// Move construction and move assignment are explicitly disabled; declaring
// them also suppresses the implicit copy operations.
229 InputStream(InputStream&&) =
delete;
230 InputStream& operator=(InputStream&&) =
delete;
// Lifecycle of the input stream as used by the visible code:
//   kOpen      - initial value of `state_`;
//   kReadsDone - assigned inside Read();
//   kFinished  - assigned by the Finish paths; the destructor cancels the
//                call if this state was never reached.
236 enum class State { kOpen, kReadsDone, kFinished };
// Raw reader this stream reads requests from and replies through (held by
// reference).
238 impl::RawReader<Request, Response>& stream_;
// Current lifecycle state; starts as kOpen.
239 State state_{State::kOpen};
250template <
typename Response>
// Sends one response message to the client. The visible part of the lvalue
// overload's definition asserts (UINVARIANT) that the stream is not
// finished, calls `impl::SendInitialMetadataIfNew`, applies the response
// hook and writes the message with default `grpc::WriteOptions`.
// NOTE(review): the body of the rvalue overload is not in view; presumably
// it forwards to the lvalue overload - confirm.
256 void Write(Response& response);
261 void Write(Response&& response);
300 impl::RawWriter<Response>& stream);
// Move construction and move assignment are explicitly disabled; declaring
// them also suppresses the implicit copy operations.
302 OutputStream(OutputStream&&) =
delete;
303 OutputStream& operator=(OutputStream&&) =
delete;
// Lifecycle of the output stream as used by the visible code:
//   kNew      - initial value of `state_`;
//   kOpen     - NOTE(review): presumably set by
//               `impl::SendInitialMetadataIfNew`, which receives `state_` -
//               confirm;
//   kFinished - assigned by Finish / FinishWithError / WriteAndFinish; the
//               destructor cancels the call if this state was never reached.
309 enum class State { kNew, kOpen, kFinished };
// Raw writer this stream sends responses through (held by reference).
311 impl::RawWriter<Response>& stream_;
// Current lifecycle state; starts as kNew.
312 State state_{State::kNew};
327template <
typename Request,
typename Response>
// Sends one response message to the client. The visible part of the lvalue
// overload's definition asserts (UINVARIANT) that the stream is not
// finished, applies the response hook only when `Response` derives from
// `google::protobuf::Message`, and writes the message with default
// `grpc::WriteOptions`.
// NOTE(review): the body of the rvalue overload is not in view; presumably
// it forwards to the lvalue overload - confirm.
339 void Write(Response& response);
344 void Write(Response&& response);
383 impl::RawReaderWriter<Request, Response>& stream);
// Destructor. The visible part of its definition cancels the call via
// `impl::Cancel` and logs `impl::kUnknownErrorStatus`.
// NOTE(review): presumably that happens only when the stream was not
// finished explicitly; the guarding condition is not in view - confirm.
387 ~BidirectionalStream();
// Raw reader-writer this stream communicates through (held by reference).
392 impl::RawReaderWriter<Request, Response>& stream_;
// Initially false; set to true inside Read().
393 bool are_reads_done_{
false};
// Initially false; Write / Finish / WriteAndFinish assert it is still false.
394 bool is_finished_{
false};
399template <
typename Response>
400UnaryCall<Response>::
UnaryCall(impl::CallParams&& call_params,
401 impl::RawResponseWriter<Response>& stream)
402 :
CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
406template <
typename Response>
407UnaryCall<Response>::~UnaryCall() {
409 impl::CancelWithError(stream_, GetCallName());
410 LogFinish(impl::kUnknownErrorStatus);
414template <
typename Response>
415void UnaryCall<Response>::
Finish(Response&& response) {
419template <
typename Response>
420void UnaryCall<Response>::
Finish(Response& response) {
421 UINVARIANT(!is_finished_,
"'Finish' called on a finished call");
424 ApplyResponseHook(&response);
426 LogFinish(grpc::Status::OK);
427 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
428 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
429 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), grpc::Status::OK);
432template <
typename Response>
437 impl::FinishWithError(stream_, status, GetCallName());
438 GetStatistics().OnExplicitFinish(status.error_code());
439 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
442template <
typename Response>
447template <
typename Request,
typename Response>
449 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
450 :
CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
454template <
typename Request,
typename Response>
455InputStream<Request, Response>::~InputStream() {
456 if (state_ != State::kFinished) {
457 impl::CancelWithError(stream_, GetCallName());
458 LogFinish(impl::kUnknownErrorStatus);
462template <
typename Request,
typename Response>
463bool InputStream<Request, Response>::
Read(Request& request) {
465 "'Read' called while the stream is half-closed for reads");
466 if (impl::Read(stream_, request)) {
467 ApplyRequestHook(&request);
470 state_ = State::kReadsDone;
475template <
typename Request,
typename Response>
476void InputStream<Request, Response>::
Finish(Response&& response) {
480template <
typename Request,
typename Response>
481void InputStream<Request, Response>::
Finish(Response& response) {
483 "'Finish' called on a finished stream");
484 state_ = State::kFinished;
486 const auto& status = grpc::Status::OK;
489 ApplyResponseHook(&response);
491 impl::Finish(stream_, response, status, GetCallName());
492 GetStatistics().OnExplicitFinish(status.error_code());
493 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
496template <
typename Request,
typename Response>
498 const grpc::Status& status) {
501 state_ = State::kFinished;
503 impl::FinishWithError(stream_, status, GetCallName());
504 GetStatistics().OnExplicitFinish(status.error_code());
505 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
508template <
typename Request,
typename Response>
510 return state_ == State::kFinished;
513template <
typename Response>
515 impl::RawWriter<Response>& stream)
516 :
CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
520template <
typename Response>
521OutputStream<Response>::~OutputStream() {
522 if (state_ != State::kFinished) {
523 impl::Cancel(stream_, GetCallName());
524 LogFinish(impl::kUnknownErrorStatus);
528template <
typename Response>
529void OutputStream<Response>::
Write(Response&& response) {
533template <
typename Response>
534void OutputStream<Response>::
Write(Response& response) {
535 UINVARIANT(state_ != State::kFinished,
"'Write' called on a finished stream");
539 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
543 grpc::WriteOptions write_options{};
545 ApplyResponseHook(&response);
547 impl::Write(stream_, response, write_options, GetCallName());
550template <
typename Response>
553 "'Finish' called on a finished stream");
554 state_ = State::kFinished;
556 const auto& status = grpc::Status::OK;
558 impl::Finish(stream_, status, GetCallName());
559 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
560 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
563template <
typename Response>
567 state_ = State::kFinished;
569 impl::Finish(stream_, status, GetCallName());
570 GetStatistics().OnExplicitFinish(status.error_code());
571 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
574template <
typename Response>
576 WriteAndFinish(response);
579template <
typename Response>
582 "'WriteAndFinish' called on a finished stream");
583 state_ = State::kFinished;
587 grpc::WriteOptions write_options{};
589 const auto& status = grpc::Status::OK;
592 ApplyResponseHook(&response);
594 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
595 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
596 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
599template <
typename Response>
601 return state_ == State::kFinished;
604template <
typename Request,
typename Response>
606 impl::CallParams&& call_params,
607 impl::RawReaderWriter<Request, Response>& stream)
608 :
CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
612template <
typename Request,
typename Response>
615 impl::Cancel(stream_, GetCallName());
616 LogFinish(impl::kUnknownErrorStatus);
620template <
typename Request,
typename Response>
623 "'Read' called while the stream is half-closed for reads");
624 if (impl::Read(stream_, request)) {
625 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
626 ApplyRequestHook(&request);
630 are_reads_done_ =
true;
635template <
typename Request,
typename Response>
640template <
typename Request,
typename Response>
642 UINVARIANT(!is_finished_,
"'Write' called on a finished stream");
645 grpc::WriteOptions write_options{};
647 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
648 ApplyResponseHook(&response);
652 impl::Write(stream_, response, write_options, GetCallName());
659template <
typename Request,
typename Response>
661 UINVARIANT(!is_finished_,
"'Finish' called on a finished stream");
664 const auto& status = grpc::Status::OK;
666 impl::Finish(stream_, status, GetCallName());
667 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
668 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
671template <
typename Request,
typename Response>
673 const grpc::Status& status) {
678 impl::Finish(stream_, status, GetCallName());
679 GetStatistics().OnExplicitFinish(status.error_code());
680 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
683template <
typename Request,
typename Response>
685 Response&& response) {
686 WriteAndFinish(response);
689template <
typename Request,
typename Response>
691 Response& response) {
692 UINVARIANT(!is_finished_,
"'WriteAndFinish' called on a finished stream");
696 grpc::WriteOptions write_options{};
698 const auto& status = grpc::Status::OK;
701 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
702 ApplyResponseHook(&response);
705 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
706 GetStatistics().OnExplicitFinish(status.error_code());
707 ugrpc::impl::UpdateSpanWithStatus(
GetSpan(), status);
710template <
typename Request,
typename Response>