userver: userver/ugrpc/server/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/server/rpc.hpp
4/// @brief Classes representing an incoming RPC
5
6#include <grpcpp/impl/codegen/proto_utils.h>
7#include <grpcpp/server_context.h>
8
9#include <userver/utils/assert.hpp>
10
11#include <userver/ugrpc/impl/deadline_timepoint.hpp>
12#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
13#include <userver/ugrpc/impl/span.hpp>
14#include <userver/ugrpc/impl/statistics_scope.hpp>
15#include <userver/ugrpc/server/exceptions.hpp>
16#include <userver/ugrpc/server/impl/async_methods.hpp>
17#include <userver/ugrpc/server/impl/call_params.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace ugrpc::server {
22
23namespace impl {
24
25std::string FormatLogMessage(
26 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
27 std::string_view peer, std::chrono::system_clock::time_point start_time,
28 std::string_view call_name, grpc::StatusCode code);
29
30}
31
32/// @brief A non-typed base class for any gRPC call
34 public:
35 CallAnyBase(impl::CallParams&& params) : params_(std::move(params)) {}
36
37 /// @brief Complete the RPC with an error
38 ///
39 /// `Finish` must not be called multiple times for the same RPC.
40 ///
41 /// @param status error details
42 /// @throws ugrpc::server::RpcError on an RPC error
43 virtual void FinishWithError(const grpc::Status& status) = 0;
44
45 /// @returns the `ServerContext` used for this RPC
46 /// @note Initial server metadata is not currently supported
47 /// @note Trailing metadata, if any, must be set before the `Finish` call
48 grpc::ServerContext& GetContext() { return params_.context; }
49
50 /// @brief Name of the call. Consists of service and method names
51 std::string_view GetCallName() const { return params_.call_name; }
52
53 tracing::Span& GetSpan() { return params_.call_span; }
54
55 /// @brief Returns call context for storing per-call custom data
56 ///
57 /// The context can be used to pass data from server middleware to client
58 /// handler or from one middleware to another one.
59 ///
60 /// ## Example usage:
61 ///
62 /// In authentication middleware:
63 ///
64 /// @code
65 /// if (password_is_correct) {
66 /// // Username is authenticated, set it in per-call storage context
67 /// ctx.GetCall().GetStorageContext().Emplace(kAuthUsername, username);
68 /// }
69 /// @endcode
70 ///
71 /// In client handler:
72 ///
73 /// @code
74 /// const auto& username = rpc.GetStorageContext().Get(kAuthUsername);
75 /// auto msg = fmt::format("Hello, {}!", username);
76 /// @endcode
77 utils::AnyStorage<StorageContext>& GetStorageContext() {
78 return params_.storage_context;
79 }
80
81 virtual bool IsFinished() const = 0;
82
83 /// @cond
84 // For internal use only
85 ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
86 /// @endcond
87
88 protected:
89 ugrpc::impl::RpcStatisticsScope& Statistics() { return params_.statistics; }
90
91 logging::LoggerRef AccessTskvLogger() { return params_.access_tskv_logger; }
92
93 void LogFinish(grpc::Status status) const;
94
95 private:
96 impl::CallParams params_;
97};
98
99/// @brief Controls a single request -> single response RPC
100///
101/// The RPC is cancelled on destruction unless `Finish` has been called.
102template <typename Response>
103class UnaryCall final : public CallAnyBase {
104 public:
105 /// @brief Complete the RPC successfully
106 ///
107 /// `Finish` must not be called multiple times for the same RPC.
108 ///
109 /// @param response the single Response to send to the client
110 /// @throws ugrpc::server::RpcError on an RPC error
111 void Finish(const Response& response);
112
113 /// @brief Complete the RPC with an error
114 ///
115 /// `Finish` must not be called multiple times for the same RPC.
116 ///
117 /// @param status error details
118 /// @throws ugrpc::server::RpcError on an RPC error
119 void FinishWithError(const grpc::Status& status) override;
120
121 /// For internal use only
122 UnaryCall(impl::CallParams&& call_params,
123 impl::RawResponseWriter<Response>& stream);
124
125 UnaryCall(UnaryCall&&) = delete;
126 UnaryCall& operator=(UnaryCall&&) = delete;
127 ~UnaryCall();
128
129 bool IsFinished() const override;
130
131 private:
132 impl::RawResponseWriter<Response>& stream_;
133 bool is_finished_{false};
134};
135
136/// @brief Controls a request stream -> single response RPC
137///
138/// This class is not thread-safe except for `GetContext`.
139///
140/// The RPC is cancelled on destruction unless the stream has been finished.
141///
142/// If any method throws, further methods must not be called on the same stream,
143/// except for `GetContext`.
144template <typename Request, typename Response>
145class InputStream final : public CallAnyBase {
146 public:
147 /// @brief Await and read the next incoming message
148 /// @param request where to put the request on success
149 /// @returns `true` on success, `false` on end-of-input
150 [[nodiscard]] bool Read(Request& request);
151
152 /// @brief Complete the RPC successfully
153 ///
154 /// `Finish` must not be called multiple times for the same RPC.
155 ///
156 /// @param response the single Response to send to the client
157 /// @throws ugrpc::server::RpcError on an RPC error
158 void Finish(const Response& response);
159
160 /// @brief Complete the RPC with an error
161 ///
162 /// `Finish` must not be called multiple times for the same RPC.
163 ///
164 /// @param status error details
165 /// @throws ugrpc::server::RpcError on an RPC error
166 void FinishWithError(const grpc::Status& status) override;
167
168 /// For internal use only
169 InputStream(impl::CallParams&& call_params,
170 impl::RawReader<Request, Response>& stream);
171
172 InputStream(InputStream&&) = delete;
173 InputStream& operator=(InputStream&&) = delete;
174 ~InputStream();
175
176 bool IsFinished() const override;
177
178 private:
179 enum class State { kOpen, kReadsDone, kFinished };
180
181 impl::RawReader<Request, Response>& stream_;
182 State state_{State::kOpen};
183};
184
185/// @brief Controls a single request -> response stream RPC
186///
187/// This class is not thread-safe except for `GetContext`.
188///
189/// The RPC is cancelled on destruction unless the stream has been finished.
190///
191/// If any method throws, further methods must not be called on the same stream,
192/// except for `GetContext`.
193template <typename Response>
194class OutputStream final : public CallAnyBase {
195 public:
196 /// @brief Write the next outgoing message
197 /// @param response the next message to write
198 /// @throws ugrpc::server::RpcError on an RPC error
199 void Write(const Response& response);
200
201 /// @brief Complete the RPC successfully
202 ///
203 /// `Finish` must not be called multiple times.
204 ///
205 /// @throws ugrpc::server::RpcError on an RPC error
206 void Finish();
207
208 /// @brief Complete the RPC with an error
209 ///
210 /// `Finish` must not be called multiple times.
211 ///
212 /// @param status error details
213 /// @throws ugrpc::server::RpcError on an RPC error
214 void FinishWithError(const grpc::Status& status) override;
215
216 /// @brief Equivalent to `Write + Finish`
217 ///
218 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
219 ///
220 /// `Finish` must not be called multiple times.
221 ///
222 /// @param response the final response message
223 /// @throws ugrpc::server::RpcError on an RPC error
224 void WriteAndFinish(const Response& response);
225
226 /// For internal use only
227 OutputStream(impl::CallParams&& call_params,
228 impl::RawWriter<Response>& stream);
229
230 OutputStream(OutputStream&&) = delete;
231 OutputStream& operator=(OutputStream&&) = delete;
232 ~OutputStream();
233
234 bool IsFinished() const override;
235
236 private:
237 enum class State { kNew, kOpen, kFinished };
238
239 impl::RawWriter<Response>& stream_;
240 State state_{State::kNew};
241};
242
243/// @brief Controls a request stream -> response stream RPC
244///
245/// This class allows the following concurrent calls:
246///
247/// - `GetContext`
248/// - `Read`;
249/// - one of (`Write`, `Finish`, `FinishWithError`, `WriteAndFinish`).
250///
251/// The RPC is cancelled on destruction unless the stream has been finished.
252///
253/// If any method throws, further methods must not be called on the same stream,
254/// except for `GetContext`.
255template <typename Request, typename Response>
257 public:
258 /// @brief Await and read the next incoming message
259 /// @param request where to put the request on success
260 /// @returns `true` on success, `false` on end-of-input
261 /// @throws ugrpc::server::RpcError on an RPC error
262 [[nodiscard]] bool Read(Request& request);
263
264 /// @brief Write the next outgoing message
265 /// @param response the next message to write
266 /// @throws ugrpc::server::RpcError on an RPC error
267 void Write(const Response& response);
268
269 /// @brief Complete the RPC successfully
270 ///
271 /// `Finish` must not be called multiple times.
272 ///
273 /// @throws ugrpc::server::RpcError on an RPC error
274 void Finish();
275
276 /// @brief Complete the RPC with an error
277 ///
278 /// `Finish` must not be called multiple times.
279 ///
280 /// @param status error details
281 /// @throws ugrpc::server::RpcError on an RPC error
282 void FinishWithError(const grpc::Status& status) override;
283
284 /// @brief Equivalent to `Write + Finish`
285 ///
286 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
287 ///
288 /// `Finish` must not be called multiple times.
289 ///
290 /// @param response the final response message
291 /// @throws ugrpc::server::RpcError on an RPC error
292 void WriteAndFinish(const Response& response);
293
294 /// For internal use only
295 BidirectionalStream(impl::CallParams&& call_params,
296 impl::RawReaderWriter<Request, Response>& stream);
297
298 BidirectionalStream(const BidirectionalStream&) = delete;
299 BidirectionalStream(BidirectionalStream&&) = delete;
300 ~BidirectionalStream();
301
302 bool IsFinished() const override;
303
304 private:
305 impl::RawReaderWriter<Request, Response>& stream_;
306 bool are_reads_done_{false};
307 bool is_finished_{false};
308};
309
310// ========================== Implementation follows ==========================
311
312template <typename Response>
313UnaryCall<Response>::UnaryCall(impl::CallParams&& call_params,
314 impl::RawResponseWriter<Response>& stream)
315 : CallAnyBase(std::move(call_params)), stream_(stream) {}
316
317template <typename Response>
318UnaryCall<Response>::~UnaryCall() {
319 if (!is_finished_) {
320 impl::CancelWithError(stream_, GetCallName());
321 LogFinish(impl::kUnknownErrorStatus);
322 }
323}
324
325template <typename Response>
326void UnaryCall<Response>::Finish(const Response& response) {
327 UINVARIANT(!is_finished_, "'Finish' called on a finished call");
328 is_finished_ = true;
329
330 LogFinish(grpc::Status::OK);
331 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
332 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
333 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
334}
335
336template <typename Response>
337void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
338 UINVARIANT(!is_finished_, "'FinishWithError' called on a finished call");
339 is_finished_ = true;
340 LogFinish(status);
341 impl::FinishWithError(stream_, status, GetCallName());
342 Statistics().OnExplicitFinish(status.error_code());
343 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
344}
345
346template <typename Response>
347bool UnaryCall<Response>::IsFinished() const {
348 return is_finished_;
349}
350
351template <typename Request, typename Response>
352InputStream<Request, Response>::InputStream(
353 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
354 : CallAnyBase(std::move(call_params)), stream_(stream) {}
355
356template <typename Request, typename Response>
357InputStream<Request, Response>::~InputStream() {
358 if (state_ != State::kFinished) {
359 impl::CancelWithError(stream_, GetCallName());
360 LogFinish(impl::kUnknownErrorStatus);
361 }
362}
363
364template <typename Request, typename Response>
365bool InputStream<Request, Response>::Read(Request& request) {
366 UINVARIANT(state_ == State::kOpen,
367 "'Read' called while the stream is half-closed for reads");
368 if (impl::Read(stream_, request)) {
369 return true;
370 } else {
371 state_ = State::kReadsDone;
372 return false;
373 }
374}
375
376template <typename Request, typename Response>
377void InputStream<Request, Response>::Finish(const Response& response) {
378 UINVARIANT(state_ != State::kFinished,
379 "'Finish' called on a finished stream");
380 state_ = State::kFinished;
381 LogFinish(grpc::Status::OK);
382 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
383 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
384 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
385}
386
387template <typename Request, typename Response>
388void InputStream<Request, Response>::FinishWithError(
389 const grpc::Status& status) {
390 UASSERT(!status.ok());
391 UINVARIANT(state_ != State::kFinished,
392 "'FinishWithError' called on a finished stream");
393 state_ = State::kFinished;
394 LogFinish(status);
395 impl::FinishWithError(stream_, status, GetCallName());
396 Statistics().OnExplicitFinish(status.error_code());
397 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
398}
399
400template <typename Request, typename Response>
401bool InputStream<Request, Response>::IsFinished() const {
402 return state_ == State::kFinished;
403}
404
405template <typename Response>
406OutputStream<Response>::OutputStream(impl::CallParams&& call_params,
407 impl::RawWriter<Response>& stream)
408 : CallAnyBase(std::move(call_params)), stream_(stream) {}
409
410template <typename Response>
411OutputStream<Response>::~OutputStream() {
412 if (state_ != State::kFinished) {
413 impl::Cancel(stream_, GetCallName());
414 LogFinish(impl::kUnknownErrorStatus);
415 }
416}
417
418template <typename Response>
419void OutputStream<Response>::Write(const Response& response) {
420 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
421
422 // For some reason, gRPC requires explicit 'SendInitialMetadata' in output
423 // streams
424 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
425
426 // Don't buffer writes, otherwise in an event subscription scenario, events
427 // may never actually be delivered
428 grpc::WriteOptions write_options{};
429
430 impl::Write(stream_, response, write_options, GetCallName());
431}
432
433template <typename Response>
434void OutputStream<Response>::Finish() {
435 UINVARIANT(state_ != State::kFinished,
436 "'Finish' called on a finished stream");
437 state_ = State::kFinished;
438 const auto status = grpc::Status::OK;
439 LogFinish(status);
440 impl::Finish(stream_, status, GetCallName());
441 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
442 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
443}
444
445template <typename Response>
446void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
447 UASSERT(!status.ok());
448 UINVARIANT(state_ != State::kFinished,
449 "'Finish' called on a finished stream");
450 state_ = State::kFinished;
451 LogFinish(status);
452 impl::Finish(stream_, status, GetCallName());
453 Statistics().OnExplicitFinish(status.error_code());
454 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
455}
456
457template <typename Response>
458void OutputStream<Response>::WriteAndFinish(const Response& response) {
459 UINVARIANT(state_ != State::kFinished,
460 "'WriteAndFinish' called on a finished stream");
461 state_ = State::kFinished;
462
463 // Don't buffer writes, otherwise in an event subscription scenario, events
464 // may never actually be delivered
465 grpc::WriteOptions write_options{};
466
467 const auto status = grpc::Status::OK;
468 LogFinish(status);
469 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
470}
471
472template <typename Response>
473bool OutputStream<Response>::IsFinished() const {
474 return state_ == State::kFinished;
475}
476
477template <typename Request, typename Response>
479 impl::CallParams&& call_params,
480 impl::RawReaderWriter<Request, Response>& stream)
481 : CallAnyBase(std::move(call_params)), stream_(stream) {}
482
483template <typename Request, typename Response>
484BidirectionalStream<Request, Response>::~BidirectionalStream() {
485 if (!is_finished_) {
486 impl::Cancel(stream_, GetCallName());
487 LogFinish(impl::kUnknownErrorStatus);
488 }
489}
490
491template <typename Request, typename Response>
492bool BidirectionalStream<Request, Response>::Read(Request& request) {
493 UINVARIANT(!are_reads_done_,
494 "'Read' called while the stream is half-closed for reads");
495 if (impl::Read(stream_, request)) {
496 return true;
497 } else {
498 are_reads_done_ = true;
499 return false;
500 }
501}
502
503template <typename Request, typename Response>
504void BidirectionalStream<Request, Response>::Write(const Response& response) {
505 UINVARIANT(!is_finished_, "'Write' called on a finished stream");
506
507 // Don't buffer writes, optimize for ping-pong-style interaction
508 grpc::WriteOptions write_options{};
509
510 try {
511 impl::Write(stream_, response, write_options, GetCallName());
512 } catch (const RpcInterruptedError&) {
513 is_finished_ = true;
514 throw;
515 }
516}
517
518template <typename Request, typename Response>
519void BidirectionalStream<Request, Response>::Finish() {
520 UINVARIANT(!is_finished_, "'Finish' called on a finished stream");
521 is_finished_ = true;
522 const auto status = grpc::Status::OK;
523 LogFinish(status);
524 impl::Finish(stream_, status, GetCallName());
525 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
526 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
527}
528
529template <typename Request, typename Response>
530void BidirectionalStream<Request, Response>::FinishWithError(
531 const grpc::Status& status) {
532 UASSERT(!status.ok());
533 UINVARIANT(!is_finished_, "'FinishWithError' called on a finished stream");
534 is_finished_ = true;
535 LogFinish(status);
536 impl::Finish(stream_, status, GetCallName());
537 Statistics().OnExplicitFinish(status.error_code());
538 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
539}
540
541template <typename Request, typename Response>
542void BidirectionalStream<Request, Response>::WriteAndFinish(
543 const Response& response) {
544 UINVARIANT(!is_finished_, "'WriteAndFinish' called on a finished stream");
545 is_finished_ = true;
546
547 // Don't buffer writes, optimize for ping-pong-style interaction
548 grpc::WriteOptions write_options{};
549
550 const auto status = grpc::Status::OK;
551 LogFinish(status);
552 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
553}
554
555template <typename Request, typename Response>
556bool BidirectionalStream<Request, Response>::IsFinished() const {
557 return is_finished_;
558}
559
560} // namespace ugrpc::server
561
562USERVER_NAMESPACE_END