userver: userver/ugrpc/server/impl/async_methods.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
async_methods.hpp
1#pragma once
2
3#include <memory>
4#include <string_view>
5#include <utility>
6
7#include <grpcpp/impl/codegen/async_stream.h>
8#include <grpcpp/impl/codegen/async_unary_call.h>
9#include <grpcpp/impl/codegen/status.h>
10
11#include <userver/ugrpc/server/exceptions.hpp>
12#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
13
14USERVER_NAMESPACE_BEGIN
15
16namespace ugrpc::server::impl {
17
18/// @{
19/// @brief Helper type aliases for low-level asynchronous gRPC streams
20/// @see <grpcpp/impl/codegen/async_unary_call_impl.h>
21/// @see <grpcpp/impl/codegen/async_stream_impl.h>
22template <typename Response>
23using RawResponseWriter = grpc::ServerAsyncResponseWriter<Response>;
24
25template <typename Request, typename Response>
26using RawReader = grpc::ServerAsyncReader<Response, Request>;
27
28template <typename Response>
29using RawWriter = grpc::ServerAsyncWriter<Response>;
30
31template <typename Request, typename Response>
32using RawReaderWriter = grpc::ServerAsyncReaderWriter<Response, Request>;
33/// @}
34
35using ugrpc::impl::AsyncMethodInvocation;
36
37void ReportErrorWhileCancelling(std::string_view call_name) noexcept;
38
39void ThrowOnError(impl::AsyncMethodInvocation::WaitStatus status,
40 std::string_view call_name, std::string_view stage_name);
41
42extern const grpc::Status kUnimplementedStatus;
43extern const grpc::Status kUnknownErrorStatus;
44
45template <typename GrpcStream, typename Response>
46void Finish(GrpcStream& stream, const Response& response,
47 const grpc::Status& status, std::string_view call_name) {
48 AsyncMethodInvocation finish;
49 stream.Finish(response, status, finish.GetTag());
50 ThrowOnError(Wait(finish), call_name, "Finish");
51}
52
53template <typename GrpcStream>
54void Finish(GrpcStream& stream, const grpc::Status& status,
55 std::string_view call_name) {
56 AsyncMethodInvocation finish;
57 stream.Finish(status, finish.GetTag());
58 ThrowOnError(Wait(finish), call_name, "Finish");
59}
60
61template <typename GrpcStream>
62void Cancel(GrpcStream& stream, std::string_view call_name) noexcept {
63 AsyncMethodInvocation cancel;
64 stream.Finish(kUnknownErrorStatus, cancel.GetTag());
65 if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
66 ReportErrorWhileCancelling(call_name);
67}
68
69template <typename GrpcStream>
70void CancelWithError(GrpcStream& stream, std::string_view call_name) noexcept {
71 AsyncMethodInvocation cancel;
72 stream.FinishWithError(kUnknownErrorStatus, cancel.GetTag());
73 if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
74 ReportErrorWhileCancelling(call_name);
75}
76
77template <typename GrpcStream>
78void FinishWithError(GrpcStream& stream, const grpc::Status& status,
79 std::string_view call_name) {
80 AsyncMethodInvocation finish;
81 stream.FinishWithError(status, finish.GetTag());
82 ThrowOnError(Wait(finish), call_name, "FinishWithError");
83}
84
85template <typename GrpcStream>
86void SendInitialMetadata(GrpcStream& stream, std::string_view call_name) {
87 AsyncMethodInvocation metadata;
88 stream.SendInitialMetadata(metadata.GetTag());
89 ThrowOnError(Wait(metadata), call_name, "SendInitialMetadata");
90}
91
92template <typename GrpcStream, typename Request>
93bool Read(GrpcStream& stream, Request& request) {
94 AsyncMethodInvocation read;
95 stream.Read(&request, read.GetTag());
96 return Wait(read) == impl::AsyncMethodInvocation::WaitStatus::kOk;
97}
98
99template <typename GrpcStream, typename Response>
100void Write(GrpcStream& stream, const Response& response,
101 grpc::WriteOptions options, std::string_view call_name) {
102 AsyncMethodInvocation write;
103 stream.Write(response, options, write.GetTag());
104 ThrowOnError(Wait(write), call_name, "Write");
105}
106
107template <typename GrpcStream, typename Response>
108void WriteAndFinish(GrpcStream& stream, const Response& response,
109 grpc::WriteOptions options, const grpc::Status& status,
110 std::string_view call_name) {
111 AsyncMethodInvocation write_and_finish;
112 stream.WriteAndFinish(response, options, status, write_and_finish.GetTag());
113 ThrowOnError(Wait(write_and_finish), call_name, "WriteAndFinish");
114}
115
/// @brief Sends initial metadata only if the call is still in `State::kNew`.
///
/// When `state` equals `State::kNew` it is switched to `State::kOpen` and
/// SendInitialMetadata is invoked; in any other state nothing happens.
///
/// @param stream    stream object forwarded to SendInitialMetadata
/// @param call_name call name forwarded to SendInitialMetadata
/// @param state     call state; `State` must provide `kNew` and `kOpen`
template <typename GrpcStream, typename State>
void SendInitialMetadataIfNew(GrpcStream& stream, std::string_view call_name,
                              State& state) {
  if (state != State::kNew) return;

  // The state is updated before sending, so it stays kOpen even if
  // SendInitialMetadata exits via an exception.
  state = State::kOpen;
  SendInitialMetadata(stream, call_name);
}
124
125} // namespace ugrpc::server::impl
126
127USERVER_NAMESPACE_END