6#include <grpcpp/impl/codegen/proto_utils.h> 
    7#include <grpcpp/server_context.h> 
    9#include <userver/utils/assert.hpp> 
   11#include <userver/ugrpc/impl/deadline_timepoint.hpp> 
   12#include <userver/ugrpc/impl/internal_tag_fwd.hpp> 
   13#include <userver/ugrpc/impl/span.hpp> 
   14#include <userver/ugrpc/impl/statistics_scope.hpp> 
   15#include <userver/ugrpc/server/exceptions.hpp> 
   16#include <userver/ugrpc/server/impl/async_methods.hpp> 
   17#include <userver/ugrpc/server/impl/call_params.hpp> 
   19USERVER_NAMESPACE_BEGIN
 
   25std::string FormatLogMessage(
 
   26    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
 
   27    std::string_view peer, std::chrono::system_clock::time_point start_time,
 
   28    std::string_view call_name, grpc::StatusCode code);
 
   35  CallAnyBase(impl::CallParams&& params) : params_(std::move(params)) {}
 
   48  grpc::ServerContext& 
GetContext() { 
return params_.context; }
 
   53  tracing::Span& GetSpan() { 
return params_.call_span; }
 
   78    return params_.storage_context;
 
   81  virtual bool IsFinished() 
const = 0;
 
   85  ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
 
   89  ugrpc::impl::RpcStatisticsScope& Statistics() { 
return params_.statistics; }
 
   91  logging::LoggerRef AccessTskvLogger() { 
return params_.access_tskv_logger; }
 
   93  void LogFinish(grpc::Status status) 
const;
 
   96  impl::CallParams params_;
 
  102template <
typename Response>
 
  111  void Finish(
const Response& response);
 
  122  UnaryCall(impl::CallParams&& call_params,
 
  123            impl::RawResponseWriter<Response>& stream);
 
  125  UnaryCall(UnaryCall&&) = 
delete;
 
  126  UnaryCall& operator=(UnaryCall&&) = 
delete;
 
  129  bool IsFinished() 
const override;
 
  132  impl::RawResponseWriter<Response>& stream_;
 
  133  bool is_finished_{
false};
 
  144template <
typename Request, 
typename Response>
 
  150  [[nodiscard]] 
bool Read(Request& request);
 
  158  void Finish(
const Response& response);
 
  170              impl::RawReader<Request, Response>& stream);
 
  172  InputStream(InputStream&&) = 
delete;
 
  173  InputStream& operator=(InputStream&&) = 
delete;
 
  176  bool IsFinished() 
const override;
 
  /// Lifecycle of an input stream as tracked by `state_`.
  enum class State {
    kOpen,       ///< initial value of `state_`
    kReadsDone,  ///< assigned inside `Read`
    kFinished,   ///< assigned by `Finish` / `FinishWithError`
  };
 
  181  impl::RawReader<Request, Response>& stream_;
 
  182  State state_{State::kOpen};
 
  193template <
typename Response>
 
  199  void Write(
const Response& response);
 
  228               impl::RawWriter<Response>& stream);
 
  230  OutputStream(OutputStream&&) = 
delete;
 
  231  OutputStream& operator=(OutputStream&&) = 
delete;
 
  234  bool IsFinished() 
const override;
 
  /// Lifecycle of an output stream as tracked by `state_`.
  enum class State {
    kNew,       ///< initial value of `state_`
    kOpen,      ///< NOTE(review): presumably entered via
                ///< `impl::SendInitialMetadataIfNew` on first write — confirm
    kFinished,  ///< assigned by `Finish` / `FinishWithError` / `WriteAndFinish`
  };
 
  239  impl::RawWriter<Response>& stream_;
 
  240  State state_{State::kNew};
 
  255template <
typename Request, 
typename Response>
 
  267  void Write(
const Response& response);
 
  296                      impl::RawReaderWriter<Request, Response>& stream);
 
  300  ~BidirectionalStream();
 
  302  bool IsFinished() 
const override;
 
  305  impl::RawReaderWriter<Request, Response>& stream_;
 
  306  bool are_reads_done_{
false};
 
  307  bool is_finished_{
false};
 
  312template <
typename Response>
 
  313UnaryCall<Response>::
UnaryCall(impl::CallParams&& call_params,
 
  314                               impl::RawResponseWriter<Response>& stream)
 
  317template <
typename Response>
 
  318UnaryCall<Response>::~UnaryCall() {
 
  320    impl::CancelWithError(stream_, GetCallName());
 
  321    LogFinish(impl::kUnknownErrorStatus);
 
  325template <
typename Response>
 
  326void UnaryCall<Response>::
Finish(
const Response& response) {
 
  327  UINVARIANT(!is_finished_, 
"'Finish' called on a finished call");
 
  330  LogFinish(grpc::Status::OK);
 
  331  impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
 
  332  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  333  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
 
  336template <
typename Response>
 
  338  UINVARIANT(!is_finished_, 
"'FinishWithError' called on a finished call");
 
  341  impl::FinishWithError(stream_, status, GetCallName());
 
  342  Statistics().OnExplicitFinish(status.error_code());
 
  343  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  346template <
typename Response>
 
  347bool UnaryCall<Response>::IsFinished() 
const {
 
  351template <
typename Request, 
typename Response>
 
  353    impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
 
  356template <
typename Request, 
typename Response>
 
  357InputStream<Request, Response>::~InputStream() {
 
  358  if (state_ != State::kFinished) {
 
  359    impl::CancelWithError(stream_, GetCallName());
 
  360    LogFinish(impl::kUnknownErrorStatus);
 
  364template <
typename Request, 
typename Response>
 
  365bool InputStream<Request, Response>::
Read(Request& request) {
 
  367             "'Read' called while the stream is half-closed for reads");
 
  368  if (impl::Read(stream_, request)) {
 
  371    state_ = State::kReadsDone;
 
  376template <
typename Request, 
typename Response>
 
  377void InputStream<Request, Response>::
Finish(
const Response& response) {
 
  379             "'Finish' called on a finished stream");
 
  380  state_ = State::kFinished;
 
  381  LogFinish(grpc::Status::OK);
 
  382  impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
 
  383  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  384  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
 
  387template <
typename Request, 
typename Response>
 
  389    const grpc::Status& status) {
 
  392             "'FinishWithError' called on a finished stream");
 
  393  state_ = State::kFinished;
 
  395  impl::FinishWithError(stream_, status, GetCallName());
 
  396  Statistics().OnExplicitFinish(status.error_code());
 
  397  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  400template <
typename Request, 
typename Response>
 
  401bool InputStream<Request, Response>::IsFinished() 
const {
 
  402  return state_ == State::kFinished;
 
  405template <
typename Response>
 
  407                                     impl::RawWriter<Response>& stream)
 
  410template <
typename Response>
 
  411OutputStream<Response>::~OutputStream() {
 
  412  if (state_ != State::kFinished) {
 
  413    impl::Cancel(stream_, GetCallName());
 
  414    LogFinish(impl::kUnknownErrorStatus);
 
  418template <
typename Response>
 
  419void OutputStream<Response>::
Write(
const Response& response) {
 
  420  UINVARIANT(state_ != State::kFinished, 
"'Write' called on a finished stream");
 
  424  impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
 
  428  grpc::WriteOptions write_options{};
 
  430  impl::Write(stream_, response, write_options, GetCallName());
 
  433template <
typename Response>
 
  436             "'Finish' called on a finished stream");
 
  437  state_ = State::kFinished;
 
  438  const auto status = grpc::Status::OK;
 
  440  impl::Finish(stream_, status, GetCallName());
 
  441  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  442  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  445template <
typename Response>
 
  449             "'Finish' called on a finished stream");
 
  450  state_ = State::kFinished;
 
  452  impl::Finish(stream_, status, GetCallName());
 
  453  Statistics().OnExplicitFinish(status.error_code());
 
  454  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  457template <
typename Response>
 
  460             "'WriteAndFinish' called on a finished stream");
 
  461  state_ = State::kFinished;
 
  465  grpc::WriteOptions write_options{};
 
  467  const auto status = grpc::Status::OK;
 
  469  impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
 
  472template <
typename Response>
 
  473bool OutputStream<Response>::IsFinished() 
const {
 
  474  return state_ == State::kFinished;
 
  477template <
typename Request, 
typename Response>
 
  479    impl::CallParams&& call_params,
 
  480    impl::RawReaderWriter<Request, Response>& stream)
 
  483template <
typename Request, 
typename Response>
 
  486    impl::Cancel(stream_, GetCallName());
 
  487    LogFinish(impl::kUnknownErrorStatus);
 
  491template <
typename Request, 
typename Response>
 
  494             "'Read' called while the stream is half-closed for reads");
 
  495  if (impl::Read(stream_, request)) {
 
  498    are_reads_done_ = 
true;
 
  503template <
typename Request, 
typename Response>
 
  505  UINVARIANT(!is_finished_, 
"'Write' called on a finished stream");
 
  508  grpc::WriteOptions write_options{};
 
  511    impl::Write(stream_, response, write_options, GetCallName());
 
  518template <
typename Request, 
typename Response>
 
  520  UINVARIANT(!is_finished_, 
"'Finish' called on a finished stream");
 
  522  const auto status = grpc::Status::OK;
 
  524  impl::Finish(stream_, status, GetCallName());
 
  525  Statistics().OnExplicitFinish(grpc::StatusCode::OK);
 
  526  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  529template <
typename Request, 
typename Response>
 
  531    const grpc::Status& status) {
 
  533  UINVARIANT(!is_finished_, 
"'FinishWithError' called on a finished stream");
 
  536  impl::Finish(stream_, status, GetCallName());
 
  537  Statistics().OnExplicitFinish(status.error_code());
 
  538  ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
 
  541template <
typename Request, 
typename Response>
 
  543    const Response& response) {
 
  544  UINVARIANT(!is_finished_, 
"'WriteAndFinish' called on a finished stream");
 
  548  grpc::WriteOptions write_options{};
 
  550  const auto status = grpc::Status::OK;
 
  552  impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
 
  555template <
typename Request, 
typename Response>