7#include <grpcpp/impl/codegen/async_stream.h> 
    8#include <grpcpp/impl/codegen/async_unary_call.h> 
    9#include <grpcpp/impl/codegen/status.h> 
   11#include <userver/ugrpc/server/exceptions.hpp> 
   12#include <userver/ugrpc/server/impl/async_method_invocation.hpp> 
   14USERVER_NAMESPACE_BEGIN
 
   16namespace ugrpc::
server::impl {
 
   22template <
typename Response>
 
   23using RawResponseWriter = grpc::ServerAsyncResponseWriter<Response>;
 
   25template <
typename Request, 
typename Response>
 
   26using RawReader = grpc::ServerAsyncReader<Response, Request>;
 
   28template <
typename Response>
 
   29using RawWriter = grpc::ServerAsyncWriter<Response>;
 
   31template <
typename Request, 
typename Response>
 
   32using RawReaderWriter = grpc::ServerAsyncReaderWriter<Response, Request>;
 
   35using ugrpc::impl::AsyncMethodInvocation;
 
   37void ReportErrorWhileCancelling(std::string_view call_name) 
noexcept;
 
   39void ThrowOnError(impl::AsyncMethodInvocation::WaitStatus status,
 
   40                  std::string_view call_name, std::string_view stage_name);
 
   42extern const grpc::Status kUnimplementedStatus;
 
   43extern const grpc::Status kUnknownErrorStatus;
 
   45template <
typename GrpcStream, 
typename Response>
 
   46void Finish(GrpcStream& stream, 
const Response& response,
 
   47            const grpc::Status& status, std::string_view call_name) {
 
   48  AsyncMethodInvocation finish;
 
   49  stream.Finish(response, status, finish.GetTag());
 
   50  ThrowOnError(Wait(finish), call_name, 
"Finish");
 
   53template <
typename GrpcStream>
 
   54void Finish(GrpcStream& stream, 
const grpc::Status& status,
 
   55            std::string_view call_name) {
 
   56  AsyncMethodInvocation finish;
 
   57  stream.Finish(status, finish.GetTag());
 
   58  ThrowOnError(Wait(finish), call_name, 
"Finish");
 
   61template <
typename GrpcStream>
 
   62void Cancel(GrpcStream& stream, std::string_view call_name) 
noexcept {
 
   63  AsyncMethodInvocation cancel;
 
   64  stream.Finish(kUnknownErrorStatus, cancel.GetTag());
 
   65  if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
 
   66    ReportErrorWhileCancelling(call_name);
 
   69template <
typename GrpcStream>
 
   70void CancelWithError(GrpcStream& stream, std::string_view call_name) 
noexcept {
 
   71  AsyncMethodInvocation cancel;
 
   72  stream.FinishWithError(kUnknownErrorStatus, cancel.GetTag());
 
   73  if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
 
   74    ReportErrorWhileCancelling(call_name);
 
   77template <
typename GrpcStream>
 
   78void FinishWithError(GrpcStream& stream, 
const grpc::Status& status,
 
   79                     std::string_view call_name) {
 
   80  AsyncMethodInvocation finish;
 
   81  stream.FinishWithError(status, finish.GetTag());
 
   82  ThrowOnError(Wait(finish), call_name, 
"FinishWithError");
 
   85template <
typename GrpcStream>
 
   86void SendInitialMetadata(GrpcStream& stream, std::string_view call_name) {
 
   87  AsyncMethodInvocation metadata;
 
   88  stream.SendInitialMetadata(metadata.GetTag());
 
   89  ThrowOnError(Wait(metadata), call_name, 
"SendInitialMetadata");
 
   92template <
typename GrpcStream, 
typename Request>
 
   93bool Read(GrpcStream& stream, Request& request) {
 
   94  AsyncMethodInvocation read;
 
   95  stream.Read(&request, read.GetTag());
 
   96  return Wait(read) == impl::AsyncMethodInvocation::WaitStatus::kOk;
 
   99template <
typename GrpcStream, 
typename Response>
 
  100void Write(GrpcStream& stream, 
const Response& response,
 
  101           grpc::WriteOptions options, std::string_view call_name) {
 
  102  AsyncMethodInvocation write;
 
  103  stream.Write(response, options, write.GetTag());
 
  104  ThrowOnError(Wait(write), call_name, 
"Write");
 
  107template <
typename GrpcStream, 
typename Response>
 
  108void WriteAndFinish(GrpcStream& stream, 
const Response& response,
 
  109                    grpc::WriteOptions options, 
const grpc::Status& status,
 
  110                    std::string_view call_name) {
 
  111  AsyncMethodInvocation write_and_finish;
 
  112  stream.WriteAndFinish(response, options, status, write_and_finish.GetTag());
 
  113  ThrowOnError(Wait(write_and_finish), call_name, 
"WriteAndFinish");
 
  116template <
typename GrpcStream, 
typename State>
 
  117void SendInitialMetadataIfNew(GrpcStream& stream, std::string_view call_name,
 
  119  if (state == State::kNew) {
 
  120    state = State::kOpen;
 
  121    SendInitialMetadata(stream, call_name);