10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/service_type.h>
12#include <grpcpp/server_context.h>
14#include <userver/engine/async.hpp>
15#include <userver/engine/task/cancel.hpp>
16#include <userver/engine/task/task_processor_fwd.hpp>
17#include <userver/tracing/in_place_span.hpp>
18#include <userver/tracing/span.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/fast_scope_guard.hpp>
21#include <userver/utils/impl/wait_token_storage.hpp>
22#include <userver/utils/lazy_prvalue.hpp>
24#include <userver/ugrpc/impl/static_metadata.hpp>
25#include <userver/ugrpc/impl/statistics.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
27#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
28#include <userver/ugrpc/server/impl/async_service.hpp>
29#include <userver/ugrpc/server/impl/call_traits.hpp>
30#include <userver/ugrpc/server/impl/service_worker.hpp>
31#include <userver/ugrpc/server/middleware_base.hpp>
32#include <userver/ugrpc/server/rpc.hpp>
33#include <userver/ugrpc/server/service_base.hpp>
34#include <userver/utils/statistics/entry.hpp>
36USERVER_NAMESPACE_BEGIN
38namespace ugrpc::
server::impl {
40void ReportHandlerError(
const std::exception& ex, std::string_view call_name,
44 std::string_view call_name,
47void SetupSpan(std::optional<
tracing::InPlaceSpan>& span_holder,
48 grpc::ServerContext& context, std::string_view call_name);
51template <
typename GrpcppService>
52struct ServiceData
final {
53 ServiceData(
const ServiceSettings& settings,
54 const ugrpc::impl::StaticServiceMetadata& metadata)
57 statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
60 ~ServiceData() =
default;
62 const ServiceSettings settings;
63 const ugrpc::impl::StaticServiceMetadata metadata;
64 AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
65 utils::impl::WaitTokenStorage wait_tokens;
66 ugrpc::impl::ServiceStatistics& statistics;
70template <
typename GrpcppService,
typename CallTraits>
71struct MethodData
final {
72 ServiceData<GrpcppService>& service_data;
73 const std::size_t method_id{};
74 typename CallTraits::ServiceBase& service;
75 const typename CallTraits::ServiceMethod service_method;
77 std::string_view call_name{
78 service_data.metadata.method_full_names[method_id]};
79 ugrpc::impl::MethodStatistics& statistics{
80 service_data.statistics.GetMethodStatistics(method_id)};
83template <
typename GrpcppService,
typename CallTraits>
86 explicit CallData(
const MethodData<GrpcppService, CallTraits>& method_data)
87 : wait_token_(method_data.service_data.wait_tokens.GetToken()),
88 method_data_(method_data) {
90 method_data.service_data.metadata.method_full_names.size());
93 void operator()() && {
99 ugrpc::
server::impl::RpcFinishedEvent notify_when_done(
100 engine::current_task::GetCancellationToken(), context_);
102 context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
105 auto& queue = method_data_.service_data.settings.queue;
106 method_data_.service_data.async_service.
template Prepare<CallTraits>(
107 method_data_.method_id, context_, initial_request_, raw_responder_,
108 queue, queue, prepare_.GetTag());
110 if (!prepare_.Wait()) {
120 ListenAsync(method_data_);
128 notify_when_done.Wait();
131 static void ListenAsync(
const MethodData<GrpcppService, CallTraits>& data) {
132 engine::CriticalAsyncNoSpan(
133 data.service_data.settings.task_processor,
134 utils::LazyPrvalue([&] {
return CallData(data); }))
139 using InitialRequest =
typename CallTraits::InitialRequest;
140 using RawCall =
typename CallTraits::RawCall;
141 using Call =
typename CallTraits::Call;
144 const auto call_name = method_data_.call_name;
145 auto& service = method_data_.service;
146 const auto service_method = method_data_.service_method;
148 SetupSpan(span_, context_, call_name);
149 utils::FastScopeGuard destroy_span([&]()
noexcept { span_.reset(); });
151 ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
153 Call responder(context_, call_name, raw_responder_, statistics_scope,
156 if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
157 (service.*service_method)(responder);
159 (service.*service_method)(responder, std::move(initial_request_));
164 ::google::protobuf::Message* initial_request =
nullptr;
165 if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
166 initial_request = &initial_request_;
170 auto& middlewares = method_data_.service_data.settings.middlewares;
171 MiddlewareCallContext middleware_context(middlewares, responder, do_call,
175 ReportNetworkError(ex, call_name, span_->Get());
176 statistics_scope.OnNetworkError();
177 }
catch (
const std::exception& ex) {
178 ReportHandlerError(ex, call_name, span_->Get());
184 const utils::impl::WaitTokenStorage::Token wait_token_;
186 MethodData<GrpcppService, CallTraits> method_data_;
188 grpc::ServerContext context_{};
189 InitialRequest initial_request_{};
190 RawCall raw_responder_{&context_};
191 ugrpc::impl::AsyncMethodInvocation prepare_{};
192 std::optional<
tracing::InPlaceSpan> span_{};
195template <
typename GrpcppService>
196class ServiceWorkerImpl
final :
public ServiceWorker {
198 template <
typename Service,
typename... ServiceMethods>
199 ServiceWorkerImpl(ServiceSettings&& settings,
200 ugrpc::impl::StaticServiceMetadata&& metadata,
201 Service& service, ServiceMethods... service_methods)
202 : service_data_(settings, metadata),
203 start_{[
this, &service, service_methods...] {
204 std::size_t method_id = 0;
205 (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
206 {service_data_, method_id++, service, service_methods}),
210 ~ServiceWorkerImpl()
override {
211 service_data_.wait_tokens.WaitForAllTokens();
214 grpc::Service& GetService()
override {
return service_data_.async_service; }
216 const ugrpc::impl::StaticServiceMetadata& GetMetadata()
const override {
217 return service_data_.metadata;
220 void Start()
override { start_(); }
223 ServiceData<GrpcppService> service_data_;
224 std::function<
void()> start_;
228template <
typename GrpcppService,
typename Service,
typename... ServiceMethods>
229std::unique_ptr<ServiceWorker> MakeServiceWorker(
230 ServiceSettings&& settings,
231 const std::string_view (&method_full_names)[
sizeof...(ServiceMethods)],
232 Service& service, ServiceMethods... service_methods) {
233 return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
234 ServiceSettings(settings),
235 ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
236 service, service_methods...);