userver: userver/ugrpc/server/impl/service_worker_impl.hpp Source File
Loading...
Searching...
No Matches
service_worker_impl.hpp
1#pragma once
2
3#include <chrono>
4#include <exception>
5#include <functional>
6#include <string>
7#include <string_view>
8#include <type_traits>
9#include <utility>
10
11#include <grpcpp/completion_queue.h>
12#include <grpcpp/impl/service_type.h>
13#include <grpcpp/server_context.h>
14
15#include <userver/engine/async.hpp>
16#include <userver/engine/task/cancel.hpp>
17#include <userver/engine/task/task_processor_fwd.hpp>
18#include <userver/server/request/task_inherited_data.hpp>
19#include <userver/tracing/in_place_span.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/assert.hpp>
22#include <userver/utils/fast_scope_guard.hpp>
23#include <userver/utils/impl/wait_token_storage.hpp>
24#include <userver/utils/lazy_prvalue.hpp>
25#include <userver/utils/statistics/entry.hpp>
26
27#include <userver/ugrpc/impl/static_metadata.hpp>
28#include <userver/ugrpc/impl/statistics.hpp>
29#include <userver/ugrpc/impl/statistics_scope.hpp>
30#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
31#include <userver/ugrpc/server/impl/async_service.hpp>
32#include <userver/ugrpc/server/impl/call_params.hpp>
33#include <userver/ugrpc/server/impl/call_traits.hpp>
34#include <userver/ugrpc/server/impl/error_code.hpp>
35#include <userver/ugrpc/server/impl/service_worker.hpp>
36#include <userver/ugrpc/server/middlewares/base.hpp>
37#include <userver/ugrpc/server/rpc.hpp>
38#include <userver/ugrpc/server/service_base.hpp>
39
40USERVER_NAMESPACE_BEGIN
41
42namespace ugrpc::server::impl {
43
44void ReportHandlerError(
45 const std::exception& ex, std::string_view call_name, tracing::Span& span,
46 ugrpc::impl::RpcStatisticsScope& statistics_scope) noexcept;
47
48void ReportNetworkError(
49 const RpcInterruptedError& ex, std::string_view call_name,
50 tracing::Span& span,
51 ugrpc::impl::RpcStatisticsScope& statistics_scope) noexcept;
52
53void ReportCustomError(
54 const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex,
55 CallAnyBase& call, tracing::Span& span);
56
57void SetupSpan(std::optional<tracing::InPlaceSpan>& span_holder,
58 grpc::ServerContext& context, std::string_view call_name);
59
60/// Per-gRPC-service data
61template <typename GrpcppService>
62struct ServiceData final {
63 ServiceData(const ServiceSettings& settings,
64 const ugrpc::impl::StaticServiceMetadata& metadata)
65 : settings(settings),
66 metadata(metadata),
67 statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
68 }
69
70 ~ServiceData() = default;
71
72 const ServiceSettings settings;
73 const ugrpc::impl::StaticServiceMetadata metadata;
74 AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
75 utils::impl::WaitTokenStorage wait_tokens;
76 ugrpc::impl::ServiceStatistics& statistics;
77};
78
79/// Per-gRPC-method data
80template <typename GrpcppService, typename CallTraits>
81struct MethodData final {
82 ServiceData<GrpcppService>& service_data;
83 int queue_num{0};
84 const std::size_t method_id{};
85 typename CallTraits::ServiceBase& service;
86 const typename CallTraits::ServiceMethod service_method;
87
88 std::string_view call_name{
89 service_data.metadata.method_full_names[method_id]};
90 // Remove name of the service and slash
91 std::string_view method_name{
92 call_name.substr(service_data.metadata.service_full_name.size() + 1)};
93 ugrpc::impl::MethodStatistics& statistics{
94 service_data.statistics.GetMethodStatistics(method_id)};
95};
96
97template <typename GrpcppService, typename CallTraits>
98class CallData final {
99 public:
100 explicit CallData(const MethodData<GrpcppService, CallTraits>& method_data)
101 : wait_token_(method_data.service_data.wait_tokens.GetToken()),
102 method_data_(method_data) {
103 UASSERT(method_data.method_id <
104 method_data.service_data.metadata.method_full_names.size());
105 }
106
107 void operator()() && {
108 // Based on the tensorflow code, we must first call AsyncNotifyWhenDone
109 // and only then Prepare<>
110 // see
111 // https://git.ecdf.ed.ac.uk/s1886313/tensorflow/-/blob/438604fc885208ee05f9eef2d0f2c630e1360a83/tensorflow/core/distributed_runtime/rpc/grpc_call.h#L201
112 // and grpc::ServerContext::AsyncNotifyWhenDone
113 ugrpc::server::impl::RpcFinishedEvent notify_when_done(
114 engine::current_task::GetCancellationToken(), context_);
115
116 context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
117
118 // the request for an incoming RPC must be performed synchronously
119 auto& queue = method_data_.service_data.settings.queue.GetQueue(
120 method_data_.queue_num);
121
122 method_data_.service_data.async_service.template Prepare<CallTraits>(
123 method_data_.method_id, context_, initial_request_, raw_responder_,
124 queue, queue, prepare_.GetTag());
125
126 // Note: we ignore task cancellations here. Even if notify_when_done has
127 // already cancelled this RPC, we want to:
128 // 1. listen to further RPCs for the same method
129 // 2. handle this RPC correctly, including metrics, logs, etc.
130 if (Wait(prepare_) != impl::AsyncMethodInvocation::WaitStatus::kOk) {
131 // the CompletionQueue is shutting down
132
133 // Do not wait for notify_when_done. When queue is shutting down, it will
134 // not be called.
135 // https://github.com/grpc/grpc/issues/10136
136 return;
137 }
138
139 // start a concurrent listener immediately, as advised by gRPC docs
140 ListenAsync(method_data_);
141
142 HandleRpc();
143
144 // Even if we finished before receiving notification that call is done, we
145 // should wait on this async operation. CompletionQueue has a pointer to
146 // stack-allocated object, that object is going to be freed upon exit. To
147 // prevent segfaults, wait until queue is done with this object.
148 notify_when_done.Wait();
149 }
150
151 static void ListenAsync(const MethodData<GrpcppService, CallTraits>& data) {
152 engine::CriticalAsyncNoSpan(
153 data.service_data.settings.task_processor,
154 utils::LazyPrvalue([&] { return CallData(data); }))
155 .Detach();
156 }
157
158 private:
159 using InitialRequest = typename CallTraits::InitialRequest;
160 using RawCall = typename CallTraits::RawCall;
161 using Call = typename CallTraits::Call;
162
163 void HandleRpc() {
164 const auto call_name = method_data_.call_name;
165 auto& service = method_data_.service;
166 const auto service_method = method_data_.service_method;
167
168 const auto& service_name =
169 method_data_.service_data.metadata.service_full_name;
170 const auto& method_name = method_data_.method_name;
171
172 SetupSpan(span_, context_, call_name);
173 utils::FastScopeGuard destroy_span([&]() noexcept { span_.reset(); });
174
175 ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
176
177 auto& access_tskv_logger =
178 method_data_.service_data.settings.access_tskv_logger;
179 utils::AnyStorage<StorageContext> storage_context;
180 Call responder(
181 CallParams{context_, call_name, statistics_scope, *access_tskv_logger,
182 span_->Get(), storage_context},
183 raw_responder_);
184 auto do_call = [&] {
185 if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
186 (service.*service_method)(responder);
187 } else {
188 (service.*service_method)(responder, std::move(initial_request_));
189 }
190 };
191
192 try {
193 ::google::protobuf::Message* initial_request = nullptr;
194 if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
195 initial_request = &initial_request_;
196 }
197
198 auto& middlewares = method_data_.service_data.settings.middlewares;
199 MiddlewareCallContext middleware_context(
200 middlewares, responder, do_call, service_name, method_name,
201 method_data_.service_data.settings.config_source.GetSnapshot(),
202 initial_request);
203 middleware_context.Next();
204 } catch (
205 const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex) {
206 ReportCustomError(ex, responder, span_->Get());
207 } catch (const RpcInterruptedError& ex) {
208 ReportNetworkError(ex, call_name, span_->Get(), statistics_scope);
209 } catch (const std::exception& ex) {
210 ReportHandlerError(ex, call_name, span_->Get(), statistics_scope);
211 }
212 }
213
214 // 'wait_token_' must be the first field, because its lifetime keeps
215 // ServiceData alive during server shutdown.
216 const utils::impl::WaitTokenStorage::Token wait_token_;
217
218 MethodData<GrpcppService, CallTraits> method_data_;
219
220 grpc::ServerContext context_{};
221 InitialRequest initial_request_{};
222 RawCall raw_responder_{&context_};
223 ugrpc::impl::AsyncMethodInvocation prepare_;
224 std::optional<tracing::InPlaceSpan> span_{};
225};
226
227template <typename GrpcppService>
228class ServiceWorkerImpl final : public ServiceWorker {
229 public:
230 template <typename Service, typename... ServiceMethods>
231 ServiceWorkerImpl(ServiceSettings&& settings,
232 ugrpc::impl::StaticServiceMetadata&& metadata,
233 Service& service, ServiceMethods... service_methods)
234 : service_data_(settings, metadata),
235 start_{[this, &service, service_methods...] {
236 for (size_t i = 0; i < service_data_.settings.queue.GetSize(); i++) {
237 std::size_t method_id = 0;
238 (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
239 {service_data_, static_cast<int>(i), method_id++, service,
240 service_methods}),
241 ...);
242 }
243 }} {}
244
245 ~ServiceWorkerImpl() override {
246 service_data_.wait_tokens.WaitForAllTokens();
247 }
248
249 grpc::Service& GetService() override { return service_data_.async_service; }
250
251 const ugrpc::impl::StaticServiceMetadata& GetMetadata() const override {
252 return service_data_.metadata;
253 }
254
255 void Start() override { start_(); }
256
257 private:
258 ServiceData<GrpcppService> service_data_;
259 std::function<void()> start_;
260};
261
262// Called from 'MakeWorker' of code-generated service base classes
263template <typename GrpcppService, typename Service, typename... ServiceMethods>
264std::unique_ptr<ServiceWorker> MakeServiceWorker(
265 ServiceSettings&& settings,
266 const std::string_view (&method_full_names)[sizeof...(ServiceMethods)],
267 Service& service, ServiceMethods... service_methods) {
268 return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
269 std::move(settings),
270 ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
271 service, service_methods...);
272}
273
274} // namespace ugrpc::server::impl
275
276USERVER_NAMESPACE_END