#include <utility>

#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/server_context.h>

#include <userver/utils/assert.hpp>

#include <userver/ugrpc/impl/deadline_timepoint.hpp>
#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
#include <userver/ugrpc/impl/span.hpp>
#include <userver/ugrpc/impl/statistics_scope.hpp>
#include <userver/ugrpc/server/exceptions.hpp>
#include <userver/ugrpc/server/impl/async_methods.hpp>
#include <userver/ugrpc/server/impl/call_params.hpp>
   19USERVER_NAMESPACE_BEGIN
 
   /// Declaration only; the definition is not part of this header chunk.
   /// Produces a `std::string` from the request metadata, the peer string,
   /// the call's start time, the call name and a numeric status code.
   /// NOTE(review): presumably this builds the access-log line written when a
   /// call finishes; confirm against the definition.
   25std::string FormatLogMessage(
 
   26    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
 
   27    std::string_view peer, std::chrono::system_clock::time_point start_time,
 
   28    std::string_view call_name, 
int code);
 
   35  CallAnyBase(impl::CallParams&& params) : params_(params) {}
 
   48  grpc::ServerContext& 
GetContext() { 
return params_.context; }
 
   53  tracing::Span& GetSpan() { 
return params_.call_span; }
 
   /// Tells whether the call has already been finished. Pure virtual: each
   /// concrete call type in this header supplies its own override.
   55  virtual bool IsFinished() 
const = 0;
 
   /// Overload selected by passing `ugrpc::impl::InternalTag`; its definition
   /// is not part of this header chunk.
   /// NOTE(review): presumably hands the same statistics scope to
   /// framework-internal callers; confirm against the definition.
   59  ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
 
   63  ugrpc::impl::RpcStatisticsScope& Statistics() { 
return params_.statistics; }
 
   65  logging::LoggerRef AccessTskvLogger() { 
return params_.access_tskv_logger; }
 
   /// Called with the call's final status by the finishing paths and
   /// destructors in this header: `grpc::Status::OK` on a successful finish,
   /// `impl::kUnknownErrorStatus` when a call is destroyed unfinished.
   /// The definition is not part of this header chunk.
   67  void LogFinish(grpc::Status status) 
const;
 
   /// Parameters of the call received by the constructor. The accessors of
   /// this class read `context`, `call_span`, `statistics` and
   /// `access_tskv_logger` from it.
   70  impl::CallParams params_;
 
   76template <
typename Response>
 
   /// Completes the call successfully by sending `response` together with
   /// `grpc::Status::OK`. Must not be called on an already finished call;
   /// the definition enforces this with UINVARIANT.
   85  void Finish(
const Response& response);
 
   97            impl::RawResponseWriter<Response>& stream);
 
   /// Move construction and move assignment are both disabled for UnaryCall.
   99  UnaryCall(UnaryCall&&) = 
delete;
 
  100  UnaryCall& operator=(UnaryCall&&) = 
delete;
 
  /// Overrides the base-class `IsFinished()` for unary calls; defined out of
  /// line further down in this header.
  103  bool IsFinished() 
const override;
 
  106  impl::RawResponseWriter<Response>& stream_;
 
  107  bool is_finished_{
false};
 
  118template <
typename Request, 
typename Response>
 
  /// Reads the next request message into `request`; the result must not be
  /// ignored. The definition asserts that the stream is not already
  /// half-closed for reads, and it switches `state_` to `State::kReadsDone`
  /// (presumably when `impl::Read` reports that no message arrived; confirm).
  124  [[nodiscard]] 
bool Read(Request& request);
 
  /// Completes the stream successfully by sending `response` together with
  /// `grpc::Status::OK`; afterwards `state_` is `State::kFinished`.
  /// Calling it on a finished stream is rejected with UINVARIANT.
  132  void Finish(
const Response& response);
 
  144              impl::RawReader<Request, Response>& stream);
 
  /// Move construction and move assignment are both disabled for InputStream.
  146  InputStream(InputStream&&) = 
delete;
 
  147  InputStream& operator=(InputStream&&) = 
delete;
 
  /// Overrides the base-class `IsFinished()`; the definition returns true
  /// exactly when `state_` equals `State::kFinished`.
  150  bool IsFinished() 
const override;
 
  153  enum class State { kOpen, kReadsDone, kFinished };
 
  155  impl::RawReader<Request, Response>& stream_;
 
  156  State state_{State::kOpen};
 
  167template <
typename Response>
 
  /// Writes `response` to the stream. Must not be called on a finished
  /// stream; the definition enforces this with UINVARIANT. The definition
  /// first calls `impl::SendInitialMetadataIfNew` and then writes with
  /// default-constructed `grpc::WriteOptions`.
  173  void Write(
const Response& response);
 
  202               impl::RawWriter<Response>& stream);
 
  /// Move construction and move assignment are both disabled for OutputStream.
  204  OutputStream(OutputStream&&) = 
delete;
 
  205  OutputStream& operator=(OutputStream&&) = 
delete;
 
  /// Overrides the base-class `IsFinished()`; the definition returns true
  /// exactly when `state_` equals `State::kFinished`.
  208  bool IsFinished() 
const override;
 
  211  enum class State { kNew, kOpen, kFinished };
 
  213  impl::RawWriter<Response>& stream_;
 
  214  State state_{State::kNew};
 
  225template <
typename Request, 
typename Response>
 
  /// Writes `response` to the stream.
  /// NOTE(review): the signature line of the out-of-line definition is not
  /// visible in this chunk. The body that appears to belong to it asserts
  /// the stream is not finished and writes with default-constructed
  /// `grpc::WriteOptions`; confirm.
  237  void Write(
const Response& response);
 
  266                      impl::RawReaderWriter<Request, Response>& stream);
 
  /// Destructor, defined out of line.
  /// NOTE(review): the body that appears to belong to it cancels the stream
  /// and logs `impl::kUnknownErrorStatus` when `state_` is not
  /// `State::kFinished`; its signature line is not visible here, so confirm.
  270  ~BidirectionalStream();
 
  /// Overrides the base-class `IsFinished()` for bidirectional streams;
  /// defined out of line further down in this header.
  272  bool IsFinished() 
const override;
 
  275  enum class State { kOpen, kReadsDone, kFinished };
 
  277  impl::RawReaderWriter<Request, Response>& stream_;
 
  278  State state_{State::kOpen};
 
  283template <
typename Response>
 
  284UnaryCall<Response>::
UnaryCall(impl::CallParams&& call_params,
 
  285                               impl::RawResponseWriter<Response>& stream)
 
  288template <
typename Response>
 
  289UnaryCall<Response>::~UnaryCall() {
 
  291    impl::CancelWithError(stream_, GetCallName());
 
  292    LogFinish(impl::kUnknownErrorStatus);
 
  296template <
typename Response>
 
  297void UnaryCall<Response>::
Finish(
const Response& response) {
 
  298  UINVARIANT(!is_finished_, 
"'Finish' called on a finished call");
 
  301  LogFinish(grpc::Status::OK);
 
  302  impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
 
  303  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  304  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
 
  307template <
typename Response>
 
  309  UINVARIANT(!is_finished_, 
"'FinishWithError' called on a finished call");
 
  312  impl::FinishWithError(stream_, status, GetCallName());
 
  313  Statistics().OnExplicitFinish(status.error_code());
 
  314  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  317template <
typename Response>
 
  318bool UnaryCall<Response>::IsFinished() 
const {
 
  322template <
typename Request, 
typename Response>
 
  324    impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
 
  327template <
typename Request, 
typename Response>
 
  328InputStream<Request, Response>::~InputStream() {
 
  329  if (state_ != State::kFinished) {
 
  330    impl::CancelWithError(stream_, GetCallName());
 
  331    LogFinish(impl::kUnknownErrorStatus);
 
  335template <
typename Request, 
typename Response>
 
  336bool InputStream<Request, Response>::
Read(Request& request) {
 
  338             "'Read' called while the stream is half-closed for reads");
 
  339  if (impl::Read(stream_, request)) {
 
  342    state_ = State::kReadsDone;
 
  347template <
typename Request, 
typename Response>
 
  348void InputStream<Request, Response>::
Finish(
const Response& response) {
 
  350             "'Finish' called on a finished stream");
 
  351  state_ = State::kFinished;
 
  352  LogFinish(grpc::Status::OK);
 
  353  impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
 
  354  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  355  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
 
  358template <
typename Request, 
typename Response>
 
  360    const grpc::Status& status) {
 
  363             "'FinishWithError' called on a finished stream");
 
  364  state_ = State::kFinished;
 
  366  impl::FinishWithError(stream_, status, GetCallName());
 
  367  Statistics().OnExplicitFinish(status.error_code());
 
  368  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  371template <
typename Request, 
typename Response>
 
  372bool InputStream<Request, Response>::IsFinished() 
const {
 
  373  return state_ == State::kFinished;
 
  376template <
typename Response>
 
  378                                     impl::RawWriter<Response>& stream)
 
  381template <
typename Response>
 
  382OutputStream<Response>::~OutputStream() {
 
  383  if (state_ != State::kFinished) {
 
  384    impl::Cancel(stream_, GetCallName());
 
  385    LogFinish(impl::kUnknownErrorStatus);
 
  389template <
typename Response>
 
  390void OutputStream<Response>::
Write(
const Response& response) {
 
  391  UINVARIANT(state_ != State::kFinished, 
"'Write' called on a finished stream");
 
  395  impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
 
  399  grpc::WriteOptions write_options{};
 
  401  impl::Write(stream_, response, write_options, GetCallName());
 
  404template <
typename Response>
 
  407             "'Finish' called on a finished stream");
 
  408  state_ = State::kFinished;
 
  409  const auto status = grpc::Status::OK;
 
  411  impl::Finish(stream_, status, GetCallName());
 
  412  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  413  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  416template <
typename Response>
 
  420             "'Finish' called on a finished stream");
 
  421  state_ = State::kFinished;
 
  423  impl::Finish(stream_, status, GetCallName());
 
  424  Statistics().OnExplicitFinish(status.error_code());
 
  425  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  428template <
typename Response>
 
  431             "'WriteAndFinish' called on a finished stream");
 
  432  state_ = State::kFinished;
 
  436  grpc::WriteOptions write_options{};
 
  438  const auto status = grpc::Status::OK;
 
  440  impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
 
  443template <
typename Response>
 
  444bool OutputStream<Response>::IsFinished() 
const {
 
  445  return state_ == State::kFinished;
 
  448template <
typename Request, 
typename Response>
 
  450    impl::CallParams&& call_params,
 
  451    impl::RawReaderWriter<Request, Response>& stream)
 
  454template <
typename Request, 
typename Response>
 
  456  if (state_ != State::kFinished) {
 
  457    impl::Cancel(stream_, GetCallName());
 
  458    LogFinish(impl::kUnknownErrorStatus);
 
  462template <
typename Request, 
typename Response>
 
  465             "'Read' called while the stream is half-closed for reads");
 
  466  if (impl::Read(stream_, request)) {
 
  469    state_ = State::kReadsDone;
 
  474template <
typename Request, 
typename Response>
 
  476  UINVARIANT(state_ != State::kFinished, 
"'Write' called on a finished stream");
 
  479  grpc::WriteOptions write_options{};
 
  481  impl::Write(stream_, response, write_options, GetCallName());
 
  484template <
typename Request, 
typename Response>
 
  487             "'Finish' called on a finished stream");
 
  488  state_ = State::kFinished;
 
  489  const auto status = grpc::Status::OK;
 
  491  impl::Finish(stream_, status, GetCallName());
 
  492  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  493  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  496template <
typename Request, 
typename Response>
 
  498    const grpc::Status& status) {
 
  501             "'FinishWithError' called on a finished stream");
 
  502  state_ = State::kFinished;
 
  504  impl::Finish(stream_, status, GetCallName());
 
  505  Statistics().OnExplicitFinish(status.error_code());
 
  506  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  509template <
typename Request, 
typename Response>
 
  511    const Response& response) {
 
  513             "'WriteAndFinish' called on a finished stream");
 
  514  state_ = State::kFinished;
 
  517  grpc::WriteOptions write_options{};
 
  519  const auto status = grpc::Status::OK;
 
  521  impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
 
  524template <
typename Request, 
typename Response>
 
  526  return state_ == State::kFinished;