userver: userver/ugrpc/server/impl/service_worker_impl.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
service_worker_impl.hpp
1#pragma once
2
3#include <chrono>
4#include <exception>
5#include <functional>
6#include <string>
7#include <string_view>
8#include <type_traits>
9#include <utility>
10
11#include <grpcpp/completion_queue.h>
12#include <grpcpp/impl/service_type.h>
13#include <grpcpp/server_context.h>
14
15#include <userver/engine/async.hpp>
16#include <userver/engine/task/cancel.hpp>
17#include <userver/engine/task/task_processor_fwd.hpp>
18#include <userver/server/request/task_inherited_data.hpp>
19#include <userver/tracing/in_place_span.hpp>
20#include <userver/tracing/span.hpp>
21#include <userver/utils/assert.hpp>
22#include <userver/utils/fast_scope_guard.hpp>
23#include <userver/utils/impl/wait_token_storage.hpp>
24#include <userver/utils/lazy_prvalue.hpp>
25#include <userver/utils/statistics/entry.hpp>
26
27#include <userver/ugrpc/impl/static_metadata.hpp>
28#include <userver/ugrpc/impl/statistics.hpp>
29#include <userver/ugrpc/impl/statistics_scope.hpp>
30#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
31#include <userver/ugrpc/server/impl/async_service.hpp>
32#include <userver/ugrpc/server/impl/call_params.hpp>
33#include <userver/ugrpc/server/impl/call_traits.hpp>
34#include <userver/ugrpc/server/impl/error_code.hpp>
35#include <userver/ugrpc/server/impl/service_worker.hpp>
36#include <userver/ugrpc/server/middlewares/base.hpp>
37#include <userver/ugrpc/server/rpc.hpp>
38#include <userver/ugrpc/server/service_base.hpp>
39
40USERVER_NAMESPACE_BEGIN
41
42namespace ugrpc::server::impl {
43
44void ReportHandlerError(
45 const std::exception& ex, std::string_view call_name, tracing::Span& span,
46 ugrpc::impl::RpcStatisticsScope& statistics_scope) noexcept;
47
48void ReportNetworkError(
49 const RpcInterruptedError& ex, std::string_view call_name,
50 tracing::Span& span,
51 ugrpc::impl::RpcStatisticsScope& statistics_scope) noexcept;
52
53void ReportCustomError(
54 const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex,
55 CallAnyBase& call, tracing::Span& span);
56
57void SetupSpan(std::optional<tracing::InPlaceSpan>& span_holder,
58 grpc::ServerContext& context, std::string_view call_name);
59
60/// Per-gRPC-service data
61template <typename GrpcppService>
62struct ServiceData final {
63 ServiceData(const ServiceSettings& settings,
64 const ugrpc::impl::StaticServiceMetadata& metadata)
65 : settings(settings),
66 metadata(metadata),
67 statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
68 }
69
70 ~ServiceData() = default;
71
72 const ServiceSettings settings;
73 const ugrpc::impl::StaticServiceMetadata metadata;
74 AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
75 utils::impl::WaitTokenStorage wait_tokens;
76 ugrpc::impl::ServiceStatistics& statistics;
77};
78
79/// Per-gRPC-method data
80template <typename GrpcppService, typename CallTraits>
81struct MethodData final {
82 ServiceData<GrpcppService>& service_data;
83 int queue_num{0};
84 const std::size_t method_id{};
85 typename CallTraits::ServiceBase& service;
86 const typename CallTraits::ServiceMethod service_method;
87
88 std::string_view call_name{
89 service_data.metadata.method_full_names[method_id]};
90 // Remove name of the service and slash
91 std::string_view method_name{
92 call_name.substr(service_data.metadata.service_full_name.size() + 1)};
93 ugrpc::impl::MethodStatistics& statistics{
94 service_data.statistics.GetMethodStatistics(method_id)};
95};
96
97template <typename GrpcppService, typename CallTraits>
98class CallData final {
99 public:
100 explicit CallData(const MethodData<GrpcppService, CallTraits>& method_data)
101 : wait_token_(method_data.service_data.wait_tokens.GetToken()),
102 method_data_(method_data) {
103 UASSERT(method_data.method_id <
104 method_data.service_data.metadata.method_full_names.size());
105 }
106
107 void operator()() && {
108 // Based on the tensorflow code, we must first call AsyncNotifyWhenDone
109 // and only then Prepare<>
110 // see
111 // https://git.ecdf.ed.ac.uk/s1886313/tensorflow/-/blob/438604fc885208ee05f9eef2d0f2c630e1360a83/tensorflow/core/distributed_runtime/rpc/grpc_call.h#L201
112 // and grpc::ServerContext::AsyncNotifyWhenDone
113 ugrpc::server::impl::RpcFinishedEvent notify_when_done(
114 engine::current_task::GetCancellationToken(), context_);
115
116 context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
117
118 // the request for an incoming RPC must be performed synchronously
119 auto& queue = method_data_.service_data.settings.queue.GetQueue(
120 method_data_.queue_num);
121
122 method_data_.service_data.async_service.template Prepare<CallTraits>(
123 method_data_.method_id, context_, initial_request_, raw_responder_,
124 queue, queue, prepare_.GetTag());
125
126 // Note: we ignore task cancellations here. Even if notify_when_done has
127 // already cancelled this RPC, we want to:
128 // 1. listen to further RPCs for the same method
129 // 2. handle this RPC correctly, including metrics, logs, etc.
130 if (Wait(prepare_) != impl::AsyncMethodInvocation::WaitStatus::kOk) {
131 // the CompletionQueue is shutting down
132
133 // Do not wait for notify_when_done. When queue is shutting down, it will
134 // not be called.
135 // https://github.com/grpc/grpc/issues/10136
136 return;
137 }
138
139 // start a concurrent listener immediately, as advised by gRPC docs
140 ListenAsync(method_data_);
141
142 HandleRpc();
143
144 // Even if we finished before receiving notification that call is done, we
145 // should wait on this async operation. CompletionQueue has a pointer to
146 // stack-allocated object, that object is going to be freed upon exit. To
147 // prevent segfaults, wait until queue is done with this object.
148 notify_when_done.Wait();
149 }
150
151 static void ListenAsync(const MethodData<GrpcppService, CallTraits>& data) {
152 engine::CriticalAsyncNoSpan(
153 data.service_data.settings.task_processor,
154 utils::LazyPrvalue([&] { return CallData(data); }))
155 .Detach();
156 }
157
158 private:
159 using InitialRequest = typename CallTraits::InitialRequest;
160 using RawCall = typename CallTraits::RawCall;
161 using Call = typename CallTraits::Call;
162
163 void HandleRpc() {
164 const auto call_name = method_data_.call_name;
165 auto& service = method_data_.service;
166 const auto service_method = method_data_.service_method;
167
168 const auto& service_name =
169 method_data_.service_data.metadata.service_full_name;
170 const auto& method_name = method_data_.method_name;
171
172 SetupSpan(span_, context_, call_name);
173 utils::FastScopeGuard destroy_span([&]() noexcept { span_.reset(); });
174
175 ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
176
177 auto& access_tskv_logger =
178 method_data_.service_data.settings.access_tskv_logger;
179 utils::AnyStorage<StorageContext> storage_context;
180 Call responder(
181 CallParams{context_, call_name, statistics_scope, *access_tskv_logger,
182 span_->Get(), storage_context},
183 raw_responder_);
184 auto do_call = [&] {
185 if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
186 (service.*service_method)(responder);
187 } else {
188 (service.*service_method)(responder, std::move(initial_request_));
189 }
190 };
191
192 try {
193 ::google::protobuf::Message* initial_request = nullptr;
194 if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
195 initial_request = &initial_request_;
196 }
197
198 auto& middlewares = method_data_.service_data.settings.middlewares;
199 MiddlewareCallContext middleware_context(
200 middlewares, responder, do_call, service_name, method_name,
201 method_data_.service_data.settings.config_source.GetSnapshot(),
202 initial_request);
203 middleware_context.Next();
204 } catch (
205 const USERVER_NAMESPACE::server::handlers::CustomHandlerException& ex) {
206 ReportCustomError(ex, responder, span_->Get());
207 } catch (const RpcInterruptedError& ex) {
208 ReportNetworkError(ex, call_name, span_->Get(), statistics_scope);
209 } catch (const std::exception& ex) {
210 ReportHandlerError(ex, call_name, span_->Get(), statistics_scope);
211 }
212 }
213
214 // 'wait_token_' must be the first field, because its lifetime keeps
215 // ServiceData alive during server shutdown.
216 const utils::impl::WaitTokenStorage::Token wait_token_;
217
218 MethodData<GrpcppService, CallTraits> method_data_;
219
220 grpc::ServerContext context_{};
221 InitialRequest initial_request_{};
222 RawCall raw_responder_{&context_};
223 ugrpc::impl::AsyncMethodInvocation prepare_;
224 std::optional<tracing::InPlaceSpan> span_{};
225};
226
227template <typename GrpcppService>
228class ServiceWorkerImpl final : public ServiceWorker {
229 public:
230 template <typename Service, typename... ServiceMethods>
231 ServiceWorkerImpl(ServiceSettings&& settings,
232 ugrpc::impl::StaticServiceMetadata&& metadata,
233 Service& service, ServiceMethods... service_methods)
234 : service_data_(settings, metadata),
235 start_{[this, &service, service_methods...] {
236 for (size_t i = 0; i < service_data_.settings.queue.GetSize(); i++) {
237 std::size_t method_id = 0;
238 (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
239 {service_data_, static_cast<int>(i), method_id++, service,
240 service_methods}),
241 ...);
242 }
243 }} {}
244
245 ~ServiceWorkerImpl() override {
246 service_data_.wait_tokens.WaitForAllTokens();
247 }
248
249 grpc::Service& GetService() override { return service_data_.async_service; }
250
251 const ugrpc::impl::StaticServiceMetadata& GetMetadata() const override {
252 return service_data_.metadata;
253 }
254
255 void Start() override { start_(); }
256
257 private:
258 ServiceData<GrpcppService> service_data_;
259 std::function<void()> start_;
260};
261
262// Called from 'MakeWorker' of code-generated service base classes
263template <typename GrpcppService, typename Service, typename... ServiceMethods>
264std::unique_ptr<ServiceWorker> MakeServiceWorker(
265 ServiceSettings&& settings,
266 const std::string_view (&method_full_names)[sizeof...(ServiceMethods)],
267 Service& service, ServiceMethods... service_methods) {
268 return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
269 std::move(settings),
270 ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
271 service, service_methods...);
272}
273
274} // namespace ugrpc::server::impl
275
276USERVER_NAMESPACE_END