Github   Telegram
Loading...
Searching...
No Matches
service_worker_impl.hpp
1#pragma once
2
3#include <exception>
4#include <functional>
5#include <string>
6#include <string_view>
7#include <type_traits>
8#include <utility>
9
10#include <grpcpp/completion_queue.h>
11#include <grpcpp/impl/service_type.h>
12#include <grpcpp/server_context.h>
13
14#include <userver/engine/async.hpp>
15#include <userver/engine/task/cancel.hpp>
16#include <userver/engine/task/task_processor_fwd.hpp>
17#include <userver/tracing/in_place_span.hpp>
18#include <userver/tracing/span.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/fast_scope_guard.hpp>
21#include <userver/utils/impl/wait_token_storage.hpp>
22#include <userver/utils/lazy_prvalue.hpp>
23
24#include <userver/ugrpc/impl/static_metadata.hpp>
25#include <userver/ugrpc/impl/statistics.hpp>
26#include <userver/ugrpc/impl/statistics_scope.hpp>
27#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
28#include <userver/ugrpc/server/impl/async_service.hpp>
29#include <userver/ugrpc/server/impl/call_traits.hpp>
30#include <userver/ugrpc/server/impl/service_worker.hpp>
31#include <userver/ugrpc/server/middleware_base.hpp>
32#include <userver/ugrpc/server/rpc.hpp>
33#include <userver/ugrpc/server/service_base.hpp>
34#include <userver/utils/statistics/entry.hpp>
35
36USERVER_NAMESPACE_BEGIN
37
38namespace ugrpc::server::impl {
39
40void ReportHandlerError(const std::exception& ex, std::string_view call_name,
41 tracing::Span& span) noexcept;
42
43void ReportNetworkError(const RpcInterruptedError& ex,
44 std::string_view call_name,
45 tracing::Span& span) noexcept;
46
47void SetupSpan(std::optional<tracing::InPlaceSpan>& span_holder,
48 grpc::ServerContext& context, std::string_view call_name);
49
50/// Per-gRPC-service data
51template <typename GrpcppService>
52struct ServiceData final {
53 ServiceData(const ServiceSettings& settings,
54 const ugrpc::impl::StaticServiceMetadata& metadata)
55 : settings(settings),
56 metadata(metadata),
57 statistics(settings.statistics_storage.GetServiceStatistics(metadata)) {
58 }
59
60 ~ServiceData() = default;
61
62 const ServiceSettings settings;
63 const ugrpc::impl::StaticServiceMetadata metadata;
64 AsyncService<GrpcppService> async_service{metadata.method_full_names.size()};
65 utils::impl::WaitTokenStorage wait_tokens;
66 ugrpc::impl::ServiceStatistics& statistics;
67};
68
69/// Per-gRPC-method data
70template <typename GrpcppService, typename CallTraits>
71struct MethodData final {
72 ServiceData<GrpcppService>& service_data;
73 const std::size_t method_id{};
74 typename CallTraits::ServiceBase& service;
75 const typename CallTraits::ServiceMethod service_method;
76
77 std::string_view call_name{
78 service_data.metadata.method_full_names[method_id]};
79 ugrpc::impl::MethodStatistics& statistics{
80 service_data.statistics.GetMethodStatistics(method_id)};
81};
82
83template <typename GrpcppService, typename CallTraits>
84class CallData final {
85 public:
86 explicit CallData(const MethodData<GrpcppService, CallTraits>& method_data)
87 : wait_token_(method_data.service_data.wait_tokens.GetToken()),
88 method_data_(method_data) {
89 UASSERT(method_data.method_id <
90 method_data.service_data.metadata.method_full_names.size());
91 }
92
93 void operator()() && {
94 // Based on the tensorflow code, we must first call AsyncNotifyWhenDone
95 // and only then Prepare<>
96 // see
97 // https://git.ecdf.ed.ac.uk/s1886313/tensorflow/-/blob/438604fc885208ee05f9eef2d0f2c630e1360a83/tensorflow/core/distributed_runtime/rpc/grpc_call.h#L201
98 // and grpc::ServerContext::AsyncNotifyWhenDone
99 ugrpc::server::impl::RpcFinishedEvent notify_when_done(
100 engine::current_task::GetCancellationToken(), context_);
101
102 context_.AsyncNotifyWhenDone(notify_when_done.GetTag());
103
104 // the request for an incoming RPC must be performed synchronously
105 auto& queue = method_data_.service_data.settings.queue;
106 method_data_.service_data.async_service.template Prepare<CallTraits>(
107 method_data_.method_id, context_, initial_request_, raw_responder_,
108 queue, queue, prepare_.GetTag());
109
110 if (!prepare_.Wait()) {
111 // the CompletionQueue is shutting down
112
113 // Do not wait for notify_when_done. When queue is shutting down, it will
114 // not be called.
115 // https://github.com/grpc/grpc/issues/10136
116 return;
117 }
118
119 // start a concurrent listener immediately, as advised by gRPC docs
120 ListenAsync(method_data_);
121
122 HandleRpc();
123
124 // Even if we finished before receiving notification that call is done, we
125 // should wait on this async operation. CompletionQueue has a pointer to
126 // stack-allocated object, that object is going to be freed upon exit. To
127 // prevent segfaults, wait until queue is done with this object.
128 notify_when_done.Wait();
129 }
130
131 static void ListenAsync(const MethodData<GrpcppService, CallTraits>& data) {
132 engine::CriticalAsyncNoSpan(
133 data.service_data.settings.task_processor,
134 utils::LazyPrvalue([&] { return CallData(data); }))
135 .Detach();
136 }
137
138 private:
139 using InitialRequest = typename CallTraits::InitialRequest;
140 using RawCall = typename CallTraits::RawCall;
141 using Call = typename CallTraits::Call;
142
143 void HandleRpc() {
144 const auto call_name = method_data_.call_name;
145 auto& service = method_data_.service;
146 const auto service_method = method_data_.service_method;
147
148 SetupSpan(span_, context_, call_name);
149 utils::FastScopeGuard destroy_span([&]() noexcept { span_.reset(); });
150
151 ugrpc::impl::RpcStatisticsScope statistics_scope(method_data_.statistics);
152
153 Call responder(context_, call_name, raw_responder_, statistics_scope,
154 span_->Get());
155 auto do_call = [&] {
156 if constexpr (std::is_same_v<InitialRequest, NoInitialRequest>) {
157 (service.*service_method)(responder);
158 } else {
159 (service.*service_method)(responder, std::move(initial_request_));
160 }
161 };
162
163 try {
164 ::google::protobuf::Message* initial_request = nullptr;
165 if constexpr (!std::is_same_v<InitialRequest, NoInitialRequest>) {
166 initial_request = &initial_request_;
167 }
168
169 // TODO: pass responder as function_ref?
170 auto& middlewares = method_data_.service_data.settings.middlewares;
171 MiddlewareCallContext middleware_context(middlewares, responder, do_call,
172 initial_request);
173 middleware_context.Next();
174 } catch (const RpcInterruptedError& ex) {
175 ReportNetworkError(ex, call_name, span_->Get());
176 statistics_scope.OnNetworkError();
177 } catch (const std::exception& ex) {
178 ReportHandlerError(ex, call_name, span_->Get());
179 }
180 }
181
182 // 'wait_token_' must be the first field, because its lifetime keeps
183 // ServiceData alive during server shutdown.
184 const utils::impl::WaitTokenStorage::Token wait_token_;
185
186 MethodData<GrpcppService, CallTraits> method_data_;
187
188 grpc::ServerContext context_{};
189 InitialRequest initial_request_{};
190 RawCall raw_responder_{&context_};
191 ugrpc::impl::AsyncMethodInvocation prepare_{};
192 std::optional<tracing::InPlaceSpan> span_{};
193};
194
195template <typename GrpcppService>
196class ServiceWorkerImpl final : public ServiceWorker {
197 public:
198 template <typename Service, typename... ServiceMethods>
199 ServiceWorkerImpl(ServiceSettings&& settings,
200 ugrpc::impl::StaticServiceMetadata&& metadata,
201 Service& service, ServiceMethods... service_methods)
202 : service_data_(settings, metadata),
203 start_{[this, &service, service_methods...] {
204 std::size_t method_id = 0;
205 (CallData<GrpcppService, CallTraits<ServiceMethods>>::ListenAsync(
206 {service_data_, method_id++, service, service_methods}),
207 ...);
208 }} {}
209
210 ~ServiceWorkerImpl() override {
211 service_data_.wait_tokens.WaitForAllTokens();
212 }
213
214 grpc::Service& GetService() override { return service_data_.async_service; }
215
216 const ugrpc::impl::StaticServiceMetadata& GetMetadata() const override {
217 return service_data_.metadata;
218 }
219
220 void Start() override { start_(); }
221
222 private:
223 ServiceData<GrpcppService> service_data_;
224 std::function<void()> start_;
225};
226
227// Called from 'MakeWorker' of code-generated service base classes
228template <typename GrpcppService, typename Service, typename... ServiceMethods>
229std::unique_ptr<ServiceWorker> MakeServiceWorker(
230 ServiceSettings&& settings,
231 const std::string_view (&method_full_names)[sizeof...(ServiceMethods)],
232 Service& service, ServiceMethods... service_methods) {
233 return std::make_unique<ServiceWorkerImpl<GrpcppService>>(
234 ServiceSettings(settings),
235 ugrpc::impl::MakeStaticServiceMetadata<GrpcppService>(method_full_names),
236 service, service_methods...);
237}
238
239} // namespace ugrpc::server::impl
240
241USERVER_NAMESPACE_END