11#include <grpcpp/completion_queue.h>
12#include <grpcpp/impl/service_type.h>
13#include <grpcpp/server_context.h>
15#include <userver/engine/async.hpp>
16#include <userver/engine/task/cancel.hpp>
17#include <userver/engine/task/task_processor_fwd.hpp>
18#include <userver/server/request/task_inherited_data.hpp>
19#include <userver/tracing/in_place_span.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/assert.hpp>
22#include <userver/utils/fast_scope_guard.hpp>
23#include <userver/utils/impl/wait_token_storage.hpp>
24#include <userver/utils/lazy_prvalue.hpp>
25#include <userver/utils/statistics/entry.hpp>
27#include <userver/ugrpc/impl/static_metadata.hpp>
28#include <userver/ugrpc/impl/statistics.hpp>
29#include <userver/ugrpc/impl/statistics_scope.hpp>
30#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
31#include <userver/ugrpc/server/impl/async_service.hpp>
32#include <userver/ugrpc/server/impl/call_params.hpp>
33#include <userver/ugrpc/server/impl/call_traits.hpp>
34#include <userver/ugrpc/server/impl/error_code.hpp>
35#include <userver/ugrpc/server/impl/service_worker.hpp>
36#include <userver/ugrpc/server/middlewares/base.hpp>
37#include <userver/ugrpc/server/rpc.hpp>
38#include <userver/ugrpc/server/service_base.hpp>
40USERVER_NAMESPACE_BEGIN
42namespace ugrpc::
server::impl {
44void ReportHandlerError(
45 const std::exception& ex, std::string_view call_name,
tracing::Span& span,
46 ugrpc::impl::RpcStatisticsScope& statistics_scope)
noexcept;
48void ReportNetworkError(
51 ugrpc::impl::RpcStatisticsScope& statistics_scope)
noexcept;
53void ReportCustomError(
57void SetupSpan(std::optional<
tracing::InPlaceSpan>& span_holder,
58 grpc::ServerContext& context, std::string_view call_name);
61template <
typename GrpcppService>
62struct ServiceData
final {
63 ServiceData(
const ServiceSettings& settings,
64 const ugrpc::impl::StaticServiceMetadata& metadata)
67 statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
70 ~ServiceData() =
default;
72 const ServiceSettings settings;
73 const ugrpc::impl::StaticServiceMetadata metadata;
74 AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
75 utils::impl::WaitTokenStorage wait_tokens;
76 ugrpc::impl::ServiceStatistics& statistics;
80template <
typename GrpcppService,
typename CallTraits>
81struct MethodData
final {
82 ServiceData<GrpcppService>& service_data;
84 const std::size_t method_id{};
85 typename CallTraits::ServiceBase& service;
86 const typename CallTraits::ServiceMethod service_method;
88 std::string_view call_name{
89 service_data.metadata.method_full_names[method_id]};
91 std::string_view method_name{
92 call_name.substr(service_data.metadata.service_full_name.size() + 1)};
93 ugrpc::impl::MethodStatistics& statistics{
94 service_data.statistics.GetMethodStatistics(method_id)};
97template <
typename GrpcppService,
typename CallTraits>
100 explicit CallData(
const MethodData<GrpcppService, CallTraits>& method_data)
101 : wait_token_(method_data.service_data.wait_tokens.GetToken()),
102 method_data_(method_data) {
103 UASSERT(method_data.method_id <
104 method_data.service_data.metadata.method_full_names.size());
107 void operator()() && {
113 ugrpc::
server::impl::RpcFinishedEvent notify_when_done(
114 engine::current_task::GetCancellationToken(), context_);
116 context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
119 auto& queue = method_data_.service_data.settings.queue.GetQueue(
120 method_data_.queue_num);
122 method_data_.service_data.async_service.
template Prepare<CallTraits>(
123 method_data_.method_id, context_, initial_request_, raw_responder_,
124 queue, queue, prepare_.GetTag());
130 if (Wait(prepare_) != impl::AsyncMethodInvocation::WaitStatus::kOk) {
140 ListenAsync(method_data_);
148 notify_when_done.Wait();
151 static void ListenAsync(
const MethodData<GrpcppService, CallTraits>& data) {
152 engine::CriticalAsyncNoSpan(
153 data.service_data.settings.task_processor,
154 utils::LazyPrvalue([&] {
return CallData(data); }))
159 using InitialRequest =
typename CallTraits::InitialRequest;
160 using RawCall =
typename CallTraits::RawCall;
161 using Call =
typename CallTraits::Call;
164 const auto call_name = method_data_.call_name;
165 auto& service = method_data_.service;
166 const auto service_method = method_data_.service_method;
168 const auto& service_name =
169 method_data_.service_data.metadata.service_full_name;
170 const auto& method_name = method_data_.method_name;
172 SetupSpan(span_, context_, call_name);
173 utils::FastScopeGuard destroy_span([&]()
noexcept { span_.reset(); });
175 ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
177 auto& access_tskv_logger =
178 method_data_.service_data.settings.access_tskv_logger;
181 CallParams{context_, call_name, statistics_scope, *access_tskv_logger,
182 span_->Get(), storage_context},
185 if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
186 (service.*service_method)(responder);
188 (service.*service_method)(responder, std::move(initial_request_));
193 ::google::protobuf::Message* initial_request =
nullptr;
194 if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
195 initial_request = &initial_request_;
198 auto& middlewares = method_data_.service_data.settings.middlewares;
199 MiddlewareCallContext middleware_context(
200 middlewares, responder, do_call, service_name, method_name,
201 method_data_.service_data.settings.config_source.GetSnapshot(),
206 ReportCustomError(ex, responder, span_->Get());
208 ReportNetworkError(ex, call_name, span_->Get(), statistics_scope);
209 }
catch (
const std::exception& ex) {
210 ReportHandlerError(ex, call_name, span_->Get(), statistics_scope);
216 const utils::impl::WaitTokenStorage::Token wait_token_;
218 MethodData<GrpcppService, CallTraits> method_data_;
220 grpc::ServerContext context_{};
221 InitialRequest initial_request_{};
222 RawCall raw_responder_{&context_};
223 ugrpc::impl::AsyncMethodInvocation prepare_;
224 std::optional<
tracing::InPlaceSpan> span_{};
227template <
typename GrpcppService>
228class ServiceWorkerImpl
final :
public ServiceWorker {
230 template <
typename Service,
typename... ServiceMethods>
231 ServiceWorkerImpl(ServiceSettings&& settings,
232 ugrpc::impl::StaticServiceMetadata&& metadata,
233 Service& service, ServiceMethods... service_methods)
234 : service_data_(settings, metadata),
235 start_{[
this, &service, service_methods...] {
236 for (size_t i = 0; i < service_data_.settings.queue.GetSize(); i++) {
237 std::size_t method_id = 0;
238 (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
239 {service_data_,
static_cast<
int>(i), method_id++, service,
245 ~ServiceWorkerImpl()
override {
246 service_data_.wait_tokens.WaitForAllTokens();
249 grpc::Service& GetService()
override {
return service_data_.async_service; }
251 const ugrpc::impl::StaticServiceMetadata& GetMetadata()
const override {
252 return service_data_.metadata;
255 void Start()
override { start_(); }
258 ServiceData<GrpcppService> service_data_;
259 std::function<
void()> start_;
263template <
typename GrpcppService,
typename Service,
typename... ServiceMethods>
264std::unique_ptr<ServiceWorker> MakeServiceWorker(
265 ServiceSettings&& settings,
266 const std::string_view (&method_full_names)[
sizeof...(ServiceMethods)],
267 Service& service, ServiceMethods... service_methods) {
268 return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
270 ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
271 service, service_methods...);