11#include <grpcpp/completion_queue.h> 
   12#include <grpcpp/impl/service_type.h> 
   13#include <grpcpp/server_context.h> 
   15#include <userver/engine/async.hpp> 
   16#include <userver/engine/task/cancel.hpp> 
   17#include <userver/engine/task/task_processor_fwd.hpp> 
   18#include <userver/server/request/task_inherited_data.hpp> 
   19#include <userver/tracing/in_place_span.hpp> 
   20#include <userver/tracing/span.hpp> 
   21#include <userver/utils/assert.hpp> 
   22#include <userver/utils/fast_scope_guard.hpp> 
   23#include <userver/utils/impl/wait_token_storage.hpp> 
   24#include <userver/utils/lazy_prvalue.hpp> 
   25#include <userver/utils/statistics/entry.hpp> 
   27#include <userver/ugrpc/impl/static_metadata.hpp> 
   28#include <userver/ugrpc/impl/statistics.hpp> 
   29#include <userver/ugrpc/impl/statistics_scope.hpp> 
   30#include <userver/ugrpc/server/impl/async_method_invocation.hpp> 
   31#include <userver/ugrpc/server/impl/async_service.hpp> 
   32#include <userver/ugrpc/server/impl/call_params.hpp> 
   33#include <userver/ugrpc/server/impl/call_traits.hpp> 
   34#include <userver/ugrpc/server/impl/error_code.hpp> 
   35#include <userver/ugrpc/server/impl/service_worker.hpp> 
   36#include <userver/ugrpc/server/middlewares/base.hpp> 
   37#include <userver/ugrpc/server/rpc.hpp> 
   38#include <userver/ugrpc/server/service_base.hpp> 
   40USERVER_NAMESPACE_BEGIN
 
   42namespace ugrpc::
server::impl {
 
   44void ReportHandlerError(
 
   45    const std::exception& ex, std::string_view call_name, 
tracing::Span& span,
 
   46    ugrpc::impl::RpcStatisticsScope& statistics_scope) 
noexcept;
 
   48void ReportNetworkError(
 
   51    ugrpc::impl::RpcStatisticsScope& statistics_scope) 
noexcept;
 
   53void ReportCustomError(
 
   57void SetupSpan(std::optional<
tracing::InPlaceSpan>& span_holder,
 
   58               grpc::ServerContext& context, std::string_view call_name);
 
   61template <
typename GrpcppService>
 
   62struct ServiceData 
final {
 
   63  ServiceData(
const ServiceSettings& settings,
 
   64              const ugrpc::impl::StaticServiceMetadata& metadata)
 
   67        statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
 
   70  ~ServiceData() = 
default;
 
   72  const ServiceSettings settings;
 
   73  const ugrpc::impl::StaticServiceMetadata metadata;
 
   74  AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
 
   75  utils::impl::WaitTokenStorage wait_tokens;
 
   76  ugrpc::impl::ServiceStatistics& statistics;
 
   80template <
typename GrpcppService, 
typename CallTraits>
 
   81struct MethodData 
final {
 
   82  ServiceData<GrpcppService>& service_data;
 
   84  const std::size_t method_id{};
 
   85  typename CallTraits::ServiceBase& service;
 
   86  const typename CallTraits::ServiceMethod service_method;
 
   88  std::string_view call_name{
 
   89      service_data.metadata.method_full_names[method_id]};
 
   91  std::string_view method_name{
 
   92      call_name.substr(service_data.metadata.service_full_name.size() + 1)};
 
   93  ugrpc::impl::MethodStatistics& statistics{
 
   94      service_data.statistics.GetMethodStatistics(method_id)};
 
   97template <
typename GrpcppService, 
typename CallTraits>
 
  100  explicit CallData(
const MethodData<GrpcppService, CallTraits>& method_data)
 
  101      : wait_token_(method_data.service_data.wait_tokens.GetToken()),
 
  102        method_data_(method_data) {
 
  103    UASSERT(method_data.method_id <
 
  104            method_data.service_data.metadata.method_full_names.size());
 
  107  void operator()() && {
 
  113    ugrpc::
server::impl::RpcFinishedEvent notify_when_done(
 
  114        engine::current_task::GetCancellationToken(), context_);
 
  116    context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
 
  119    auto& queue = method_data_.service_data.settings.queue.GetQueue(
 
  120        method_data_.queue_num);
 
  122    method_data_.service_data.async_service.
template Prepare<CallTraits>(
 
  123        method_data_.method_id, context_, initial_request_, raw_responder_,
 
  124        queue, queue, prepare_.GetTag());
 
  130    if (Wait(prepare_) != impl::AsyncMethodInvocation::WaitStatus::kOk) {
 
  140    ListenAsync(method_data_);
 
  148    notify_when_done.Wait();
 
  151  static void ListenAsync(
const MethodData<GrpcppService, CallTraits>& data) {
 
  152    engine::CriticalAsyncNoSpan(
 
  153        data.service_data.settings.task_processor,
 
  154        utils::LazyPrvalue([&] { 
return CallData(data); }))
 
  159  using InitialRequest = 
typename CallTraits::InitialRequest;
 
  160  using RawCall = 
typename CallTraits::RawCall;
 
  161  using Call = 
typename CallTraits::Call;
 
  164    const auto call_name = method_data_.call_name;
 
  165    auto& service = method_data_.service;
 
  166    const auto service_method = method_data_.service_method;
 
  168    const auto& service_name =
 
  169        method_data_.service_data.metadata.service_full_name;
 
  170    const auto& method_name = method_data_.method_name;
 
  172    SetupSpan(span_, context_, call_name);
 
  173    utils::FastScopeGuard destroy_span([&]() 
noexcept { span_.reset(); });
 
  175    ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
 
  177    auto& access_tskv_logger =
 
  178        method_data_.service_data.settings.access_tskv_logger;
 
  181        CallParams{context_, call_name, statistics_scope, *access_tskv_logger,
 
  182                   span_->Get(), storage_context},
 
  185      if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
 
  186        (service.*service_method)(responder);
 
  188        (service.*service_method)(responder, std::move(initial_request_));
 
  193      ::google::protobuf::Message* initial_request = 
nullptr;
 
  194      if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
 
  195        initial_request = &initial_request_;
 
  198      auto& middlewares = method_data_.service_data.settings.middlewares;
 
  199      MiddlewareCallContext middleware_context(
 
  200          middlewares, responder, do_call, service_name, method_name,
 
  201          method_data_.service_data.settings.config_source.GetSnapshot(),
 
  206      ReportCustomError(ex, responder, span_->Get());
 
  208      ReportNetworkError(ex, call_name, span_->Get(), statistics_scope);
 
  209    } 
catch (
const std::exception& ex) {
 
  210      ReportHandlerError(ex, call_name, span_->Get(), statistics_scope);
 
  216  const utils::impl::WaitTokenStorage::Token wait_token_;
 
  218  MethodData<GrpcppService, CallTraits> method_data_;
 
  220  grpc::ServerContext context_{};
 
  221  InitialRequest initial_request_{};
 
  222  RawCall raw_responder_{&context_};
 
  223  ugrpc::impl::AsyncMethodInvocation prepare_;
 
  224  std::optional<
tracing::InPlaceSpan> span_{};
 
  227template <
typename GrpcppService>
 
  228class ServiceWorkerImpl 
final : 
public ServiceWorker {
 
  230  template <
typename Service, 
typename... ServiceMethods>
 
  231  ServiceWorkerImpl(ServiceSettings&& settings,
 
  232                    ugrpc::impl::StaticServiceMetadata&& metadata,
 
  233                    Service& service, ServiceMethods... service_methods)
 
  234      : service_data_(settings, metadata),
 
  235        start_{[
this, &service, service_methods...] {
 
  236          for (size_t i = 0; i < service_data_.settings.queue.GetSize(); i++) {
 
  237            std::size_t method_id = 0;
 
  238            (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
 
  239                 {service_data_, 
static_cast<
int>(i), method_id++, service,
 
  245  ~ServiceWorkerImpl() 
override {
 
  246    service_data_.wait_tokens.WaitForAllTokens();
 
  249  grpc::Service& GetService() 
override { 
return service_data_.async_service; }
 
  251  const ugrpc::impl::StaticServiceMetadata& GetMetadata() 
const override {
 
  252    return service_data_.metadata;
 
  255  void Start() 
override { start_(); }
 
  258  ServiceData<GrpcppService> service_data_;
 
  259  std::function<
void()> start_;
 
  263template <
typename GrpcppService, 
typename Service, 
typename... ServiceMethods>
 
  264std::unique_ptr<ServiceWorker> MakeServiceWorker(
 
  265    ServiceSettings&& settings,
 
  266    const std::string_view (&method_full_names)[
sizeof...(ServiceMethods)],
 
  267    Service& service, ServiceMethods... service_methods) {
 
  268  return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
 
  270      ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
 
  271      service, service_methods...);