userver: userver/ugrpc/server/rpc.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/server/rpc.hpp
4/// @brief Classes representing an incoming RPC
5
#include <utility>

#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/server_context.h>

#include <userver/utils/assert.hpp>

#include <userver/ugrpc/impl/deadline_timepoint.hpp>
#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
#include <userver/ugrpc/impl/span.hpp>
#include <userver/ugrpc/impl/statistics_scope.hpp>
#include <userver/ugrpc/server/exceptions.hpp>
#include <userver/ugrpc/server/impl/async_methods.hpp>
#include <userver/ugrpc/server/impl/call_params.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace ugrpc::server {
22
23namespace impl {
24
25std::string FormatLogMessage(
26 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
27 std::string_view peer, std::chrono::system_clock::time_point start_time,
28 std::string_view call_name, int code);
29
30}
31
32/// @brief A non-typed base class for any gRPC call
34 public:
35 CallAnyBase(impl::CallParams&& params) : params_(params) {}
36
37 /// @brief Complete the RPC with an error
38 ///
39 /// `Finish` must not be called multiple times for the same RPC.
40 ///
41 /// @param status error details
42 /// @throws ugrpc::server::RpcError on an RPC error
43 virtual void FinishWithError(const grpc::Status& status) = 0;
44
45 /// @returns the `ServerContext` used for this RPC
46 /// @note Initial server metadata is not currently supported
47 /// @note Trailing metadata, if any, must be set before the `Finish` call
48 grpc::ServerContext& GetContext() { return params_.context; }
49
50 /// @brief Name of the call. Consists of service and method names
51 std::string_view GetCallName() const { return params_.call_name; }
52
53 tracing::Span& GetSpan() { return params_.call_span; }
54
55 virtual bool IsFinished() const = 0;
56
57 /// @cond
58 // For internal use only
59 ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
60 /// @endcond
61
62 protected:
63 ugrpc::impl::RpcStatisticsScope& Statistics() { return params_.statistics; }
64
65 logging::LoggerRef AccessTskvLogger() { return params_.access_tskv_logger; }
66
67 void LogFinish(grpc::Status status) const;
68
69 private:
70 impl::CallParams params_;
71};
72
73/// @brief Controls a single request -> single response RPC
74///
75/// The RPC is cancelled on destruction unless `Finish` has been called.
76template <typename Response>
77class UnaryCall final : public CallAnyBase {
78 public:
79 /// @brief Complete the RPC successfully
80 ///
81 /// `Finish` must not be called multiple times for the same RPC.
82 ///
83 /// @param response the single Response to send to the client
84 /// @throws ugrpc::server::RpcError on an RPC error
85 void Finish(const Response& response);
86
87 /// @brief Complete the RPC with an error
88 ///
89 /// `Finish` must not be called multiple times for the same RPC.
90 ///
91 /// @param status error details
92 /// @throws ugrpc::server::RpcError on an RPC error
93 void FinishWithError(const grpc::Status& status) override;
94
95 /// For internal use only
96 UnaryCall(impl::CallParams&& call_params,
97 impl::RawResponseWriter<Response>& stream);
98
99 UnaryCall(UnaryCall&&) = delete;
100 UnaryCall& operator=(UnaryCall&&) = delete;
101 ~UnaryCall();
102
103 bool IsFinished() const override;
104
105 private:
106 impl::RawResponseWriter<Response>& stream_;
107 bool is_finished_{false};
108};
109
110/// @brief Controls a request stream -> single response RPC
111///
112/// This class is not thread-safe except for `GetContext`.
113///
114/// The RPC is cancelled on destruction unless the stream has been finished.
115///
116/// If any method throws, further methods must not be called on the same stream,
117/// except for `GetContext`.
118template <typename Request, typename Response>
119class InputStream final : public CallAnyBase {
120 public:
121 /// @brief Await and read the next incoming message
122 /// @param request where to put the request on success
123 /// @returns `true` on success, `false` on end-of-input
124 [[nodiscard]] bool Read(Request& request);
125
126 /// @brief Complete the RPC successfully
127 ///
128 /// `Finish` must not be called multiple times for the same RPC.
129 ///
130 /// @param response the single Response to send to the client
131 /// @throws ugrpc::server::RpcError on an RPC error
132 void Finish(const Response& response);
133
134 /// @brief Complete the RPC with an error
135 ///
136 /// `Finish` must not be called multiple times for the same RPC.
137 ///
138 /// @param status error details
139 /// @throws ugrpc::server::RpcError on an RPC error
140 void FinishWithError(const grpc::Status& status) override;
141
142 /// For internal use only
143 InputStream(impl::CallParams&& call_params,
144 impl::RawReader<Request, Response>& stream);
145
146 InputStream(InputStream&&) = delete;
147 InputStream& operator=(InputStream&&) = delete;
148 ~InputStream();
149
150 bool IsFinished() const override;
151
152 private:
153 enum class State { kOpen, kReadsDone, kFinished };
154
155 impl::RawReader<Request, Response>& stream_;
156 State state_{State::kOpen};
157};
158
159/// @brief Controls a single request -> response stream RPC
160///
161/// This class is not thread-safe except for `GetContext`.
162///
163/// The RPC is cancelled on destruction unless the stream has been finished.
164///
165/// If any method throws, further methods must not be called on the same stream,
166/// except for `GetContext`.
167template <typename Response>
168class OutputStream final : public CallAnyBase {
169 public:
170 /// @brief Write the next outgoing message
171 /// @param response the next message to write
172 /// @throws ugrpc::server::RpcError on an RPC error
173 void Write(const Response& response);
174
175 /// @brief Complete the RPC successfully
176 ///
177 /// `Finish` must not be called multiple times.
178 ///
179 /// @throws ugrpc::server::RpcError on an RPC error
180 void Finish();
181
182 /// @brief Complete the RPC with an error
183 ///
184 /// `Finish` must not be called multiple times.
185 ///
186 /// @param status error details
187 /// @throws ugrpc::server::RpcError on an RPC error
188 void FinishWithError(const grpc::Status& status) override;
189
190 /// @brief Equivalent to `Write + Finish`
191 ///
192 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
193 ///
194 /// `Finish` must not be called multiple times.
195 ///
196 /// @param response the final response message
197 /// @throws ugrpc::server::RpcError on an RPC error
198 void WriteAndFinish(const Response& response);
199
200 /// For internal use only
201 OutputStream(impl::CallParams&& call_params,
202 impl::RawWriter<Response>& stream);
203
204 OutputStream(OutputStream&&) = delete;
205 OutputStream& operator=(OutputStream&&) = delete;
206 ~OutputStream();
207
208 bool IsFinished() const override;
209
210 private:
211 enum class State { kNew, kOpen, kFinished };
212
213 impl::RawWriter<Response>& stream_;
214 State state_{State::kNew};
215};
216
217/// @brief Controls a request stream -> response stream RPC
218///
219/// This class is not thread-safe except for `GetContext`.
220///
221/// The RPC is cancelled on destruction unless the stream has been finished.
222///
223/// If any method throws, further methods must not be called on the same stream,
224/// except for `GetContext`.
225template <typename Request, typename Response>
227 public:
228 /// @brief Await and read the next incoming message
229 /// @param request where to put the request on success
230 /// @returns `true` on success, `false` on end-of-input
231 /// @throws ugrpc::server::RpcError on an RPC error
232 [[nodiscard]] bool Read(Request& request);
233
234 /// @brief Write the next outgoing message
235 /// @param response the next message to write
236 /// @throws ugrpc::server::RpcError on an RPC error
237 void Write(const Response& response);
238
239 /// @brief Complete the RPC successfully
240 ///
241 /// `Finish` must not be called multiple times.
242 ///
243 /// @throws ugrpc::server::RpcError on an RPC error
244 void Finish();
245
246 /// @brief Complete the RPC with an error
247 ///
248 /// `Finish` must not be called multiple times.
249 ///
250 /// @param status error details
251 /// @throws ugrpc::server::RpcError on an RPC error
252 void FinishWithError(const grpc::Status& status) override;
253
254 /// @brief Equivalent to `Write + Finish`
255 ///
256 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
257 ///
258 /// `Finish` must not be called multiple times.
259 ///
260 /// @param response the final response message
261 /// @throws ugrpc::server::RpcError on an RPC error
262 void WriteAndFinish(const Response& response);
263
264 /// For internal use only
265 BidirectionalStream(impl::CallParams&& call_params,
266 impl::RawReaderWriter<Request, Response>& stream);
267
268 BidirectionalStream(const BidirectionalStream&) = delete;
269 BidirectionalStream(BidirectionalStream&&) = delete;
270 ~BidirectionalStream();
271
272 bool IsFinished() const override;
273
274 private:
275 enum class State { kOpen, kReadsDone, kFinished };
276
277 impl::RawReaderWriter<Request, Response>& stream_;
278 State state_{State::kOpen};
279};
280
281// ========================== Implementation follows ==========================
282
283template <typename Response>
284UnaryCall<Response>::UnaryCall(impl::CallParams&& call_params,
285 impl::RawResponseWriter<Response>& stream)
286 : CallAnyBase(std::move(call_params)), stream_(stream) {}
287
288template <typename Response>
289UnaryCall<Response>::~UnaryCall() {
290 if (!is_finished_) {
291 impl::CancelWithError(stream_, GetCallName());
292 LogFinish(impl::kUnknownErrorStatus);
293 }
294}
295
296template <typename Response>
297void UnaryCall<Response>::Finish(const Response& response) {
298 UINVARIANT(!is_finished_, "'Finish' called on a finished call");
299 is_finished_ = true;
300
301 LogFinish(grpc::Status::OK);
302 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
303 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
304 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
305}
306
307template <typename Response>
308void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
309 UINVARIANT(!is_finished_, "'FinishWithError' called on a finished call");
310 is_finished_ = true;
311 LogFinish(status);
312 impl::FinishWithError(stream_, status, GetCallName());
313 Statistics().OnExplicitFinish(status.error_code());
314 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
315}
316
317template <typename Response>
318bool UnaryCall<Response>::IsFinished() const {
319 return is_finished_;
320}
321
322template <typename Request, typename Response>
323InputStream<Request, Response>::InputStream(
324 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
325 : CallAnyBase(std::move(call_params)), stream_(stream) {}
326
327template <typename Request, typename Response>
328InputStream<Request, Response>::~InputStream() {
329 if (state_ != State::kFinished) {
330 impl::CancelWithError(stream_, GetCallName());
331 LogFinish(impl::kUnknownErrorStatus);
332 }
333}
334
335template <typename Request, typename Response>
336bool InputStream<Request, Response>::Read(Request& request) {
337 UINVARIANT(state_ == State::kOpen,
338 "'Read' called while the stream is half-closed for reads");
339 if (impl::Read(stream_, request)) {
340 return true;
341 } else {
342 state_ = State::kReadsDone;
343 return false;
344 }
345}
346
347template <typename Request, typename Response>
348void InputStream<Request, Response>::Finish(const Response& response) {
349 UINVARIANT(state_ != State::kFinished,
350 "'Finish' called on a finished stream");
351 state_ = State::kFinished;
352 LogFinish(grpc::Status::OK);
353 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
354 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
355 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
356}
357
358template <typename Request, typename Response>
359void InputStream<Request, Response>::FinishWithError(
360 const grpc::Status& status) {
361 UASSERT(!status.ok());
362 UINVARIANT(state_ != State::kFinished,
363 "'FinishWithError' called on a finished stream");
364 state_ = State::kFinished;
365 LogFinish(status);
366 impl::FinishWithError(stream_, status, GetCallName());
367 Statistics().OnExplicitFinish(status.error_code());
368 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
369}
370
371template <typename Request, typename Response>
372bool InputStream<Request, Response>::IsFinished() const {
373 return state_ == State::kFinished;
374}
375
376template <typename Response>
377OutputStream<Response>::OutputStream(impl::CallParams&& call_params,
378 impl::RawWriter<Response>& stream)
379 : CallAnyBase(std::move(call_params)), stream_(stream) {}
380
381template <typename Response>
382OutputStream<Response>::~OutputStream() {
383 if (state_ != State::kFinished) {
384 impl::Cancel(stream_, GetCallName());
385 LogFinish(impl::kUnknownErrorStatus);
386 }
387}
388
389template <typename Response>
390void OutputStream<Response>::Write(const Response& response) {
391 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
392
393 // For some reason, gRPC requires explicit 'SendInitialMetadata' in output
394 // streams
395 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
396
397 // Don't buffer writes, otherwise in an event subscription scenario, events
398 // may never actually be delivered
399 grpc::WriteOptions write_options{};
400
401 impl::Write(stream_, response, write_options, GetCallName());
402}
403
404template <typename Response>
405void OutputStream<Response>::Finish() {
406 UINVARIANT(state_ != State::kFinished,
407 "'Finish' called on a finished stream");
408 state_ = State::kFinished;
409 const auto status = grpc::Status::OK;
410 LogFinish(status);
411 impl::Finish(stream_, status, GetCallName());
412 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
413 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
414}
415
416template <typename Response>
417void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
418 UASSERT(!status.ok());
419 UINVARIANT(state_ != State::kFinished,
420 "'Finish' called on a finished stream");
421 state_ = State::kFinished;
422 LogFinish(status);
423 impl::Finish(stream_, status, GetCallName());
424 Statistics().OnExplicitFinish(status.error_code());
425 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
426}
427
428template <typename Response>
429void OutputStream<Response>::WriteAndFinish(const Response& response) {
430 UINVARIANT(state_ != State::kFinished,
431 "'WriteAndFinish' called on a finished stream");
432 state_ = State::kFinished;
433
434 // Don't buffer writes, otherwise in an event subscription scenario, events
435 // may never actually be delivered
436 grpc::WriteOptions write_options{};
437
438 const auto status = grpc::Status::OK;
439 LogFinish(status);
440 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
441}
442
443template <typename Response>
444bool OutputStream<Response>::IsFinished() const {
445 return state_ == State::kFinished;
446}
447
448template <typename Request, typename Response>
450 impl::CallParams&& call_params,
451 impl::RawReaderWriter<Request, Response>& stream)
452 : CallAnyBase(std::move(call_params)), stream_(stream) {}
453
454template <typename Request, typename Response>
455BidirectionalStream<Request, Response>::~BidirectionalStream() {
456 if (state_ != State::kFinished) {
457 impl::Cancel(stream_, GetCallName());
458 LogFinish(impl::kUnknownErrorStatus);
459 }
460}
461
462template <typename Request, typename Response>
463bool BidirectionalStream<Request, Response>::Read(Request& request) {
464 UINVARIANT(state_ == State::kOpen,
465 "'Read' called while the stream is half-closed for reads");
466 if (impl::Read(stream_, request)) {
467 return true;
468 } else {
469 state_ = State::kReadsDone;
470 return false;
471 }
472}
473
474template <typename Request, typename Response>
475void BidirectionalStream<Request, Response>::Write(const Response& response) {
476 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
477
478 // Don't buffer writes, optimize for ping-pong-style interaction
479 grpc::WriteOptions write_options{};
480
481 impl::Write(stream_, response, write_options, GetCallName());
482}
483
484template <typename Request, typename Response>
485void BidirectionalStream<Request, Response>::Finish() {
486 UINVARIANT(state_ != State::kFinished,
487 "'Finish' called on a finished stream");
488 state_ = State::kFinished;
489 const auto status = grpc::Status::OK;
490 LogFinish(status);
491 impl::Finish(stream_, status, GetCallName());
492 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
493 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
494}
495
496template <typename Request, typename Response>
497void BidirectionalStream<Request, Response>::FinishWithError(
498 const grpc::Status& status) {
499 UASSERT(!status.ok());
500 UINVARIANT(state_ != State::kFinished,
501 "'FinishWithError' called on a finished stream");
502 state_ = State::kFinished;
503 LogFinish(status);
504 impl::Finish(stream_, status, GetCallName());
505 Statistics().OnExplicitFinish(status.error_code());
506 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
507}
508
509template <typename Request, typename Response>
510void BidirectionalStream<Request, Response>::WriteAndFinish(
511 const Response& response) {
512 UINVARIANT(state_ != State::kFinished,
513 "'WriteAndFinish' called on a finished stream");
514 state_ = State::kFinished;
515
516 // Don't buffer writes, optimize for ping-pong-style interaction
517 grpc::WriteOptions write_options{};
518
519 const auto status = grpc::Status::OK;
520 LogFinish(status);
521 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
522}
523
524template <typename Request, typename Response>
525bool BidirectionalStream<Request, Response>::IsFinished() const {
526 return state_ == State::kFinished;
527}
528
529} // namespace ugrpc::server
530
531USERVER_NAMESPACE_END