userver: userver/ugrpc/server/rpc.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/server/rpc.hpp
4/// @brief Classes representing an incoming RPC
5
6#include <grpcpp/impl/codegen/proto_utils.h>
7#include <grpcpp/server_context.h>
8
9#include <userver/utils/assert.hpp>
10
11#include <userver/ugrpc/impl/deadline_timepoint.hpp>
12#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
13#include <userver/ugrpc/impl/span.hpp>
14#include <userver/ugrpc/impl/statistics_scope.hpp>
15#include <userver/ugrpc/server/exceptions.hpp>
16#include <userver/ugrpc/server/impl/async_methods.hpp>
17#include <userver/ugrpc/server/impl/call_params.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace ugrpc::server {
22
23namespace impl {
24
25std::string FormatLogMessage(
26 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
27 std::string_view peer, std::chrono::system_clock::time_point start_time,
28 std::string_view call_name, grpc::StatusCode code);
29
30}
31
32/// @brief A non-typed base class for any gRPC call
34 public:
35 CallAnyBase(impl::CallParams&& params) : params_(std::move(params)) {}
36
37 /// @brief Complete the RPC with an error
38 ///
39 /// `Finish` must not be called multiple times for the same RPC.
40 ///
41 /// @param status error details
42 /// @throws ugrpc::server::RpcError on an RPC error
43 virtual void FinishWithError(const grpc::Status& status) = 0;
44
45 /// @returns the `ServerContext` used for this RPC
46 /// @note Initial server metadata is not currently supported
47 /// @note Trailing metadata, if any, must be set before the `Finish` call
48 grpc::ServerContext& GetContext() { return params_.context; }
49
50 /// @brief Name of the call. Consists of service and method names
51 std::string_view GetCallName() const { return params_.call_name; }
52
53 tracing::Span& GetSpan() { return params_.call_span; }
54
55 /// @brief Returns call context for storing per-call custom data
56 ///
57 /// The context can be used to pass data from server middleware to client
58 /// handler or from one middleware to another one.
59 ///
60 /// ## Example usage:
61 ///
62 /// In authentication middleware:
63 ///
64 /// @code
65 /// if (password_is_correct) {
66 /// // Username is authenticated, set it in per-call storage context
67 /// ctx.GetCall().GetStorageContext().Emplace(kAuthUsername, username);
68 /// }
69 /// @endcode
70 ///
71 /// In client handler:
72 ///
73 /// @code
74 /// const auto& username = rpc.GetStorageContext().Get(kAuthUsername);
75 /// auto msg = fmt::format("Hello, {}!", username);
76 /// @endcode
77 utils::AnyStorage<StorageContext>& GetStorageContext() {
78 return params_.storage_context;
79 }
80
81 virtual bool IsFinished() const = 0;
82
83 /// @cond
84 // For internal use only
85 ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
86 /// @endcond
87
88 protected:
89 ugrpc::impl::RpcStatisticsScope& Statistics() { return params_.statistics; }
90
91 logging::LoggerRef AccessTskvLogger() { return params_.access_tskv_logger; }
92
93 void LogFinish(grpc::Status status) const;
94
95 private:
96 impl::CallParams params_;
97};
98
99/// @brief Controls a single request -> single response RPC
100///
101/// The RPC is cancelled on destruction unless `Finish` has been called.
102template <typename Response>
103class UnaryCall final : public CallAnyBase {
104 public:
105 /// @brief Complete the RPC successfully
106 ///
107 /// `Finish` must not be called multiple times for the same RPC.
108 ///
109 /// @param response the single Response to send to the client
110 /// @throws ugrpc::server::RpcError on an RPC error
111 void Finish(const Response& response);
112
113 /// @brief Complete the RPC with an error
114 ///
115 /// `Finish` must not be called multiple times for the same RPC.
116 ///
117 /// @param status error details
118 /// @throws ugrpc::server::RpcError on an RPC error
119 void FinishWithError(const grpc::Status& status) override;
120
121 /// For internal use only
122 UnaryCall(impl::CallParams&& call_params,
123 impl::RawResponseWriter<Response>& stream);
124
125 UnaryCall(UnaryCall&&) = delete;
126 UnaryCall& operator=(UnaryCall&&) = delete;
127 ~UnaryCall();
128
129 bool IsFinished() const override;
130
131 private:
132 impl::RawResponseWriter<Response>& stream_;
133 bool is_finished_{false};
134};
135
136/// @brief Controls a request stream -> single response RPC
137///
138/// This class is not thread-safe except for `GetContext`.
139///
140/// The RPC is cancelled on destruction unless the stream has been finished.
141///
142/// If any method throws, further methods must not be called on the same stream,
143/// except for `GetContext`.
144template <typename Request, typename Response>
145class InputStream final : public CallAnyBase {
146 public:
147 /// @brief Await and read the next incoming message
148 /// @param request where to put the request on success
149 /// @returns `true` on success, `false` on end-of-input
150 [[nodiscard]] bool Read(Request& request);
151
152 /// @brief Complete the RPC successfully
153 ///
154 /// `Finish` must not be called multiple times for the same RPC.
155 ///
156 /// @param response the single Response to send to the client
157 /// @throws ugrpc::server::RpcError on an RPC error
158 void Finish(const Response& response);
159
160 /// @brief Complete the RPC with an error
161 ///
162 /// `Finish` must not be called multiple times for the same RPC.
163 ///
164 /// @param status error details
165 /// @throws ugrpc::server::RpcError on an RPC error
166 void FinishWithError(const grpc::Status& status) override;
167
168 /// For internal use only
169 InputStream(impl::CallParams&& call_params,
170 impl::RawReader<Request, Response>& stream);
171
172 InputStream(InputStream&&) = delete;
173 InputStream& operator=(InputStream&&) = delete;
174 ~InputStream();
175
176 bool IsFinished() const override;
177
178 private:
179 enum class State { kOpen, kReadsDone, kFinished };
180
181 impl::RawReader<Request, Response>& stream_;
182 State state_{State::kOpen};
183};
184
185/// @brief Controls a single request -> response stream RPC
186///
187/// This class is not thread-safe except for `GetContext`.
188///
189/// The RPC is cancelled on destruction unless the stream has been finished.
190///
191/// If any method throws, further methods must not be called on the same stream,
192/// except for `GetContext`.
193template <typename Response>
194class OutputStream final : public CallAnyBase {
195 public:
196 /// @brief Write the next outgoing message
197 /// @param response the next message to write
198 /// @throws ugrpc::server::RpcError on an RPC error
199 void Write(const Response& response);
200
201 /// @brief Complete the RPC successfully
202 ///
203 /// `Finish` must not be called multiple times.
204 ///
205 /// @throws ugrpc::server::RpcError on an RPC error
206 void Finish();
207
208 /// @brief Complete the RPC with an error
209 ///
210 /// `Finish` must not be called multiple times.
211 ///
212 /// @param status error details
213 /// @throws ugrpc::server::RpcError on an RPC error
214 void FinishWithError(const grpc::Status& status) override;
215
216 /// @brief Equivalent to `Write + Finish`
217 ///
218 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
219 ///
220 /// `Finish` must not be called multiple times.
221 ///
222 /// @param response the final response message
223 /// @throws ugrpc::server::RpcError on an RPC error
224 void WriteAndFinish(const Response& response);
225
226 /// For internal use only
227 OutputStream(impl::CallParams&& call_params,
228 impl::RawWriter<Response>& stream);
229
230 OutputStream(OutputStream&&) = delete;
231 OutputStream& operator=(OutputStream&&) = delete;
232 ~OutputStream();
233
234 bool IsFinished() const override;
235
236 private:
237 enum class State { kNew, kOpen, kFinished };
238
239 impl::RawWriter<Response>& stream_;
240 State state_{State::kNew};
241};
242
243/// @brief Controls a request stream -> response stream RPC
244///
245/// This class allows the following concurrent calls:
246///
247/// - `GetContext`
248/// - `Read`;
249/// - one of (`Write`, `Finish`, `FinishWithError`, `WriteAndFinish`).
250///
251/// The RPC is cancelled on destruction unless the stream has been finished.
252///
253/// If any method throws, further methods must not be called on the same stream,
254/// except for `GetContext`.
255template <typename Request, typename Response>
257 public:
258 /// @brief Await and read the next incoming message
259 /// @param request where to put the request on success
260 /// @returns `true` on success, `false` on end-of-input
261 /// @throws ugrpc::server::RpcError on an RPC error
262 [[nodiscard]] bool Read(Request& request);
263
264 /// @brief Write the next outgoing message
265 /// @param response the next message to write
266 /// @throws ugrpc::server::RpcError on an RPC error
267 void Write(const Response& response);
268
269 /// @brief Complete the RPC successfully
270 ///
271 /// `Finish` must not be called multiple times.
272 ///
273 /// @throws ugrpc::server::RpcError on an RPC error
274 void Finish();
275
276 /// @brief Complete the RPC with an error
277 ///
278 /// `Finish` must not be called multiple times.
279 ///
280 /// @param status error details
281 /// @throws ugrpc::server::RpcError on an RPC error
282 void FinishWithError(const grpc::Status& status) override;
283
284 /// @brief Equivalent to `Write + Finish`
285 ///
286 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
287 ///
288 /// `Finish` must not be called multiple times.
289 ///
290 /// @param response the final response message
291 /// @throws ugrpc::server::RpcError on an RPC error
292 void WriteAndFinish(const Response& response);
293
294 /// For internal use only
295 BidirectionalStream(impl::CallParams&& call_params,
296 impl::RawReaderWriter<Request, Response>& stream);
297
298 BidirectionalStream(const BidirectionalStream&) = delete;
299 BidirectionalStream(BidirectionalStream&&) = delete;
300 ~BidirectionalStream();
301
302 bool IsFinished() const override;
303
304 private:
305 impl::RawReaderWriter<Request, Response>& stream_;
306 bool are_reads_done_{false};
307 bool is_finished_{false};
308};
309
310// ========================== Implementation follows ==========================
311
312template <typename Response>
313UnaryCall<Response>::UnaryCall(impl::CallParams&& call_params,
314 impl::RawResponseWriter<Response>& stream)
315 : CallAnyBase(std::move(call_params)), stream_(stream) {}
316
317template <typename Response>
318UnaryCall<Response>::~UnaryCall() {
319 if (!is_finished_) {
320 impl::CancelWithError(stream_, GetCallName());
321 LogFinish(impl::kUnknownErrorStatus);
322 }
323}
324
325template <typename Response>
326void UnaryCall<Response>::Finish(const Response& response) {
327 UINVARIANT(!is_finished_, "'Finish' called on a finished call");
328 is_finished_ = true;
329
330 LogFinish(grpc::Status::OK);
331 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
332 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
333 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
334}
335
336template <typename Response>
337void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
338 UINVARIANT(!is_finished_, "'FinishWithError' called on a finished call");
339 is_finished_ = true;
340 LogFinish(status);
341 impl::FinishWithError(stream_, status, GetCallName());
342 Statistics().OnExplicitFinish(status.error_code());
343 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
344}
345
346template <typename Response>
347bool UnaryCall<Response>::IsFinished() const {
348 return is_finished_;
349}
350
351template <typename Request, typename Response>
352InputStream<Request, Response>::InputStream(
353 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
354 : CallAnyBase(std::move(call_params)), stream_(stream) {}
355
356template <typename Request, typename Response>
357InputStream<Request, Response>::~InputStream() {
358 if (state_ != State::kFinished) {
359 impl::CancelWithError(stream_, GetCallName());
360 LogFinish(impl::kUnknownErrorStatus);
361 }
362}
363
364template <typename Request, typename Response>
365bool InputStream<Request, Response>::Read(Request& request) {
366 UINVARIANT(state_ == State::kOpen,
367 "'Read' called while the stream is half-closed for reads");
368 if (impl::Read(stream_, request)) {
369 return true;
370 } else {
371 state_ = State::kReadsDone;
372 return false;
373 }
374}
375
376template <typename Request, typename Response>
377void InputStream<Request, Response>::Finish(const Response& response) {
378 UINVARIANT(state_ != State::kFinished,
379 "'Finish' called on a finished stream");
380 state_ = State::kFinished;
381 LogFinish(grpc::Status::OK);
382 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
383 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
384 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
385}
386
387template <typename Request, typename Response>
388void InputStream<Request, Response>::FinishWithError(
389 const grpc::Status& status) {
390 UASSERT(!status.ok());
391 UINVARIANT(state_ != State::kFinished,
392 "'FinishWithError' called on a finished stream");
393 state_ = State::kFinished;
394 LogFinish(status);
395 impl::FinishWithError(stream_, status, GetCallName());
396 Statistics().OnExplicitFinish(status.error_code());
397 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
398}
399
400template <typename Request, typename Response>
401bool InputStream<Request, Response>::IsFinished() const {
402 return state_ == State::kFinished;
403}
404
405template <typename Response>
406OutputStream<Response>::OutputStream(impl::CallParams&& call_params,
407 impl::RawWriter<Response>& stream)
408 : CallAnyBase(std::move(call_params)), stream_(stream) {}
409
410template <typename Response>
411OutputStream<Response>::~OutputStream() {
412 if (state_ != State::kFinished) {
413 impl::Cancel(stream_, GetCallName());
414 LogFinish(impl::kUnknownErrorStatus);
415 }
416}
417
418template <typename Response>
419void OutputStream<Response>::Write(const Response& response) {
420 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
421
422 // For some reason, gRPC requires explicit 'SendInitialMetadata' in output
423 // streams
424 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
425
426 // Don't buffer writes, otherwise in an event subscription scenario, events
427 // may never actually be delivered
428 grpc::WriteOptions write_options{};
429
430 impl::Write(stream_, response, write_options, GetCallName());
431}
432
433template <typename Response>
434void OutputStream<Response>::Finish() {
435 UINVARIANT(state_ != State::kFinished,
436 "'Finish' called on a finished stream");
437 state_ = State::kFinished;
438 const auto status = grpc::Status::OK;
439 LogFinish(status);
440 impl::Finish(stream_, status, GetCallName());
441 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
442 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
443}
444
445template <typename Response>
446void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
447 UASSERT(!status.ok());
448 UINVARIANT(state_ != State::kFinished,
449 "'Finish' called on a finished stream");
450 state_ = State::kFinished;
451 LogFinish(status);
452 impl::Finish(stream_, status, GetCallName());
453 Statistics().OnExplicitFinish(status.error_code());
454 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
455}
456
457template <typename Response>
458void OutputStream<Response>::WriteAndFinish(const Response& response) {
459 UINVARIANT(state_ != State::kFinished,
460 "'WriteAndFinish' called on a finished stream");
461 state_ = State::kFinished;
462
463 // Don't buffer writes, otherwise in an event subscription scenario, events
464 // may never actually be delivered
465 grpc::WriteOptions write_options{};
466
467 const auto status = grpc::Status::OK;
468 LogFinish(status);
469 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
470}
471
472template <typename Response>
473bool OutputStream<Response>::IsFinished() const {
474 return state_ == State::kFinished;
475}
476
477template <typename Request, typename Response>
479 impl::CallParams&& call_params,
480 impl::RawReaderWriter<Request, Response>& stream)
481 : CallAnyBase(std::move(call_params)), stream_(stream) {}
482
483template <typename Request, typename Response>
484BidirectionalStream<Request, Response>::~BidirectionalStream() {
485 if (!is_finished_) {
486 impl::Cancel(stream_, GetCallName());
487 LogFinish(impl::kUnknownErrorStatus);
488 }
489}
490
491template <typename Request, typename Response>
492bool BidirectionalStream<Request, Response>::Read(Request& request) {
493 UINVARIANT(!are_reads_done_,
494 "'Read' called while the stream is half-closed for reads");
495 if (impl::Read(stream_, request)) {
496 return true;
497 } else {
498 are_reads_done_ = true;
499 return false;
500 }
501}
502
503template <typename Request, typename Response>
504void BidirectionalStream<Request, Response>::Write(const Response& response) {
505 UINVARIANT(!is_finished_, "'Write' called on a finished stream");
506
507 // Don't buffer writes, optimize for ping-pong-style interaction
508 grpc::WriteOptions write_options{};
509
510 try {
511 impl::Write(stream_, response, write_options, GetCallName());
512 } catch (const RpcInterruptedError&) {
513 is_finished_ = true;
514 throw;
515 }
516}
517
518template <typename Request, typename Response>
519void BidirectionalStream<Request, Response>::Finish() {
520 UINVARIANT(!is_finished_, "'Finish' called on a finished stream");
521 is_finished_ = true;
522 const auto status = grpc::Status::OK;
523 LogFinish(status);
524 impl::Finish(stream_, status, GetCallName());
525 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
526 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
527}
528
529template <typename Request, typename Response>
530void BidirectionalStream<Request, Response>::FinishWithError(
531 const grpc::Status& status) {
532 UASSERT(!status.ok());
533 UINVARIANT(!is_finished_, "'FinishWithError' called on a finished stream");
534 is_finished_ = true;
535 LogFinish(status);
536 impl::Finish(stream_, status, GetCallName());
537 Statistics().OnExplicitFinish(status.error_code());
538 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
539}
540
541template <typename Request, typename Response>
542void BidirectionalStream<Request, Response>::WriteAndFinish(
543 const Response& response) {
544 UINVARIANT(!is_finished_, "'WriteAndFinish' called on a finished stream");
545 is_finished_ = true;
546
547 // Don't buffer writes, optimize for ping-pong-style interaction
548 grpc::WriteOptions write_options{};
549
550 const auto status = grpc::Status::OK;
551 LogFinish(status);
552 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
553}
554
555template <typename Request, typename Response>
556bool BidirectionalStream<Request, Response>::IsFinished() const {
557 return is_finished_;
558}
559
560} // namespace ugrpc::server
561
562USERVER_NAMESPACE_END