6#include <grpcpp/impl/codegen/proto_utils.h>
7#include <grpcpp/server_context.h>
9#include <userver/utils/assert.hpp>
11#include <userver/ugrpc/impl/deadline_timepoint.hpp>
12#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
13#include <userver/ugrpc/impl/span.hpp>
14#include <userver/ugrpc/impl/statistics_scope.hpp>
15#include <userver/ugrpc/server/exceptions.hpp>
16#include <userver/ugrpc/server/impl/async_methods.hpp>
17#include <userver/ugrpc/server/impl/call_params.hpp>
19USERVER_NAMESPACE_BEGIN
/// Declaration only; the definition is not part of this header excerpt.
///
/// Returns a std::string built from the attributes of one call, as named by
/// the parameters:
/// @param metadata   multimap of grpc::string_ref key/value pairs
/// @param peer       peer description as a string view
/// @param start_time system-clock time point at which the call started
/// @param call_name  name of the call
/// @param code       gRPC status code of the call
///
/// NOTE(review): judging by the name this produces the per-call log line;
/// the exact output format should be confirmed at the definition.
25std::string FormatLogMessage(
26 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
27 std::string_view peer, std::chrono::system_clock::time_point start_time,
28 std::string_view call_name, grpc::StatusCode code);
35 CallAnyBase(impl::CallParams&& params) : params_(std::move(params)) {}
48 grpc::ServerContext&
GetContext() {
return params_.context; }
53 tracing::Span& GetSpan() {
return params_.call_span; }
78 return params_.storage_context;
/// Reports whether the call has already been finished.
///
/// Pure virtual: each concrete call type in this file overrides it and
/// answers from its own state.
81 virtual bool IsFinished()
const = 0;
/// Overload of Statistics() that takes an `ugrpc::impl::InternalTag` and
/// returns a reference to an `RpcStatisticsScope`. Declared here, defined
/// out of line.
///
/// NOTE(review): presumably the tag parameter exists to restrict this
/// accessor to framework-internal callers, and the returned object is the
/// same one the no-argument overload exposes — confirm at the definition.
85 ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
89 ugrpc::impl::RpcStatisticsScope& Statistics() {
return params_.statistics; }
91 logging::LoggerRef AccessTskvLogger() {
return params_.access_tskv_logger; }
/// Called with the final status of the call. Declared here, defined out of
/// line.
///
/// Within this file the call classes invoke it with `grpc::Status::OK` when a
/// call is finished successfully and with `impl::kUnknownErrorStatus` from
/// destructors that cancel a call.
///
/// NOTE(review): what exactly gets logged (and to which logger) is decided by
/// the definition — confirm there.
///
/// @param status final status of the call, taken by value
93 void LogFinish(grpc::Status status)
const;
96 impl::CallParams params_;
102template <
typename Response>
111 void Finish(
const Response& response);
122 UnaryCall(impl::CallParams&& call_params,
123 impl::RawResponseWriter<Response>& stream);
125 UnaryCall(UnaryCall&&) =
delete;
126 UnaryCall& operator=(UnaryCall&&) =
delete;
129 bool IsFinished()
const override;
132 impl::RawResponseWriter<Response>& stream_;
133 bool is_finished_{
false};
144template <
typename Request,
typename Response>
150 [[nodiscard]]
bool Read(Request& request);
158 void Finish(
const Response& response);
170 impl::RawReader<Request, Response>& stream);
172 InputStream(InputStream&&) =
delete;
173 InputStream& operator=(InputStream&&) =
delete;
176 bool IsFinished()
const override;
179 enum class State { kOpen, kReadsDone, kFinished };
181 impl::RawReader<Request, Response>& stream_;
182 State state_{State::kOpen};
193template <
typename Response>
199 void Write(
const Response& response);
228 impl::RawWriter<Response>& stream);
230 OutputStream(OutputStream&&) =
delete;
231 OutputStream& operator=(OutputStream&&) =
delete;
234 bool IsFinished()
const override;
237 enum class State { kNew, kOpen, kFinished };
239 impl::RawWriter<Response>& stream_;
240 State state_{State::kNew};
255template <
typename Request,
typename Response>
267 void Write(
const Response& response);
296 impl::RawReaderWriter<Request, Response>& stream);
300 ~BidirectionalStream();
302 bool IsFinished()
const override;
305 impl::RawReaderWriter<Request, Response>& stream_;
306 bool are_reads_done_{
false};
307 bool is_finished_{
false};
312template <
typename Response>
313UnaryCall<Response>::
UnaryCall(impl::CallParams&& call_params,
314 impl::RawResponseWriter<Response>& stream)
317template <
typename Response>
318UnaryCall<Response>::~UnaryCall() {
320 impl::CancelWithError(stream_, GetCallName());
321 LogFinish(impl::kUnknownErrorStatus);
325template <
typename Response>
326void UnaryCall<Response>::
Finish(
const Response& response) {
327 UINVARIANT(!is_finished_,
"'Finish' called on a finished call");
330 LogFinish(grpc::Status::OK);
331 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
332 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
333 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
336template <
typename Response>
338 UINVARIANT(!is_finished_,
"'FinishWithError' called on a finished call");
341 impl::FinishWithError(stream_, status, GetCallName());
342 Statistics().OnExplicitFinish(status.error_code());
343 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
346template <
typename Response>
347bool UnaryCall<Response>::IsFinished()
const {
351template <
typename Request,
typename Response>
353 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
356template <
typename Request,
typename Response>
357InputStream<Request, Response>::~InputStream() {
358 if (state_ != State::kFinished) {
359 impl::CancelWithError(stream_, GetCallName());
360 LogFinish(impl::kUnknownErrorStatus);
364template <
typename Request,
typename Response>
365bool InputStream<Request, Response>::
Read(Request& request) {
367 "'Read' called while the stream is half-closed for reads");
368 if (impl::Read(stream_, request)) {
371 state_ = State::kReadsDone;
376template <
typename Request,
typename Response>
377void InputStream<Request, Response>::
Finish(
const Response& response) {
379 "'Finish' called on a finished stream");
380 state_ = State::kFinished;
381 LogFinish(grpc::Status::OK);
382 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
383 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
384 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
387template <
typename Request,
typename Response>
389 const grpc::Status& status) {
392 "'FinishWithError' called on a finished stream");
393 state_ = State::kFinished;
395 impl::FinishWithError(stream_, status, GetCallName());
396 Statistics().OnExplicitFinish(status.error_code());
397 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
400template <
typename Request,
typename Response>
401bool InputStream<Request, Response>::IsFinished()
const {
402 return state_ == State::kFinished;
405template <
typename Response>
407 impl::RawWriter<Response>& stream)
410template <
typename Response>
411OutputStream<Response>::~OutputStream() {
412 if (state_ != State::kFinished) {
413 impl::Cancel(stream_, GetCallName());
414 LogFinish(impl::kUnknownErrorStatus);
418template <
typename Response>
419void OutputStream<Response>::
Write(
const Response& response) {
420 UINVARIANT(state_ != State::kFinished,
"'Write' called on a finished stream");
424 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
428 grpc::WriteOptions write_options{};
430 impl::Write(stream_, response, write_options, GetCallName());
433template <
typename Response>
436 "'Finish' called on a finished stream");
437 state_ = State::kFinished;
438 const auto status = grpc::Status::OK;
440 impl::Finish(stream_, status, GetCallName());
441 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
442 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
445template <
typename Response>
449 "'Finish' called on a finished stream");
450 state_ = State::kFinished;
452 impl::Finish(stream_, status, GetCallName());
453 Statistics().OnExplicitFinish(status.error_code());
454 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
457template <
typename Response>
460 "'WriteAndFinish' called on a finished stream");
461 state_ = State::kFinished;
465 grpc::WriteOptions write_options{};
467 const auto status = grpc::Status::OK;
469 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
472template <
typename Response>
473bool OutputStream<Response>::IsFinished()
const {
474 return state_ == State::kFinished;
477template <
typename Request,
typename Response>
479 impl::CallParams&& call_params,
480 impl::RawReaderWriter<Request, Response>& stream)
483template <
typename Request,
typename Response>
486 impl::Cancel(stream_, GetCallName());
487 LogFinish(impl::kUnknownErrorStatus);
491template <
typename Request,
typename Response>
494 "'Read' called while the stream is half-closed for reads");
495 if (impl::Read(stream_, request)) {
498 are_reads_done_ =
true;
503template <
typename Request,
typename Response>
505 UINVARIANT(!is_finished_,
"'Write' called on a finished stream");
508 grpc::WriteOptions write_options{};
511 impl::Write(stream_, response, write_options, GetCallName());
518template <
typename Request,
typename Response>
520 UINVARIANT(!is_finished_,
"'Finish' called on a finished stream");
522 const auto status = grpc::Status::OK;
524 impl::Finish(stream_, status, GetCallName());
525 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
526 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
529template <
typename Request,
typename Response>
531 const grpc::Status& status) {
533 UINVARIANT(!is_finished_,
"'FinishWithError' called on a finished stream");
536 impl::Finish(stream_, status, GetCallName());
537 Statistics().OnExplicitFinish(status.error_code());
538 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
541template <
typename Request,
typename Response>
543 const Response& response) {
544 UINVARIANT(!is_finished_,
"'WriteAndFinish' called on a finished stream");
548 grpc::WriteOptions write_options{};
550 const auto status = grpc::Status::OK;
552 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
555template <
typename Request,
typename Response>