userver: userver/ugrpc/server/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/server/rpc.hpp
4/// @brief Classes representing an incoming RPC
5
6#include <google/protobuf/message.h>
7#include <grpcpp/impl/codegen/proto_utils.h>
8#include <grpcpp/server_context.h>
9
10#include <userver/utils/assert.hpp>
11
12#include <userver/ugrpc/impl/deadline_timepoint.hpp>
13#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
14#include <userver/ugrpc/impl/span.hpp>
15#include <userver/ugrpc/impl/statistics_scope.hpp>
16#include <userver/ugrpc/server/exceptions.hpp>
17#include <userver/ugrpc/server/impl/async_methods.hpp>
18#include <userver/ugrpc/server/impl/call_params.hpp>
19#include <userver/ugrpc/server/middlewares/fwd.hpp>
20
21USERVER_NAMESPACE_BEGIN
22
23namespace ugrpc::server {
24
25namespace impl {
26
27std::string FormatLogMessage(
28 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
29 std::string_view peer, std::chrono::system_clock::time_point start_time,
30 std::string_view call_name, grpc::StatusCode code);
31
32}
33
/// @brief Kinds of RPC, one per call class declared in this header
enum class CallKind {
  /// Single request -> single response, see @ref UnaryCall
  kUnaryCall,
  /// Request stream -> single response, see @ref InputStream
  kRequestStream,
  /// Single request -> response stream, see @ref OutputStream
  kResponseStream,
  /// Request stream -> response stream, see @ref BidirectionalStream
  kBidirectionalStream,
};
41
42/// @brief A non-typed base class for any gRPC call
44 public:
45 /// @brief Complete the RPC with an error
46 ///
47 /// `Finish` must not be called multiple times for the same RPC.
48 ///
49 /// @param status error details
50 /// @throws ugrpc::server::RpcError on an RPC error
51 /// @see @ref IsFinished
52 virtual void FinishWithError(const grpc::Status& status) = 0;
53
54 /// @returns the `ServerContext` used for this RPC
55 /// @note Initial server metadata is not currently supported
56 /// @note Trailing metadata, if any, must be set before the `Finish` call
57 grpc::ServerContext& GetContext() { return params_.context; }
58
59 /// @brief Name of the RPC in the format `full.path.ServiceName/MethodName`
60 std::string_view GetCallName() const { return params_.call_name; }
61
62 /// @brief Get name of gRPC service
64
65 /// @brief Get name of called gRPC method
67
68 /// @brief Get the span of the current RPC. Span's lifetime covers the
69 /// `Handle` call of the outermost @ref MiddlewareBase "middleware".
70 tracing::Span& GetSpan() { return params_.call_span; }
71
72 /// @brief Get RPCs kind of method
73 CallKind GetCallKind() const { return call_kind_; }
74
75 /// @brief Returns call context for storing per-call custom data
76 ///
77 /// The context can be used to pass data from server middleware to client
78 /// handler or from one middleware to another one.
79 ///
80 /// ## Example usage:
81 ///
82 /// In authentication middleware:
83 ///
84 /// @code
85 /// if (password_is_correct) {
86 /// // Username is authenticated, set it in per-call storage context
87 /// ctx.GetCall().GetStorageContext().Emplace(kAuthUsername, username);
88 /// }
89 /// @endcode
90 ///
91 /// In client handler:
92 ///
93 /// @code
94 /// const auto& username = rpc.GetStorageContext().Get(kAuthUsername);
95 /// auto msg = fmt::format("Hello, {}!", username);
96 /// @endcode
97 utils::AnyStorage<StorageContext>& GetStorageContext() {
98 return params_.storage_context;
99 }
100
101 /// @brief Useful for generic error reporting via @ref FinishWithError
102 virtual bool IsFinished() const = 0;
103
104 /// @brief Set a custom call name for metric labels
105 void SetMetricsCallName(std::string_view call_name);
106
107 /// @cond
108 // For internal use only
109 CallAnyBase(utils::impl::InternalTag, impl::CallParams&& params,
110 CallKind call_kind)
111 : params_(std::move(params)), call_kind_(call_kind) {}
112
113 // For internal use only
114 ugrpc::impl::RpcStatisticsScope& GetStatistics(ugrpc::impl::InternalTag);
115
116 // For internal use only
117 void RunMiddlewarePipeline(utils::impl::InternalTag,
118 MiddlewareCallContext& md_call_context);
119 /// @endcond
120
121 protected:
122 ugrpc::impl::RpcStatisticsScope& GetStatistics() {
123 return params_.statistics;
124 }
125
126 logging::LoggerRef AccessTskvLogger() { return params_.access_tskv_logger; }
127
128 void LogFinish(grpc::Status status) const;
129
130 void ApplyRequestHook(google::protobuf::Message* request);
131
132 void ApplyResponseHook(google::protobuf::Message* response);
133
134 private:
135 impl::CallParams params_;
136 CallKind call_kind_;
137 MiddlewareCallContext* middleware_call_context_{nullptr};
138};
139
140/// @brief Controls a single request -> single response RPC
141///
142/// The RPC is cancelled on destruction unless `Finish` has been called.
143template <typename Response>
144class UnaryCall final : public CallAnyBase {
145 public:
146 /// @brief Complete the RPC successfully
147 ///
148 /// `Finish` must not be called multiple times for the same RPC.
149 ///
150 /// @param response the single Response to send to the client
151 /// @throws ugrpc::server::RpcError on an RPC error
152 void Finish(Response& response);
153
154 /// @brief Complete the RPC successfully
155 ///
156 /// `Finish` must not be called multiple times for the same RPC.
157 ///
158 /// @param response the single Response to send to the client
159 /// @throws ugrpc::server::RpcError on an RPC error
160 void Finish(Response&& response);
161
162 /// @brief Complete the RPC with an error
163 ///
164 /// `Finish` must not be called multiple times for the same RPC.
165 ///
166 /// @param status error details
167 /// @throws ugrpc::server::RpcError on an RPC error
168 void FinishWithError(const grpc::Status& status) override;
169
170 /// For internal use only
171 UnaryCall(impl::CallParams&& call_params,
172 impl::RawResponseWriter<Response>& stream);
173
174 UnaryCall(UnaryCall&&) = delete;
175 UnaryCall& operator=(UnaryCall&&) = delete;
176 ~UnaryCall();
177
178 bool IsFinished() const override;
179
180 private:
181 impl::RawResponseWriter<Response>& stream_;
182 bool is_finished_{false};
183};
184
185/// @brief Controls a request stream -> single response RPC
186///
187/// This class is not thread-safe except for `GetContext`.
188///
189/// The RPC is cancelled on destruction unless the stream has been finished.
190///
191/// If any method throws, further methods must not be called on the same stream,
192/// except for `GetContext`.
193template <typename Request, typename Response>
194class InputStream final : public CallAnyBase {
195 public:
196 /// @brief Await and read the next incoming message
197 /// @param request where to put the request on success
198 /// @returns `true` on success, `false` on end-of-input
199 [[nodiscard]] bool Read(Request& request);
200
201 /// @brief Complete the RPC successfully
202 ///
203 /// `Finish` must not be called multiple times for the same RPC.
204 ///
205 /// @param response the single Response to send to the client
206 /// @throws ugrpc::server::RpcError on an RPC error
207 void Finish(Response& response);
208
209 /// @brief Complete the RPC successfully
210 ///
211 /// `Finish` must not be called multiple times for the same RPC.
212 ///
213 /// @param response the single Response to send to the client
214 /// @throws ugrpc::server::RpcError on an RPC error
215 void Finish(Response&& response);
216
217 /// @brief Complete the RPC with an error
218 ///
219 /// `Finish` must not be called multiple times for the same RPC.
220 ///
221 /// @param status error details
222 /// @throws ugrpc::server::RpcError on an RPC error
223 void FinishWithError(const grpc::Status& status) override;
224
225 /// For internal use only
226 InputStream(impl::CallParams&& call_params,
227 impl::RawReader<Request, Response>& stream);
228
229 InputStream(InputStream&&) = delete;
230 InputStream& operator=(InputStream&&) = delete;
231 ~InputStream();
232
233 bool IsFinished() const override;
234
235 private:
236 enum class State { kOpen, kReadsDone, kFinished };
237
238 impl::RawReader<Request, Response>& stream_;
239 State state_{State::kOpen};
240};
241
242/// @brief Controls a single request -> response stream RPC
243///
244/// This class is not thread-safe except for `GetContext`.
245///
246/// The RPC is cancelled on destruction unless the stream has been finished.
247///
248/// If any method throws, further methods must not be called on the same stream,
249/// except for `GetContext`.
250template <typename Response>
251class OutputStream final : public CallAnyBase {
252 public:
253 /// @brief Write the next outgoing message
254 /// @param response the next message to write
255 /// @throws ugrpc::server::RpcError on an RPC error
256 void Write(Response& response);
257
258 /// @brief Write the next outgoing message
259 /// @param response the next message to write
260 /// @throws ugrpc::server::RpcError on an RPC error
261 void Write(Response&& response);
262
263 /// @brief Complete the RPC successfully
264 ///
265 /// `Finish` must not be called multiple times.
266 ///
267 /// @throws ugrpc::server::RpcError on an RPC error
268 void Finish();
269
270 /// @brief Complete the RPC with an error
271 ///
272 /// `Finish` must not be called multiple times.
273 ///
274 /// @param status error details
275 /// @throws ugrpc::server::RpcError on an RPC error
276 void FinishWithError(const grpc::Status& status) override;
277
278 /// @brief Equivalent to `Write + Finish`
279 ///
280 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
281 ///
282 /// `Finish` must not be called multiple times.
283 ///
284 /// @param response the final response message
285 /// @throws ugrpc::server::RpcError on an RPC error
286 void WriteAndFinish(Response& response);
287
288 /// @brief Equivalent to `Write + Finish`
289 ///
290 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
291 ///
292 /// `Finish` must not be called multiple times.
293 ///
294 /// @param response the final response message
295 /// @throws ugrpc::server::RpcError on an RPC error
296 void WriteAndFinish(Response&& response);
297
298 /// For internal use only
299 OutputStream(impl::CallParams&& call_params,
300 impl::RawWriter<Response>& stream);
301
302 OutputStream(OutputStream&&) = delete;
303 OutputStream& operator=(OutputStream&&) = delete;
304 ~OutputStream();
305
306 bool IsFinished() const override;
307
308 private:
309 enum class State { kNew, kOpen, kFinished };
310
311 impl::RawWriter<Response>& stream_;
312 State state_{State::kNew};
313};
314
315/// @brief Controls a request stream -> response stream RPC
316///
317/// This class allows the following concurrent calls:
318///
319/// - `GetContext`
320/// - `Read`;
321/// - one of (`Write`, `Finish`, `FinishWithError`, `WriteAndFinish`).
322///
323/// The RPC is cancelled on destruction unless the stream has been finished.
324///
325/// If any method throws, further methods must not be called on the same stream,
326/// except for `GetContext`.
327template <typename Request, typename Response>
329 public:
330 /// @brief Await and read the next incoming message
331 /// @param request where to put the request on success
332 /// @returns `true` on success, `false` on end-of-input
333 /// @throws ugrpc::server::RpcError on an RPC error
334 [[nodiscard]] bool Read(Request& request);
335
336 /// @brief Write the next outgoing message
337 /// @param response the next message to write
338 /// @throws ugrpc::server::RpcError on an RPC error
339 void Write(Response& response);
340
341 /// @brief Write the next outgoing message
342 /// @param response the next message to write
343 /// @throws ugrpc::server::RpcError on an RPC error
344 void Write(Response&& response);
345
346 /// @brief Complete the RPC successfully
347 ///
348 /// `Finish` must not be called multiple times.
349 ///
350 /// @throws ugrpc::server::RpcError on an RPC error
351 void Finish();
352
353 /// @brief Complete the RPC with an error
354 ///
355 /// `Finish` must not be called multiple times.
356 ///
357 /// @param status error details
358 /// @throws ugrpc::server::RpcError on an RPC error
359 void FinishWithError(const grpc::Status& status) override;
360
361 /// @brief Equivalent to `Write + Finish`
362 ///
363 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
364 ///
365 /// `Finish` must not be called multiple times.
366 ///
367 /// @param response the final response message
368 /// @throws ugrpc::server::RpcError on an RPC error
369 void WriteAndFinish(Response& response);
370
371 /// @brief Equivalent to `Write + Finish`
372 ///
373 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
374 ///
375 /// `Finish` must not be called multiple times.
376 ///
377 /// @param response the final response message
378 /// @throws ugrpc::server::RpcError on an RPC error
379 void WriteAndFinish(Response&& response);
380
381 /// For internal use only
382 BidirectionalStream(impl::CallParams&& call_params,
383 impl::RawReaderWriter<Request, Response>& stream);
384
385 BidirectionalStream(const BidirectionalStream&) = delete;
386 BidirectionalStream(BidirectionalStream&&) = delete;
387 ~BidirectionalStream();
388
389 bool IsFinished() const override;
390
391 private:
392 impl::RawReaderWriter<Request, Response>& stream_;
393 bool are_reads_done_{false};
394 bool is_finished_{false};
395};
396
397// ========================== Implementation follows ==========================
398
399template <typename Response>
400UnaryCall<Response>::UnaryCall(impl::CallParams&& call_params,
401 impl::RawResponseWriter<Response>& stream)
402 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
403 CallKind::kUnaryCall),
404 stream_(stream) {}
405
406template <typename Response>
407UnaryCall<Response>::~UnaryCall() {
408 if (!is_finished_) {
409 impl::CancelWithError(stream_, GetCallName());
410 LogFinish(impl::kUnknownErrorStatus);
411 }
412}
413
414template <typename Response>
415void UnaryCall<Response>::Finish(Response&& response) {
416 Finish(response);
417}
418
419template <typename Response>
420void UnaryCall<Response>::Finish(Response& response) {
421 UINVARIANT(!is_finished_, "'Finish' called on a finished call");
422 is_finished_ = true;
423
424 ApplyResponseHook(&response);
425
426 LogFinish(grpc::Status::OK);
427 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
428 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
429 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
430}
431
432template <typename Response>
433void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
434 if (IsFinished()) return;
435 is_finished_ = true;
436 LogFinish(status);
437 impl::FinishWithError(stream_, status, GetCallName());
438 GetStatistics().OnExplicitFinish(status.error_code());
439 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
440}
441
442template <typename Response>
443bool UnaryCall<Response>::IsFinished() const {
444 return is_finished_;
445}
446
447template <typename Request, typename Response>
448InputStream<Request, Response>::InputStream(
449 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
450 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
451 CallKind::kRequestStream),
452 stream_(stream) {}
453
454template <typename Request, typename Response>
455InputStream<Request, Response>::~InputStream() {
456 if (state_ != State::kFinished) {
457 impl::CancelWithError(stream_, GetCallName());
458 LogFinish(impl::kUnknownErrorStatus);
459 }
460}
461
462template <typename Request, typename Response>
463bool InputStream<Request, Response>::Read(Request& request) {
464 UINVARIANT(state_ == State::kOpen,
465 "'Read' called while the stream is half-closed for reads");
466 if (impl::Read(stream_, request)) {
467 ApplyRequestHook(&request);
468 return true;
469 } else {
470 state_ = State::kReadsDone;
471 return false;
472 }
473}
474
475template <typename Request, typename Response>
476void InputStream<Request, Response>::Finish(Response&& response) {
477 Finish(response);
478}
479
480template <typename Request, typename Response>
481void InputStream<Request, Response>::Finish(Response& response) {
482 UINVARIANT(state_ != State::kFinished,
483 "'Finish' called on a finished stream");
484 state_ = State::kFinished;
485
486 const auto& status = grpc::Status::OK;
487 LogFinish(status);
488
489 ApplyResponseHook(&response);
490
491 impl::Finish(stream_, response, status, GetCallName());
492 GetStatistics().OnExplicitFinish(status.error_code());
493 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
494}
495
496template <typename Request, typename Response>
497void InputStream<Request, Response>::FinishWithError(
498 const grpc::Status& status) {
499 UASSERT(!status.ok());
500 if (IsFinished()) return;
501 state_ = State::kFinished;
502 LogFinish(status);
503 impl::FinishWithError(stream_, status, GetCallName());
504 GetStatistics().OnExplicitFinish(status.error_code());
505 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
506}
507
508template <typename Request, typename Response>
509bool InputStream<Request, Response>::IsFinished() const {
510 return state_ == State::kFinished;
511}
512
513template <typename Response>
514OutputStream<Response>::OutputStream(impl::CallParams&& call_params,
515 impl::RawWriter<Response>& stream)
516 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
517 CallKind::kResponseStream),
518 stream_(stream) {}
519
520template <typename Response>
521OutputStream<Response>::~OutputStream() {
522 if (state_ != State::kFinished) {
523 impl::Cancel(stream_, GetCallName());
524 LogFinish(impl::kUnknownErrorStatus);
525 }
526}
527
528template <typename Response>
529void OutputStream<Response>::Write(Response&& response) {
530 Write(response);
531}
532
533template <typename Response>
534void OutputStream<Response>::Write(Response& response) {
535 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
536
537 // For some reason, gRPC requires explicit 'SendInitialMetadata' in output
538 // streams
539 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
540
541 // Don't buffer writes, otherwise in an event subscription scenario, events
542 // may never actually be delivered
543 grpc::WriteOptions write_options{};
544
545 ApplyResponseHook(&response);
546
547 impl::Write(stream_, response, write_options, GetCallName());
548}
549
550template <typename Response>
551void OutputStream<Response>::Finish() {
552 UINVARIANT(state_ != State::kFinished,
553 "'Finish' called on a finished stream");
554 state_ = State::kFinished;
555
556 const auto& status = grpc::Status::OK;
557 LogFinish(status);
558 impl::Finish(stream_, status, GetCallName());
559 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
560 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
561}
562
563template <typename Response>
564void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
565 UASSERT(!status.ok());
566 if (IsFinished()) return;
567 state_ = State::kFinished;
568 LogFinish(status);
569 impl::Finish(stream_, status, GetCallName());
570 GetStatistics().OnExplicitFinish(status.error_code());
571 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
572}
573
574template <typename Response>
575void OutputStream<Response>::WriteAndFinish(Response&& response) {
576 WriteAndFinish(response);
577}
578
579template <typename Response>
580void OutputStream<Response>::WriteAndFinish(Response& response) {
581 UINVARIANT(state_ != State::kFinished,
582 "'WriteAndFinish' called on a finished stream");
583 state_ = State::kFinished;
584
585 // Don't buffer writes, otherwise in an event subscription scenario, events
586 // may never actually be delivered
587 grpc::WriteOptions write_options{};
588
589 const auto& status = grpc::Status::OK;
590 LogFinish(status);
591
592 ApplyResponseHook(&response);
593
594 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
595 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
596 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
597}
598
599template <typename Response>
600bool OutputStream<Response>::IsFinished() const {
601 return state_ == State::kFinished;
602}
603
604template <typename Request, typename Response>
606 impl::CallParams&& call_params,
607 impl::RawReaderWriter<Request, Response>& stream)
608 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params),
609 CallKind::kBidirectionalStream),
610 stream_(stream) {}
611
612template <typename Request, typename Response>
613BidirectionalStream<Request, Response>::~BidirectionalStream() {
614 if (!is_finished_) {
615 impl::Cancel(stream_, GetCallName());
616 LogFinish(impl::kUnknownErrorStatus);
617 }
618}
619
620template <typename Request, typename Response>
621bool BidirectionalStream<Request, Response>::Read(Request& request) {
622 UINVARIANT(!are_reads_done_,
623 "'Read' called while the stream is half-closed for reads");
624 if (impl::Read(stream_, request)) {
625 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
626 ApplyRequestHook(&request);
627 }
628 return true;
629 } else {
630 are_reads_done_ = true;
631 return false;
632 }
633}
634
635template <typename Request, typename Response>
636void BidirectionalStream<Request, Response>::Write(Response&& response) {
637 Write(response);
638}
639
640template <typename Request, typename Response>
641void BidirectionalStream<Request, Response>::Write(Response& response) {
642 UINVARIANT(!is_finished_, "'Write' called on a finished stream");
643
644 // Don't buffer writes, optimize for ping-pong-style interaction
645 grpc::WriteOptions write_options{};
646
647 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
648 ApplyResponseHook(&response);
649 }
650
651 try {
652 impl::Write(stream_, response, write_options, GetCallName());
653 } catch (const RpcInterruptedError&) {
654 is_finished_ = true;
655 throw;
656 }
657}
658
659template <typename Request, typename Response>
660void BidirectionalStream<Request, Response>::Finish() {
661 UINVARIANT(!is_finished_, "'Finish' called on a finished stream");
662 is_finished_ = true;
663
664 const auto& status = grpc::Status::OK;
665 LogFinish(status);
666 impl::Finish(stream_, status, GetCallName());
667 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
668 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
669}
670
671template <typename Request, typename Response>
672void BidirectionalStream<Request, Response>::FinishWithError(
673 const grpc::Status& status) {
674 UASSERT(!status.ok());
675 if (IsFinished()) return;
676 is_finished_ = true;
677 LogFinish(status);
678 impl::Finish(stream_, status, GetCallName());
679 GetStatistics().OnExplicitFinish(status.error_code());
680 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
681}
682
683template <typename Request, typename Response>
684void BidirectionalStream<Request, Response>::WriteAndFinish(
685 Response&& response) {
686 WriteAndFinish(response);
687}
688
689template <typename Request, typename Response>
690void BidirectionalStream<Request, Response>::WriteAndFinish(
691 Response& response) {
692 UINVARIANT(!is_finished_, "'WriteAndFinish' called on a finished stream");
693 is_finished_ = true;
694
695 // Don't buffer writes, optimize for ping-pong-style interaction
696 grpc::WriteOptions write_options{};
697
698 const auto& status = grpc::Status::OK;
699 LogFinish(status);
700
701 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
702 ApplyResponseHook(&response);
703 }
704
705 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
706 GetStatistics().OnExplicitFinish(status.error_code());
707 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
708}
709
710template <typename Request, typename Response>
711bool BidirectionalStream<Request, Response>::IsFinished() const {
712 return is_finished_;
713}
714
715} // namespace ugrpc::server
716
717USERVER_NAMESPACE_END