userver: userver/ugrpc/server/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/server/rpc.hpp
4/// @brief Classes representing an incoming RPC
5
6#include <google/protobuf/message.h>
7
8#include <userver/utils/assert.hpp>
9
10#include <userver/ugrpc/impl/deadline_timepoint.hpp>
11#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
12#include <userver/ugrpc/impl/span.hpp>
13#include <userver/ugrpc/server/call.hpp>
14#include <userver/ugrpc/server/exceptions.hpp>
15#include <userver/ugrpc/server/impl/async_methods.hpp>
16#include <userver/ugrpc/server/stream.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace ugrpc::server {
21
22/// @brief Controls a single request -> single response RPC
23///
24/// The RPC is cancelled on destruction unless `Finish` has been called.
25template <typename Response>
26class UnaryCall final : public CallAnyBase {
27public:
28 /// @brief Complete the RPC successfully
29 ///
30 /// `Finish` must not be called multiple times for the same RPC.
31 ///
32 /// @param response the single Response to send to the client
33 /// @throws ugrpc::server::RpcError on an RPC error
34 void Finish(Response& response);
35
36 /// @brief Complete the RPC successfully
37 ///
38 /// `Finish` must not be called multiple times for the same RPC.
39 ///
40 /// @param response the single Response to send to the client
41 /// @throws ugrpc::server::RpcError on an RPC error
42 void Finish(Response&& response);
43
44 /// @brief Complete the RPC with an error
45 ///
46 /// `Finish` must not be called multiple times for the same RPC.
47 ///
48 /// @param status error details
49 /// @throws ugrpc::server::RpcError on an RPC error
50 void FinishWithError(const grpc::Status& status) override;
51
52 /// For internal use only
53 UnaryCall(impl::CallParams&& call_params, impl::RawResponseWriter<Response>& stream);
54
55 UnaryCall(UnaryCall&&) = delete;
56 UnaryCall& operator=(UnaryCall&&) = delete;
57 ~UnaryCall();
58
59 bool IsFinished() const override;
60
61private:
62 impl::RawResponseWriter<Response>& stream_;
63 bool is_finished_{false};
64};
65
66/// @brief Controls a request stream -> single response RPC
67///
68/// This class is not thread-safe except for `GetContext`.
69///
70/// The RPC is cancelled on destruction unless the stream has been finished.
71///
72/// If any method throws, further methods must not be called on the same stream,
73/// except for `GetContext`.
74template <typename Request, typename Response>
75class InputStream final : public CallAnyBase, public Reader<Request> {
76public:
77 /// @brief Await and read the next incoming message
78 /// @param request where to put the request on success
79 /// @returns `true` on success, `false` on end-of-input
80 bool Read(Request& request) override;
81
82 /// @brief Complete the RPC successfully
83 ///
84 /// `Finish` must not be called multiple times for the same RPC.
85 ///
86 /// @param response the single Response to send to the client
87 /// @throws ugrpc::server::RpcError on an RPC error
88 void Finish(Response& response);
89
90 /// @brief Complete the RPC successfully
91 ///
92 /// `Finish` must not be called multiple times for the same RPC.
93 ///
94 /// @param response the single Response to send to the client
95 /// @throws ugrpc::server::RpcError on an RPC error
96 void Finish(Response&& response);
97
98 /// @brief Complete the RPC with an error
99 ///
100 /// `Finish` must not be called multiple times for the same RPC.
101 ///
102 /// @param status error details
103 /// @throws ugrpc::server::RpcError on an RPC error
104 void FinishWithError(const grpc::Status& status) override;
105
106 /// For internal use only
107 InputStream(impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream);
108
109 InputStream(InputStream&&) = delete;
110 InputStream& operator=(InputStream&&) = delete;
111 ~InputStream() override;
112
113 bool IsFinished() const override;
114
115private:
116 enum class State { kOpen, kReadsDone, kFinished };
117
118 impl::RawReader<Request, Response>& stream_;
119 State state_{State::kOpen};
120};
121
122/// @brief Controls a single request -> response stream RPC
123///
124/// This class is not thread-safe except for `GetContext`.
125///
126/// The RPC is cancelled on destruction unless the stream has been finished.
127///
128/// If any method throws, further methods must not be called on the same stream,
129/// except for `GetContext`.
130template <typename Response>
131class OutputStream final : public CallAnyBase, public Writer<Response> {
132public:
133 /// @brief Write the next outgoing message
134 /// @param response the next message to write
135 /// @throws ugrpc::server::RpcError on an RPC error
136 void Write(Response& response) override;
137
138 /// @brief Write the next outgoing message
139 /// @param response the next message to write
140 /// @throws ugrpc::server::RpcError on an RPC error
141 void Write(Response&& response) override;
142
143 /// @brief Complete the RPC successfully
144 ///
145 /// `Finish` must not be called multiple times.
146 ///
147 /// @throws ugrpc::server::RpcError on an RPC error
148 void Finish();
149
150 /// @brief Complete the RPC with an error
151 ///
152 /// `Finish` must not be called multiple times.
153 ///
154 /// @param status error details
155 /// @throws ugrpc::server::RpcError on an RPC error
156 void FinishWithError(const grpc::Status& status) override;
157
158 /// @brief Equivalent to `Write + Finish`
159 ///
160 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
161 ///
162 /// `Finish` must not be called multiple times.
163 ///
164 /// @param response the final response message
165 /// @throws ugrpc::server::RpcError on an RPC error
166 void WriteAndFinish(Response& response);
167
168 /// @brief Equivalent to `Write + Finish`
169 ///
170 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
171 ///
172 /// `Finish` must not be called multiple times.
173 ///
174 /// @param response the final response message
175 /// @throws ugrpc::server::RpcError on an RPC error
176 void WriteAndFinish(Response&& response);
177
178 /// For internal use only
179 OutputStream(impl::CallParams&& call_params, impl::RawWriter<Response>& stream);
180
181 OutputStream(OutputStream&&) = delete;
182 OutputStream& operator=(OutputStream&&) = delete;
183 ~OutputStream() override;
184
185 bool IsFinished() const override;
186
187private:
188 enum class State { kNew, kOpen, kFinished };
189
190 impl::RawWriter<Response>& stream_;
191 State state_{State::kNew};
192};
193
194/// @brief Controls a request stream -> response stream RPC
195///
196/// This class allows the following concurrent calls:
197///
198/// - `GetContext`
199/// - `Read`;
200/// - one of (`Write`, `Finish`, `FinishWithError`, `WriteAndFinish`).
201///
202/// The RPC is cancelled on destruction unless the stream has been finished.
203///
204/// If any method throws, further methods must not be called on the same stream,
205/// except for `GetContext`.
206template <typename Request, typename Response>
207class BidirectionalStream : public CallAnyBase, public ReaderWriter<Request, Response> {
208public:
209 /// @brief Await and read the next incoming message
210 /// @param request where to put the request on success
211 /// @returns `true` on success, `false` on end-of-input
212 /// @throws ugrpc::server::RpcError on an RPC error
213 bool Read(Request& request) override;
214
215 /// @brief Write the next outgoing message
216 /// @param response the next message to write
217 /// @throws ugrpc::server::RpcError on an RPC error
218 void Write(Response& response) override;
219
220 /// @brief Write the next outgoing message
221 /// @param response the next message to write
222 /// @throws ugrpc::server::RpcError on an RPC error
223 void Write(Response&& response) override;
224
225 /// @brief Complete the RPC successfully
226 ///
227 /// `Finish` must not be called multiple times.
228 ///
229 /// @throws ugrpc::server::RpcError on an RPC error
230 void Finish();
231
232 /// @brief Complete the RPC with an error
233 ///
234 /// `Finish` must not be called multiple times.
235 ///
236 /// @param status error details
237 /// @throws ugrpc::server::RpcError on an RPC error
238 void FinishWithError(const grpc::Status& status) override;
239
240 /// @brief Equivalent to `Write + Finish`
241 ///
242 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
243 ///
244 /// `Finish` must not be called multiple times.
245 ///
246 /// @param response the final response message
247 /// @throws ugrpc::server::RpcError on an RPC error
248 void WriteAndFinish(Response& response);
249
250 /// @brief Equivalent to `Write + Finish`
251 ///
252 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
253 ///
254 /// `Finish` must not be called multiple times.
255 ///
256 /// @param response the final response message
257 /// @throws ugrpc::server::RpcError on an RPC error
258 void WriteAndFinish(Response&& response);
259
260 /// For internal use only
261 BidirectionalStream(impl::CallParams&& call_params, impl::RawReaderWriter<Request, Response>& stream);
262
263 BidirectionalStream(const BidirectionalStream&) = delete;
264 BidirectionalStream(BidirectionalStream&&) = delete;
265 ~BidirectionalStream() override;
266
267 bool IsFinished() const override;
268
269private:
270 impl::RawReaderWriter<Request, Response>& stream_;
271 bool are_reads_done_{false};
272 bool is_finished_{false};
273};
274
275template <typename Response>
276UnaryCall<Response>::UnaryCall(impl::CallParams&& call_params, impl::RawResponseWriter<Response>& stream)
277 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params), CallKind::kUnaryCall), stream_(stream) {}
278
279template <typename Response>
280UnaryCall<Response>::~UnaryCall() {
281 if (!is_finished_) {
282 impl::CancelWithError(stream_, GetCallName());
283 LogFinish(impl::kUnknownErrorStatus);
284 }
285}
286
287template <typename Response>
288void UnaryCall<Response>::Finish(Response&& response) {
289 Finish(response);
290}
291
292template <typename Response>
293void UnaryCall<Response>::Finish(Response& response) {
294 UINVARIANT(!is_finished_, "'Finish' called on a finished call");
295 ApplyResponseHook(&response);
296
297 // It is important to set is_finished_ after ApplyResponseHook.
298 // Otherwise, there would be no way to call FinishWithError there.
299 is_finished_ = true;
300
301 LogFinish(grpc::Status::OK);
302 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
303 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
304 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
305}
306
307template <typename Response>
308void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
309 if (IsFinished()) return;
310 is_finished_ = true;
311 LogFinish(status);
312 impl::FinishWithError(stream_, status, GetCallName());
313 GetStatistics().OnExplicitFinish(status.error_code());
314 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
315}
316
317template <typename Response>
318bool UnaryCall<Response>::IsFinished() const {
319 return is_finished_;
320}
321
322template <typename Request, typename Response>
323InputStream<Request, Response>::InputStream(impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
324 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params), CallKind::kRequestStream), stream_(stream) {}
325
326template <typename Request, typename Response>
327InputStream<Request, Response>::~InputStream() {
328 if (state_ != State::kFinished) {
329 impl::CancelWithError(stream_, GetCallName());
330 LogFinish(impl::kUnknownErrorStatus);
331 }
332}
333
334template <typename Request, typename Response>
335bool InputStream<Request, Response>::Read(Request& request) {
336 UINVARIANT(state_ == State::kOpen, "'Read' called while the stream is half-closed for reads");
337 if (impl::Read(stream_, request)) {
338 ApplyRequestHook(&request);
339 return true;
340 } else {
341 state_ = State::kReadsDone;
342 return false;
343 }
344}
345
346template <typename Request, typename Response>
347void InputStream<Request, Response>::Finish(Response&& response) {
348 Finish(response);
349}
350
351template <typename Request, typename Response>
352void InputStream<Request, Response>::Finish(Response& response) {
353 UINVARIANT(state_ != State::kFinished, "'Finish' called on a finished stream");
354 ApplyResponseHook(&response);
355
356 // It is important to set the state_ after ApplyResponseHook.
357 // Otherwise, there would be no way to call FinishWithError there.
358 state_ = State::kFinished;
359
360 const auto& status = grpc::Status::OK;
361 LogFinish(status);
362
363 impl::Finish(stream_, response, status, GetCallName());
364 GetStatistics().OnExplicitFinish(status.error_code());
365 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
366}
367
368template <typename Request, typename Response>
369void InputStream<Request, Response>::FinishWithError(const grpc::Status& status) {
370 UASSERT(!status.ok());
371 if (IsFinished()) return;
372 state_ = State::kFinished;
373 LogFinish(status);
374 impl::FinishWithError(stream_, status, GetCallName());
375 GetStatistics().OnExplicitFinish(status.error_code());
376 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
377}
378
379template <typename Request, typename Response>
380bool InputStream<Request, Response>::IsFinished() const {
381 return state_ == State::kFinished;
382}
383
384template <typename Response>
385OutputStream<Response>::OutputStream(impl::CallParams&& call_params, impl::RawWriter<Response>& stream)
386 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params), CallKind::kResponseStream), stream_(stream) {}
387
388template <typename Response>
389OutputStream<Response>::~OutputStream() {
390 if (state_ != State::kFinished) {
391 impl::Cancel(stream_, GetCallName());
392 LogFinish(impl::kUnknownErrorStatus);
393 }
394}
395
396template <typename Response>
397void OutputStream<Response>::Write(Response&& response) {
398 Write(response);
399}
400
401template <typename Response>
402void OutputStream<Response>::Write(Response& response) {
403 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
404 ApplyResponseHook(&response);
405
406 // For some reason, gRPC requires explicit 'SendInitialMetadata' in output
407 // streams
408 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
409
410 // Don't buffer writes, otherwise in an event subscription scenario, events
411 // may never actually be delivered
412 grpc::WriteOptions write_options{};
413
414 impl::Write(stream_, response, write_options, GetCallName());
415}
416
417template <typename Response>
418void OutputStream<Response>::Finish() {
419 UINVARIANT(state_ != State::kFinished, "'Finish' called on a finished stream");
420 state_ = State::kFinished;
421
422 const auto& status = grpc::Status::OK;
423 LogFinish(status);
424 impl::Finish(stream_, status, GetCallName());
425 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
426 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
427}
428
429template <typename Response>
430void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
431 UASSERT(!status.ok());
432 if (IsFinished()) return;
433 state_ = State::kFinished;
434 LogFinish(status);
435 impl::Finish(stream_, status, GetCallName());
436 GetStatistics().OnExplicitFinish(status.error_code());
437 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
438}
439
440template <typename Response>
441void OutputStream<Response>::WriteAndFinish(Response&& response) {
442 WriteAndFinish(response);
443}
444
445template <typename Response>
446void OutputStream<Response>::WriteAndFinish(Response& response) {
447 UINVARIANT(state_ != State::kFinished, "'WriteAndFinish' called on a finished stream");
448 ApplyResponseHook(&response);
449
450 // It is important to set the state_ after ApplyResponseHook.
451 // Otherwise, there would be no way to call FinishWithError there.
452 state_ = State::kFinished;
453
454 // Don't buffer writes, otherwise in an event subscription scenario, events
455 // may never actually be delivered
456 grpc::WriteOptions write_options{};
457
458 const auto& status = grpc::Status::OK;
459 LogFinish(status);
460
461 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
462 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
463 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
464}
465
466template <typename Response>
467bool OutputStream<Response>::IsFinished() const {
468 return state_ == State::kFinished;
469}
470
471template <typename Request, typename Response>
473 impl::CallParams&& call_params,
474 impl::RawReaderWriter<Request, Response>& stream
475)
476 : CallAnyBase(utils::impl::InternalTag{}, std::move(call_params), CallKind::kBidirectionalStream),
477 stream_(stream) {}
478
479template <typename Request, typename Response>
480BidirectionalStream<Request, Response>::~BidirectionalStream() {
481 if (!is_finished_) {
482 impl::Cancel(stream_, GetCallName());
483 LogFinish(impl::kUnknownErrorStatus);
484 }
485}
486
487template <typename Request, typename Response>
488bool BidirectionalStream<Request, Response>::Read(Request& request) {
489 UINVARIANT(!are_reads_done_, "'Read' called while the stream is half-closed for reads");
490 if (impl::Read(stream_, request)) {
491 if constexpr (std::is_base_of_v<google::protobuf::Message, Request>) {
492 ApplyRequestHook(&request);
493 }
494 return true;
495 } else {
496 are_reads_done_ = true;
497 return false;
498 }
499}
500
501template <typename Request, typename Response>
502void BidirectionalStream<Request, Response>::Write(Response&& response) {
503 Write(response);
504}
505
506template <typename Request, typename Response>
507void BidirectionalStream<Request, Response>::Write(Response& response) {
508 UINVARIANT(!is_finished_, "'Write' called on a finished stream");
509 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
510 ApplyResponseHook(&response);
511 }
512
513 // Don't buffer writes, optimize for ping-pong-style interaction
514 grpc::WriteOptions write_options{};
515
516 try {
517 impl::Write(stream_, response, write_options, GetCallName());
518 } catch (const RpcInterruptedError&) {
519 is_finished_ = true;
520 throw;
521 }
522}
523
524template <typename Request, typename Response>
525void BidirectionalStream<Request, Response>::Finish() {
526 UINVARIANT(!is_finished_, "'Finish' called on a finished stream");
527 is_finished_ = true;
528
529 const auto& status = grpc::Status::OK;
530 LogFinish(status);
531 impl::Finish(stream_, status, GetCallName());
532 GetStatistics().OnExplicitFinish(grpc::StatusCode::OK);
533 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
534}
535
536template <typename Request, typename Response>
537void BidirectionalStream<Request, Response>::FinishWithError(const grpc::Status& status) {
538 UASSERT(!status.ok());
539 if (IsFinished()) return;
540 is_finished_ = true;
541 LogFinish(status);
542 impl::Finish(stream_, status, GetCallName());
543 GetStatistics().OnExplicitFinish(status.error_code());
544 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
545}
546
547template <typename Request, typename Response>
548void BidirectionalStream<Request, Response>::WriteAndFinish(Response&& response) {
549 WriteAndFinish(response);
550}
551
552template <typename Request, typename Response>
553void BidirectionalStream<Request, Response>::WriteAndFinish(Response& response) {
554 UINVARIANT(!is_finished_, "'WriteAndFinish' called on a finished stream");
555 if constexpr (std::is_base_of_v<google::protobuf::Message, Response>) {
556 ApplyResponseHook(&response);
557 }
558
559 // It is important to set is_finished_ after ApplyResponseHook.
560 // Otherwise, there would be no way to call FinishWithError there.
561 is_finished_ = true;
562
563 // Don't buffer writes, optimize for ping-pong-style interaction
564 grpc::WriteOptions write_options{};
565
566 const auto& status = grpc::Status::OK;
567 LogFinish(status);
568
569 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
570 GetStatistics().OnExplicitFinish(status.error_code());
571 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
572}
573
574template <typename Request, typename Response>
575bool BidirectionalStream<Request, Response>::IsFinished() const {
576 return is_finished_;
577}
578
579} // namespace ugrpc::server
580
581USERVER_NAMESPACE_END