11#include <grpcpp/completion_queue.h> 
   12#include <grpcpp/impl/service_type.h> 
   13#include <grpcpp/server_context.h> 
   15#include <userver/engine/async.hpp> 
   16#include <userver/engine/task/cancel.hpp> 
   17#include <userver/engine/task/task_processor_fwd.hpp> 
   18#include <userver/server/request/task_inherited_data.hpp> 
   19#include <userver/tracing/in_place_span.hpp> 
   20#include <userver/tracing/span.hpp> 
   21#include <userver/utils/assert.hpp> 
   22#include <userver/utils/fast_scope_guard.hpp> 
   23#include <userver/utils/impl/wait_token_storage.hpp> 
   24#include <userver/utils/lazy_prvalue.hpp> 
   25#include <userver/utils/statistics/entry.hpp> 
   27#include <userver/ugrpc/impl/static_metadata.hpp> 
   28#include <userver/ugrpc/impl/statistics.hpp> 
   29#include <userver/ugrpc/impl/statistics_scope.hpp> 
   30#include <userver/ugrpc/server/impl/async_method_invocation.hpp> 
   31#include <userver/ugrpc/server/impl/async_service.hpp> 
   32#include <userver/ugrpc/server/impl/call_params.hpp> 
   33#include <userver/ugrpc/server/impl/call_traits.hpp> 
   34#include <userver/ugrpc/server/impl/error_code.hpp> 
   35#include <userver/ugrpc/server/impl/service_worker.hpp> 
   36#include <userver/ugrpc/server/middlewares/base.hpp> 
   37#include <userver/ugrpc/server/rpc.hpp> 
   38#include <userver/ugrpc/server/service_base.hpp> 
   40USERVER_NAMESPACE_BEGIN
 
   42namespace ugrpc::
server::impl {
 
   44void ReportHandlerError(
const std::exception& ex, std::string_view call_name,
 
   48                        std::string_view call_name,
 
   51void ReportCustomError(
 
   55void SetupSpan(std::optional<
tracing::InPlaceSpan>& span_holder,
 
   56               grpc::ServerContext& context, std::string_view call_name);
 
   59template <
typename GrpcppService>
 
   60struct ServiceData 
final {
 
   61  ServiceData(
const ServiceSettings& settings,
 
   62              const ugrpc::impl::StaticServiceMetadata& metadata)
 
   65        statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
 
   68  ~ServiceData() = 
default;
 
   70  const ServiceSettings settings;
 
   71  const ugrpc::impl::StaticServiceMetadata metadata;
 
   72  AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
 
   73  utils::impl::WaitTokenStorage wait_tokens;
 
   74  ugrpc::impl::ServiceStatistics& statistics;
 
   78template <
typename GrpcppService, 
typename CallTraits>
 
   79struct MethodData 
final {
 
   80  ServiceData<GrpcppService>& service_data;
 
   81  const std::size_t method_id{};
 
   82  typename CallTraits::ServiceBase& service;
 
   83  const typename CallTraits::ServiceMethod service_method;
 
   85  std::string_view call_name{
 
   86      service_data.metadata.method_full_names[method_id]};
 
   88  std::string_view method_name{
 
   89      call_name.substr(service_data.metadata.service_full_name.size() + 1)};
 
   90  ugrpc::impl::MethodStatistics& statistics{
 
   91      service_data.statistics.GetMethodStatistics(method_id)};
 
   94template <
typename GrpcppService, 
typename CallTraits>
 
   97  explicit CallData(
const MethodData<GrpcppService, CallTraits>& method_data)
 
   98      : wait_token_(method_data.service_data.wait_tokens.GetToken()),
 
   99        method_data_(method_data) {
 
  100    UASSERT(method_data.method_id <
 
  101            method_data.service_data.metadata.method_full_names.size());
 
  104  void operator()() && {
 
  110    ugrpc::
server::impl::RpcFinishedEvent notify_when_done(
 
  111        engine::current_task::GetCancellationToken(), context_);
 
  113    context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
 
  116    auto& queue = method_data_.service_data.settings.queue;
 
  117    method_data_.service_data.async_service.
template Prepare<CallTraits>(
 
  118        method_data_.method_id, context_, initial_request_, raw_responder_,
 
  119        queue, queue, prepare_.GetTag());
 
  125    if (Wait(prepare_) != impl::AsyncMethodInvocation::WaitStatus::kOk) {
 
  135    ListenAsync(method_data_);
 
  143    notify_when_done.Wait();
 
  146  static void ListenAsync(
const MethodData<GrpcppService, CallTraits>& data) {
 
  147    engine::CriticalAsyncNoSpan(
 
  148        data.service_data.settings.task_processor,
 
  149        utils::LazyPrvalue([&] { 
return CallData(data); }))
 
  154  using InitialRequest = 
typename CallTraits::InitialRequest;
 
  155  using RawCall = 
typename CallTraits::RawCall;
 
  156  using Call = 
typename CallTraits::Call;
 
  159    const auto call_name = method_data_.call_name;
 
  160    auto& service = method_data_.service;
 
  161    const auto service_method = method_data_.service_method;
 
  163    const auto& service_name =
 
  164        method_data_.service_data.metadata.service_full_name;
 
  165    const auto& method_name = method_data_.method_name;
 
  167    SetupSpan(span_, context_, call_name);
 
  168    utils::FastScopeGuard destroy_span([&]() 
noexcept { span_.reset(); });
 
  170    ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
 
  172    auto& access_tskv_logger =
 
  173        method_data_.service_data.settings.access_tskv_logger;
 
  174    Call responder(CallParams{context_, call_name, statistics_scope,
 
  175                              *access_tskv_logger, span_->Get()},
 
  178      if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
 
  179        (service.*service_method)(responder);
 
  181        (service.*service_method)(responder, std::move(initial_request_));
 
  186      ::google::protobuf::Message* initial_request = 
nullptr;
 
  187      if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
 
  188        initial_request = &initial_request_;
 
  191      auto& middlewares = method_data_.service_data.settings.middlewares;
 
  192      MiddlewareCallContext middleware_context(
 
  193          middlewares, responder, do_call, service_name, method_name,
 
  194          method_data_.service_data.settings.config_source.GetSnapshot(),
 
  199      ReportCustomError(ex, responder, span_->Get());
 
  201      ReportNetworkError(ex, call_name, span_->Get());
 
  202      statistics_scope.OnNetworkError();
 
  203    } 
catch (
const std::exception& ex) {
 
  204      ReportHandlerError(ex, call_name, span_->Get());
 
  210  const utils::impl::WaitTokenStorage::Token wait_token_;
 
  212  MethodData<GrpcppService, CallTraits> method_data_;
 
  214  grpc::ServerContext context_{};
 
  215  InitialRequest initial_request_{};
 
  216  RawCall raw_responder_{&context_};
 
  217  ugrpc::impl::AsyncMethodInvocation prepare_;
 
  218  std::optional<
tracing::InPlaceSpan> span_{};
 
  221template <
typename GrpcppService>
 
  222class ServiceWorkerImpl 
final : 
public ServiceWorker {
 
  224  template <
typename Service, 
typename... ServiceMethods>
 
  225  ServiceWorkerImpl(ServiceSettings&& settings,
 
  226                    ugrpc::impl::StaticServiceMetadata&& metadata,
 
  227                    Service& service, ServiceMethods... service_methods)
 
  228      : service_data_(settings, metadata),
 
  229        start_{[
this, &service, service_methods...] {
 
  230          std::size_t method_id = 0;
 
  231          (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
 
  232               {service_data_, method_id++, service, service_methods}),
 
  236  ~ServiceWorkerImpl() 
override {
 
  237    service_data_.wait_tokens.WaitForAllTokens();
 
  240  grpc::Service& GetService() 
override { 
return service_data_.async_service; }
 
  242  const ugrpc::impl::StaticServiceMetadata& GetMetadata() 
const override {
 
  243    return service_data_.metadata;
 
  246  void Start() 
override { start_(); }
 
  249  ServiceData<GrpcppService> service_data_;
 
  250  std::function<
void()> start_;
 
  254template <
typename GrpcppService, 
typename Service, 
typename... ServiceMethods>
 
  255std::unique_ptr<ServiceWorker> MakeServiceWorker(
 
  256    ServiceSettings&& settings,
 
  257    const std::string_view (&method_full_names)[
sizeof...(ServiceMethods)],
 
  258    Service& service, ServiceMethods... service_methods) {
 
  259  return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
 
  261      ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
 
  262      service, service_methods...);