6#include <grpcpp/impl/codegen/proto_utils.h>
7#include <grpcpp/server_context.h>
9#include <userver/utils/assert.hpp>
11#include <userver/ugrpc/impl/deadline_timepoint.hpp>
12#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
13#include <userver/ugrpc/impl/span.hpp>
14#include <userver/ugrpc/impl/statistics_scope.hpp>
15#include <userver/ugrpc/server/exceptions.hpp>
16#include <userver/ugrpc/server/impl/async_methods.hpp>
17#include <userver/ugrpc/server/impl/call_params.hpp>
19USERVER_NAMESPACE_BEGIN
/// Declaration only: returns a std::string built from the call metadata, the
/// peer, the call start time, the call name and an integer code.
/// NOTE(review): judging by the name this formats a log message; the output
/// layout and the exact meaning of `code` (presumably a gRPC status code) are
/// defined elsewhere — confirm against the definition.
25std::string FormatLogMessage(
26 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
27 std::string_view peer, std::chrono::system_clock::time_point start_time,
28 std::string_view call_name,
int code);
/// Stores the supplied call parameters in `params_`.
// NOTE(review): `params` is a named rvalue reference and therefore an lvalue
// inside the constructor, so `params_(params)` copy-constructs the member
// instead of moving from the argument. Confirm the copy is intended.
35 CallAnyBase(impl::CallParams&& params) : params_(params) {}
/// Returns the grpc::ServerContext stored in the call parameters.
48 grpc::ServerContext&
GetContext() {
return params_.context; }
/// Returns the tracing span stored in the call parameters (`call_span`).
53 tracing::Span& GetSpan() {
return params_.call_span; }
/// Reports whether the call has already been finished. Pure virtual: each
/// concrete call type supplies its own implementation.
55 virtual bool IsFinished()
const = 0;
/// Overload selected by passing a ugrpc::impl::InternalTag; returns a
/// reference to an RpcStatisticsScope. Declaration only.
/// NOTE(review): the tag parameter suggests this overload is reserved for
/// framework-internal callers — confirm.
59 ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
/// Returns the statistics scope stored in the call parameters.
63 ugrpc::impl::RpcStatisticsScope& Statistics() {
return params_.statistics; }
/// Returns the `access_tskv_logger` stored in the call parameters.
65 logging::LoggerRef AccessTskvLogger() {
return params_.access_tskv_logger; }
/// Declaration only. Within this file it is invoked with grpc::Status::OK
/// when a call is finished successfully and with impl::kUnknownErrorStatus
/// from the destructors of the call classes.
/// NOTE(review): presumably emits a log record describing how the call
/// ended — confirm against the definition.
67 void LogFinish(grpc::Status status)
const;
// Parameters of this call, initialized by the constructor and read by
// GetContext(), GetSpan(), Statistics() and AccessTskvLogger().
70 impl::CallParams params_;
76template <
typename Response>
/// Finishes the call by sending `response`; the out-of-line definition passes
/// grpc::Status::OK and asserts (UINVARIANT) that the call has not been
/// finished before.
85 void Finish(
const Response& response);
97 impl::RawResponseWriter<Response>& stream);
// Not movable: both move construction and move assignment are deleted.
99 UnaryCall(UnaryCall&&) =
delete;
100 UnaryCall& operator=(UnaryCall&&) =
delete;
/// Override of the base-class IsFinished() query for this call type.
103 bool IsFinished()
const override;
// Reference to the raw response writer this call operates on.
106 impl::RawResponseWriter<Response>& stream_;
// Starts as false; the UINVARIANT checks in Finish/FinishWithError test it.
107 bool is_finished_{
false};
118template <
typename Request,
typename Response>
/// Reads a message from the stream into `request`. The boolean result must
/// not be ignored ([[nodiscard]]).
/// NOTE(review): presumably returns false once no more requests are
/// available — confirm against the definition.
124 [[nodiscard]]
bool Read(Request& request);
/// Finishes the stream by sending `response`; the out-of-line definition
/// passes grpc::Status::OK and moves the stream to State::kFinished.
132 void Finish(
const Response& response);
144 impl::RawReader<Request, Response>& stream);
// Not movable: both move construction and move assignment are deleted.
146 InputStream(InputStream&&) =
delete;
147 InputStream& operator=(InputStream&&) =
delete;
/// Override of the base-class query; the definition returns true exactly when
/// state_ is State::kFinished.
150 bool IsFinished()
const override;
// Lifecycle states of the stream; state_ below starts in kOpen.
153 enum class State { kOpen, kReadsDone, kFinished };
// Reference to the raw reader this stream operates on.
155 impl::RawReader<Request, Response>& stream_;
// Current lifecycle state.
156 State state_{State::kOpen};
167template <
typename Response>
/// Writes `response` to the stream. The out-of-line definition asserts
/// (UINVARIANT) that the stream is not finished, calls
/// impl::SendInitialMetadataIfNew, and then writes with default
/// grpc::WriteOptions.
173 void Write(
const Response& response);
202 impl::RawWriter<Response>& stream);
// Not movable: both move construction and move assignment are deleted.
204 OutputStream(OutputStream&&) =
delete;
205 OutputStream& operator=(OutputStream&&) =
delete;
/// Override of the base-class query; the definition returns true exactly when
/// state_ is State::kFinished.
208 bool IsFinished()
const override;
// Lifecycle states of the stream; state_ below starts in kNew.
211 enum class State { kNew, kOpen, kFinished };
// Reference to the raw writer this stream operates on.
213 impl::RawWriter<Response>& stream_;
// Current lifecycle state.
214 State state_{State::kNew};
225template <
typename Request,
typename Response>
/// Writes `response` to the stream.
/// NOTE(review): the matching definition is only partially visible; it
/// appears to assert that the stream is not finished and to write with
/// default grpc::WriteOptions — confirm.
237 void Write(
const Response& response);
266 impl::RawReaderWriter<Request, Response>& stream);
/// Destructor, defined out of line.
270 ~BidirectionalStream();
/// Override of the base-class IsFinished() query for this stream type.
272 bool IsFinished()
const override;
// Lifecycle states of the stream; state_ below starts in kOpen.
275 enum class State { kOpen, kReadsDone, kFinished };
// Reference to the raw reader-writer this stream operates on.
277 impl::RawReaderWriter<Request, Response>& stream_;
// Current lifecycle state.
278 State state_{State::kOpen};
283template <
typename Response>
284UnaryCall<Response>::
UnaryCall(impl::CallParams&& call_params,
285 impl::RawResponseWriter<Response>& stream)
// Destructor. The visible statements call impl::CancelWithError on the stream
// and log the call as finished with impl::kUnknownErrorStatus.
// NOTE(review): the condition guarding these statements is not visible in
// this excerpt — presumably they run only when the call was never finished;
// confirm.
288template <
typename Response>
289UnaryCall<Response>::~UnaryCall() {
291 impl::CancelWithError(stream_, GetCallName());
292 LogFinish(impl::kUnknownErrorStatus);
// Finishes the unary call successfully with `response`.
// Visible steps: assert the call is not already finished, log the finish
// with grpc::Status::OK, send the response via impl::Finish, record the
// explicit finish in statistics, and update the span with the OK status.
// Note that LogFinish is invoked before impl::Finish.
296template <
typename Response>
297void UnaryCall<Response>::
Finish(
const Response& response) {
298 UINVARIANT(!is_finished_,
"'Finish' called on a finished call");
301 LogFinish(grpc::Status::OK);
302 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
303 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
304 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
307template <
typename Response>
309 UINVARIANT(!is_finished_,
"'FinishWithError' called on a finished call");
312 impl::FinishWithError(stream_, status, GetCallName());
313 Statistics().OnExplicitFinish(status.error_code());
314 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
317template <
typename Response>
318bool UnaryCall<Response>::IsFinished()
const {
322template <
typename Request,
typename Response>
324 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
// Destructor. If the stream was never moved to State::kFinished, it calls
// impl::CancelWithError on the stream and logs the call as finished with
// impl::kUnknownErrorStatus.
327template <
typename Request,
typename Response>
328InputStream<Request, Response>::~InputStream() {
329 if (state_ != State::kFinished) {
330 impl::CancelWithError(stream_, GetCallName());
331 LogFinish(impl::kUnknownErrorStatus);
// Reads one message from the stream into `request` via impl::Read.
// The visible assertion message indicates that calling Read on a stream that
// is half-closed for reads is a contract violation.
// NOTE(review): state_ is set to State::kReadsDone on a path that is not
// fully visible here — presumably when impl::Read returns false; confirm.
335template <
typename Request,
typename Response>
336bool InputStream<Request, Response>::
Read(Request& request) {
338 "'Read' called while the stream is half-closed for reads");
339 if (impl::Read(stream_, request)) {
342 state_ = State::kReadsDone;
// Finishes the stream successfully with `response`.
// Visible steps: mark the stream as State::kFinished, log the finish with
// grpc::Status::OK, send the response via impl::Finish, record the explicit
// finish in statistics, and update the span with the OK status.
// The assertion message shows that calling this on a finished stream is a
// contract violation.
347template <
typename Request,
typename Response>
348void InputStream<Request, Response>::
Finish(
const Response& response) {
350 "'Finish' called on a finished stream");
351 state_ = State::kFinished;
352 LogFinish(grpc::Status::OK);
353 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
354 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
355 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
358template <
typename Request,
typename Response>
360 const grpc::Status& status) {
363 "'FinishWithError' called on a finished stream");
364 state_ = State::kFinished;
366 impl::FinishWithError(stream_, status, GetCallName());
367 Statistics().OnExplicitFinish(status.error_code());
368 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
// Returns true exactly when the stream has reached State::kFinished.
371template <
typename Request,
typename Response>
372bool InputStream<Request, Response>::IsFinished()
const {
373 return state_ == State::kFinished;
376template <
typename Response>
378 impl::RawWriter<Response>& stream)
// Destructor. If the stream was never moved to State::kFinished, it calls
// impl::Cancel on the stream and logs the call as finished with
// impl::kUnknownErrorStatus.
381template <
typename Response>
382OutputStream<Response>::~OutputStream() {
383 if (state_ != State::kFinished) {
384 impl::Cancel(stream_, GetCallName());
385 LogFinish(impl::kUnknownErrorStatus);
// Writes `response` to the stream.
// Visible steps: assert the stream is not finished, call
// impl::SendInitialMetadataIfNew (which receives state_), then write the
// message with default-constructed grpc::WriteOptions.
389template <
typename Response>
390void OutputStream<Response>::
Write(
const Response& response) {
391 UINVARIANT(state_ != State::kFinished,
"'Write' called on a finished stream");
395 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
399 grpc::WriteOptions write_options{};
401 impl::Write(stream_, response, write_options, GetCallName());
404template <
typename Response>
407 "'Finish' called on a finished stream");
408 state_ = State::kFinished;
409 const auto status = grpc::Status::OK;
411 impl::Finish(stream_, status, GetCallName());
412 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
413 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
416template <
typename Response>
420 "'Finish' called on a finished stream");
421 state_ = State::kFinished;
423 impl::Finish(stream_, status, GetCallName());
424 Statistics().OnExplicitFinish(status.error_code());
425 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
428template <
typename Response>
431 "'WriteAndFinish' called on a finished stream");
432 state_ = State::kFinished;
436 grpc::WriteOptions write_options{};
438 const auto status = grpc::Status::OK;
440 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
// Returns true exactly when the stream has reached State::kFinished.
443template <
typename Response>
444bool OutputStream<Response>::IsFinished()
const {
445 return state_ == State::kFinished;
448template <
typename Request,
typename Response>
450 impl::CallParams&& call_params,
451 impl::RawReaderWriter<Request, Response>& stream)
454template <
typename Request,
typename Response>
456 if (state_ != State::kFinished) {
457 impl::Cancel(stream_, GetCallName());
458 LogFinish(impl::kUnknownErrorStatus);
462template <
typename Request,
typename Response>
465 "'Read' called while the stream is half-closed for reads");
466 if (impl::Read(stream_, request)) {
469 state_ = State::kReadsDone;
474template <
typename Request,
typename Response>
476 UINVARIANT(state_ != State::kFinished,
"'Write' called on a finished stream");
479 grpc::WriteOptions write_options{};
481 impl::Write(stream_, response, write_options, GetCallName());
484template <
typename Request,
typename Response>
487 "'Finish' called on a finished stream");
488 state_ = State::kFinished;
489 const auto status = grpc::Status::OK;
491 impl::Finish(stream_, status, GetCallName());
492 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
493 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
496template <
typename Request,
typename Response>
498 const grpc::Status& status) {
501 "'FinishWithError' called on a finished stream");
502 state_ = State::kFinished;
504 impl::Finish(stream_, status, GetCallName());
505 Statistics().OnExplicitFinish(status.error_code());
506 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
509template <
typename Request,
typename Response>
511 const Response& response) {
513 "'WriteAndFinish' called on a finished stream");
514 state_ = State::kFinished;
517 grpc::WriteOptions write_options{};
519 const auto status = grpc::Status::OK;
521 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
524template <
typename Request,
typename Response>
526 return state_ == State::kFinished;