userver: userver/ugrpc/server/rpc.hpp Source File
Loading...
Searching...
No Matches
rpc.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/ugrpc/server/rpc.hpp
4/// @brief Classes representing an incoming RPC
5
6#include <grpcpp/impl/codegen/proto_utils.h>
7#include <grpcpp/server_context.h>
8
9#include <userver/utils/assert.hpp>
10
11#include <userver/ugrpc/impl/deadline_timepoint.hpp>
12#include <userver/ugrpc/impl/internal_tag_fwd.hpp>
13#include <userver/ugrpc/impl/span.hpp>
14#include <userver/ugrpc/impl/statistics_scope.hpp>
15#include <userver/ugrpc/server/exceptions.hpp>
16#include <userver/ugrpc/server/impl/async_methods.hpp>
17#include <userver/ugrpc/server/impl/call_params.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace ugrpc::server {
22
23namespace impl {
24
25std::string FormatLogMessage(
26 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
27 std::string_view peer, std::chrono::system_clock::time_point start_time,
28 std::string_view call_name, int code);
29
30}
31
32/// @brief A non-typed base class for any gRPC call
34 public:
35 CallAnyBase(impl::CallParams&& params) : params_(params) {}
36
37 /// @brief Complete the RPC with an error
38 ///
39 /// `Finish` must not be called multiple times for the same RPC.
40 ///
41 /// @param status error details
42 /// @throws ugrpc::server::RpcError on an RPC error
43 virtual void FinishWithError(const grpc::Status& status) = 0;
44
45 /// @returns the `ServerContext` used for this RPC
46 /// @note Initial server metadata is not currently supported
47 /// @note Trailing metadata, if any, must be set before the `Finish` call
48 grpc::ServerContext& GetContext() { return params_.context; }
49
50 /// @brief Name of the call. Consists of service and method names
51 std::string_view GetCallName() const { return params_.call_name; }
52
53 tracing::Span& GetSpan() { return params_.call_span; }
54
55 virtual bool IsFinished() const = 0;
56
57 /// @cond
58 // For internal use only
59 ugrpc::impl::RpcStatisticsScope& Statistics(ugrpc::impl::InternalTag);
60 /// @endcond
61
62 protected:
63 ugrpc::impl::RpcStatisticsScope& Statistics() { return params_.statistics; }
64
65 logging::LoggerRef AccessTskvLogger() { return params_.access_tskv_logger; }
66
67 void LogFinish(grpc::Status status) const;
68
69 private:
70 impl::CallParams params_;
71};
72
73/// @brief Controls a single request -> single response RPC
74///
75/// The RPC is cancelled on destruction unless `Finish` has been called.
76template <typename Response>
77class UnaryCall final : public CallAnyBase {
78 public:
79 /// @brief Complete the RPC successfully
80 ///
81 /// `Finish` must not be called multiple times for the same RPC.
82 ///
83 /// @param response the single Response to send to the client
84 /// @throws ugrpc::server::RpcError on an RPC error
85 void Finish(const Response& response);
86
87 /// @brief Complete the RPC with an error
88 ///
89 /// `Finish` must not be called multiple times for the same RPC.
90 ///
91 /// @param status error details
92 /// @throws ugrpc::server::RpcError on an RPC error
93 void FinishWithError(const grpc::Status& status) override;
94
95 /// For internal use only
96 UnaryCall(impl::CallParams&& call_params,
97 impl::RawResponseWriter<Response>& stream);
98
99 UnaryCall(UnaryCall&&) = delete;
100 UnaryCall& operator=(UnaryCall&&) = delete;
101 ~UnaryCall();
102
103 bool IsFinished() const override;
104
105 private:
106 impl::RawResponseWriter<Response>& stream_;
107 bool is_finished_{false};
108};
109
110/// @brief Controls a request stream -> single response RPC
111///
112/// This class is not thread-safe except for `GetContext`.
113///
114/// The RPC is cancelled on destruction unless the stream has been finished.
115///
116/// If any method throws, further methods must not be called on the same stream,
117/// except for `GetContext`.
118template <typename Request, typename Response>
119class InputStream final : public CallAnyBase {
120 public:
121 /// @brief Await and read the next incoming message
122 /// @param request where to put the request on success
123 /// @returns `true` on success, `false` on end-of-input
124 [[nodiscard]] bool Read(Request& request);
125
126 /// @brief Complete the RPC successfully
127 ///
128 /// `Finish` must not be called multiple times for the same RPC.
129 ///
130 /// @param response the single Response to send to the client
131 /// @throws ugrpc::server::RpcError on an RPC error
132 void Finish(const Response& response);
133
134 /// @brief Complete the RPC with an error
135 ///
136 /// `Finish` must not be called multiple times for the same RPC.
137 ///
138 /// @param status error details
139 /// @throws ugrpc::server::RpcError on an RPC error
140 void FinishWithError(const grpc::Status& status) override;
141
142 /// For internal use only
143 InputStream(impl::CallParams&& call_params,
144 impl::RawReader<Request, Response>& stream);
145
146 InputStream(InputStream&&) = delete;
147 InputStream& operator=(InputStream&&) = delete;
148 ~InputStream();
149
150 bool IsFinished() const override;
151
152 private:
153 enum class State { kOpen, kReadsDone, kFinished };
154
155 impl::RawReader<Request, Response>& stream_;
156 State state_{State::kOpen};
157};
158
159/// @brief Controls a single request -> response stream RPC
160///
161/// This class is not thread-safe except for `GetContext`.
162///
163/// The RPC is cancelled on destruction unless the stream has been finished.
164///
165/// If any method throws, further methods must not be called on the same stream,
166/// except for `GetContext`.
167template <typename Response>
168class OutputStream final : public CallAnyBase {
169 public:
170 /// @brief Write the next outgoing message
171 /// @param response the next message to write
172 /// @throws ugrpc::server::RpcError on an RPC error
173 void Write(const Response& response);
174
175 /// @brief Complete the RPC successfully
176 ///
177 /// `Finish` must not be called multiple times.
178 ///
179 /// @throws ugrpc::server::RpcError on an RPC error
180 void Finish();
181
182 /// @brief Complete the RPC with an error
183 ///
184 /// `Finish` must not be called multiple times.
185 ///
186 /// @param status error details
187 /// @throws ugrpc::server::RpcError on an RPC error
188 void FinishWithError(const grpc::Status& status) override;
189
190 /// @brief Equivalent to `Write + Finish`
191 ///
192 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
193 ///
194 /// `Finish` must not be called multiple times.
195 ///
196 /// @param response the final response message
197 /// @throws ugrpc::server::RpcError on an RPC error
198 void WriteAndFinish(const Response& response);
199
200 /// For internal use only
201 OutputStream(impl::CallParams&& call_params,
202 impl::RawWriter<Response>& stream);
203
204 OutputStream(OutputStream&&) = delete;
205 OutputStream& operator=(OutputStream&&) = delete;
206 ~OutputStream();
207
208 bool IsFinished() const override;
209
210 private:
211 enum class State { kNew, kOpen, kFinished };
212
213 impl::RawWriter<Response>& stream_;
214 State state_{State::kNew};
215};
216
217/// @brief Controls a request stream -> response stream RPC
218///
219/// This class is not thread-safe except for `GetContext`.
220///
221/// The RPC is cancelled on destruction unless the stream has been finished.
222///
223/// If any method throws, further methods must not be called on the same stream,
224/// except for `GetContext`.
225template <typename Request, typename Response>
227 public:
228 /// @brief Await and read the next incoming message
229 /// @param request where to put the request on success
230 /// @returns `true` on success, `false` on end-of-input
231 /// @throws ugrpc::server::RpcError on an RPC error
232 [[nodiscard]] bool Read(Request& request);
233
234 /// @brief Write the next outgoing message
235 /// @param response the next message to write
236 /// @throws ugrpc::server::RpcError on an RPC error
237 void Write(const Response& response);
238
239 /// @brief Complete the RPC successfully
240 ///
241 /// `Finish` must not be called multiple times.
242 ///
243 /// @throws ugrpc::server::RpcError on an RPC error
244 void Finish();
245
246 /// @brief Complete the RPC with an error
247 ///
248 /// `Finish` must not be called multiple times.
249 ///
250 /// @param status error details
251 /// @throws ugrpc::server::RpcError on an RPC error
252 void FinishWithError(const grpc::Status& status) override;
253
254 /// @brief Equivalent to `Write + Finish`
255 ///
256 /// This call saves one round-trip, compared to separate `Write` and `Finish`.
257 ///
258 /// `Finish` must not be called multiple times.
259 ///
260 /// @param response the final response message
261 /// @throws ugrpc::server::RpcError on an RPC error
262 void WriteAndFinish(const Response& response);
263
264 /// For internal use only
265 BidirectionalStream(impl::CallParams&& call_params,
266 impl::RawReaderWriter<Request, Response>& stream);
267
268 BidirectionalStream(const BidirectionalStream&) = delete;
269 BidirectionalStream(BidirectionalStream&&) = delete;
270 ~BidirectionalStream();
271
272 bool IsFinished() const override;
273
274 private:
275 enum class State { kOpen, kReadsDone, kFinished };
276
277 impl::RawReaderWriter<Request, Response>& stream_;
278 State state_{State::kOpen};
279};
280
281// ========================== Implementation follows ==========================
282
283template <typename Response>
284UnaryCall<Response>::UnaryCall(impl::CallParams&& call_params,
285 impl::RawResponseWriter<Response>& stream)
286 : CallAnyBase(std::move(call_params)), stream_(stream) {}
287
288template <typename Response>
289UnaryCall<Response>::~UnaryCall() {
290 if (!is_finished_) {
291 impl::CancelWithError(stream_, GetCallName());
292 LogFinish(impl::kUnknownErrorStatus);
293 }
294}
295
296template <typename Response>
297void UnaryCall<Response>::Finish(const Response& response) {
298 UINVARIANT(!is_finished_, "'Finish' called on a finished call");
299 is_finished_ = true;
300
301 LogFinish(grpc::Status::OK);
302 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
303 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
304 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
305}
306
307template <typename Response>
308void UnaryCall<Response>::FinishWithError(const grpc::Status& status) {
309 UINVARIANT(!is_finished_, "'FinishWithError' called on a finished call");
310 is_finished_ = true;
311 LogFinish(status);
312 impl::FinishWithError(stream_, status, GetCallName());
313 Statistics().OnExplicitFinish(status.error_code());
314 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
315}
316
317template <typename Response>
318bool UnaryCall<Response>::IsFinished() const {
319 return is_finished_;
320}
321
322template <typename Request, typename Response>
323InputStream<Request, Response>::InputStream(
324 impl::CallParams&& call_params, impl::RawReader<Request, Response>& stream)
325 : CallAnyBase(std::move(call_params)), stream_(stream) {}
326
327template <typename Request, typename Response>
328InputStream<Request, Response>::~InputStream() {
329 if (state_ != State::kFinished) {
330 impl::CancelWithError(stream_, GetCallName());
331 LogFinish(impl::kUnknownErrorStatus);
332 }
333}
334
335template <typename Request, typename Response>
336bool InputStream<Request, Response>::Read(Request& request) {
337 UINVARIANT(state_ == State::kOpen,
338 "'Read' called while the stream is half-closed for reads");
339 if (impl::Read(stream_, request)) {
340 return true;
341 } else {
342 state_ = State::kReadsDone;
343 return false;
344 }
345}
346
347template <typename Request, typename Response>
348void InputStream<Request, Response>::Finish(const Response& response) {
349 UINVARIANT(state_ != State::kFinished,
350 "'Finish' called on a finished stream");
351 state_ = State::kFinished;
352 LogFinish(grpc::Status::OK);
353 impl::Finish(stream_, response, grpc::Status::OK, GetCallName());
354 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
355 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), grpc::Status::OK);
356}
357
358template <typename Request, typename Response>
359void InputStream<Request, Response>::FinishWithError(
360 const grpc::Status& status) {
361 UASSERT(!status.ok());
362 UINVARIANT(state_ != State::kFinished,
363 "'FinishWithError' called on a finished stream");
364 state_ = State::kFinished;
365 LogFinish(status);
366 impl::FinishWithError(stream_, status, GetCallName());
367 Statistics().OnExplicitFinish(status.error_code());
368 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
369}
370
371template <typename Request, typename Response>
372bool InputStream<Request, Response>::IsFinished() const {
373 return state_ == State::kFinished;
374}
375
376template <typename Response>
377OutputStream<Response>::OutputStream(impl::CallParams&& call_params,
378 impl::RawWriter<Response>& stream)
379 : CallAnyBase(std::move(call_params)), stream_(stream) {}
380
381template <typename Response>
382OutputStream<Response>::~OutputStream() {
383 if (state_ != State::kFinished) {
384 impl::Cancel(stream_, GetCallName());
385 LogFinish(impl::kUnknownErrorStatus);
386 }
387}
388
389template <typename Response>
390void OutputStream<Response>::Write(const Response& response) {
391 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
392
393 // For some reason, gRPC requires explicit 'SendInitialMetadata' in output
394 // streams
395 impl::SendInitialMetadataIfNew(stream_, GetCallName(), state_);
396
397 // Don't buffer writes, otherwise in an event subscription scenario, events
398 // may never actually be delivered
399 grpc::WriteOptions write_options{};
400
401 impl::Write(stream_, response, write_options, GetCallName());
402}
403
404template <typename Response>
405void OutputStream<Response>::Finish() {
406 UINVARIANT(state_ != State::kFinished,
407 "'Finish' called on a finished stream");
408 state_ = State::kFinished;
409 const auto status = grpc::Status::OK;
410 LogFinish(status);
411 impl::Finish(stream_, status, GetCallName());
412 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
413 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
414}
415
416template <typename Response>
417void OutputStream<Response>::FinishWithError(const grpc::Status& status) {
418 UASSERT(!status.ok());
419 UINVARIANT(state_ != State::kFinished,
420 "'Finish' called on a finished stream");
421 state_ = State::kFinished;
422 LogFinish(status);
423 impl::Finish(stream_, status, GetCallName());
424 Statistics().OnExplicitFinish(status.error_code());
425 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
426}
427
428template <typename Response>
429void OutputStream<Response>::WriteAndFinish(const Response& response) {
430 UINVARIANT(state_ != State::kFinished,
431 "'WriteAndFinish' called on a finished stream");
432 state_ = State::kFinished;
433
434 // Don't buffer writes, otherwise in an event subscription scenario, events
435 // may never actually be delivered
436 grpc::WriteOptions write_options{};
437
438 const auto status = grpc::Status::OK;
439 LogFinish(status);
440 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
441}
442
443template <typename Response>
444bool OutputStream<Response>::IsFinished() const {
445 return state_ == State::kFinished;
446}
447
448template <typename Request, typename Response>
450 impl::CallParams&& call_params,
451 impl::RawReaderWriter<Request, Response>& stream)
452 : CallAnyBase(std::move(call_params)), stream_(stream) {}
453
454template <typename Request, typename Response>
455BidirectionalStream<Request, Response>::~BidirectionalStream() {
456 if (state_ != State::kFinished) {
457 impl::Cancel(stream_, GetCallName());
458 LogFinish(impl::kUnknownErrorStatus);
459 }
460}
461
462template <typename Request, typename Response>
463bool BidirectionalStream<Request, Response>::Read(Request& request) {
464 UINVARIANT(state_ == State::kOpen,
465 "'Read' called while the stream is half-closed for reads");
466 if (impl::Read(stream_, request)) {
467 return true;
468 } else {
469 state_ = State::kReadsDone;
470 return false;
471 }
472}
473
474template <typename Request, typename Response>
475void BidirectionalStream<Request, Response>::Write(const Response& response) {
476 UINVARIANT(state_ != State::kFinished, "'Write' called on a finished stream");
477
478 // Don't buffer writes, optimize for ping-pong-style interaction
479 grpc::WriteOptions write_options{};
480
481 impl::Write(stream_, response, write_options, GetCallName());
482}
483
484template <typename Request, typename Response>
485void BidirectionalStream<Request, Response>::Finish() {
486 UINVARIANT(state_ != State::kFinished,
487 "'Finish' called on a finished stream");
488 state_ = State::kFinished;
489 const auto status = grpc::Status::OK;
490 LogFinish(status);
491 impl::Finish(stream_, status, GetCallName());
492 Statistics().OnExplicitFinish(grpc::StatusCode::OK);
493 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
494}
495
496template <typename Request, typename Response>
497void BidirectionalStream<Request, Response>::FinishWithError(
498 const grpc::Status& status) {
499 UASSERT(!status.ok());
500 UINVARIANT(state_ != State::kFinished,
501 "'FinishWithError' called on a finished stream");
502 state_ = State::kFinished;
503 LogFinish(status);
504 impl::Finish(stream_, status, GetCallName());
505 Statistics().OnExplicitFinish(status.error_code());
506 ugrpc::impl::UpdateSpanWithStatus(GetSpan(), status);
507}
508
509template <typename Request, typename Response>
510void BidirectionalStream<Request, Response>::WriteAndFinish(
511 const Response& response) {
512 UINVARIANT(state_ != State::kFinished,
513 "'WriteAndFinish' called on a finished stream");
514 state_ = State::kFinished;
515
516 // Don't buffer writes, optimize for ping-pong-style interaction
517 grpc::WriteOptions write_options{};
518
519 const auto status = grpc::Status::OK;
520 LogFinish(status);
521 impl::WriteAndFinish(stream_, response, write_options, status, GetCallName());
522}
523
524template <typename Request, typename Response>
525bool BidirectionalStream<Request, Response>::IsFinished() const {
526 return state_ == State::kFinished;
527}
528
529} // namespace ugrpc::server
530
531USERVER_NAMESPACE_END