userver: userver/ugrpc/server/impl/async_methods.hpp Source File
Loading...
Searching...
No Matches
async_methods.hpp
1#pragma once
2
3#include <memory>
4#include <string_view>
5#include <utility>
6
7#include <grpcpp/impl/codegen/async_stream.h>
8#include <grpcpp/impl/codegen/async_unary_call.h>
9#include <grpcpp/impl/codegen/status.h>
10
11#include <userver/ugrpc/server/exceptions.hpp>
12#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
13
14USERVER_NAMESPACE_BEGIN
15
16namespace ugrpc::server::impl {
17
18/// @{
19/// @brief Helper type aliases for low-level asynchronous gRPC streams
20/// @see <grpcpp/impl/codegen/async_unary_call_impl.h>
21/// @see <grpcpp/impl/codegen/async_stream_impl.h>
22template <typename Response>
23using RawResponseWriter = grpc::ServerAsyncResponseWriter<Response>;
24
25template <typename Request, typename Response>
26using RawReader = grpc::ServerAsyncReader<Response, Request>;
27
28template <typename Response>
29using RawWriter = grpc::ServerAsyncWriter<Response>;
30
31template <typename Request, typename Response>
32using RawReaderWriter = grpc::ServerAsyncReaderWriter<Response, Request>;
33/// @}
34
35using ugrpc::impl::AsyncMethodInvocation;
36
37void ReportErrorWhileCancelling(std::string_view call_name) noexcept;
38
39void ThrowOnError(impl::AsyncMethodInvocation::WaitStatus status,
40 std::string_view call_name, std::string_view stage_name);
41
42extern const grpc::Status kUnimplementedStatus;
43extern const grpc::Status kUnknownErrorStatus;
44
45template <typename GrpcStream, typename Response>
46void Finish(GrpcStream& stream, const Response& response,
47 const grpc::Status& status, std::string_view call_name) {
48 AsyncMethodInvocation finish;
49 stream.Finish(response, status, finish.GetTag());
50 ThrowOnError(Wait(finish), call_name, "Finish");
51}
52
53template <typename GrpcStream>
54void Finish(GrpcStream& stream, const grpc::Status& status,
55 std::string_view call_name) {
56 AsyncMethodInvocation finish;
57 stream.Finish(status, finish.GetTag());
58 ThrowOnError(Wait(finish), call_name, "Finish");
59}
60
61template <typename GrpcStream>
62void Cancel(GrpcStream& stream, std::string_view call_name) noexcept {
63 AsyncMethodInvocation cancel;
64 stream.Finish(kUnknownErrorStatus, cancel.GetTag());
65 if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
66 ReportErrorWhileCancelling(call_name);
67}
68
69template <typename GrpcStream>
70void CancelWithError(GrpcStream& stream, std::string_view call_name) noexcept {
71 AsyncMethodInvocation cancel;
72 stream.FinishWithError(kUnknownErrorStatus, cancel.GetTag());
73 if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
74 ReportErrorWhileCancelling(call_name);
75}
76
77template <typename GrpcStream>
78void FinishWithError(GrpcStream& stream, const grpc::Status& status,
79 std::string_view call_name) {
80 AsyncMethodInvocation finish;
81 stream.FinishWithError(status, finish.GetTag());
82 ThrowOnError(Wait(finish), call_name, "FinishWithError");
83}
84
85template <typename GrpcStream>
86void SendInitialMetadata(GrpcStream& stream, std::string_view call_name) {
87 AsyncMethodInvocation metadata;
88 stream.SendInitialMetadata(metadata.GetTag());
89 ThrowOnError(Wait(metadata), call_name, "SendInitialMetadata");
90}
91
92template <typename GrpcStream, typename Request>
93bool Read(GrpcStream& stream, Request& request) {
94 AsyncMethodInvocation read;
95 stream.Read(&request, read.GetTag());
96 return Wait(read) == impl::AsyncMethodInvocation::WaitStatus::kOk;
97}
98
99template <typename GrpcStream, typename Response>
100void Write(GrpcStream& stream, const Response& response,
101 grpc::WriteOptions options, std::string_view call_name) {
102 AsyncMethodInvocation write;
103 stream.Write(response, options, write.GetTag());
104 ThrowOnError(Wait(write), call_name, "Write");
105}
106
107template <typename GrpcStream, typename Response>
108void WriteAndFinish(GrpcStream& stream, const Response& response,
109 grpc::WriteOptions options, const grpc::Status& status,
110 std::string_view call_name) {
111 AsyncMethodInvocation write_and_finish;
112 stream.WriteAndFinish(response, options, status, write_and_finish.GetTag());
113 ThrowOnError(Wait(write_and_finish), call_name, "WriteAndFinish");
114}
115
116template <typename GrpcStream, typename State>
117void SendInitialMetadataIfNew(GrpcStream& stream, std::string_view call_name,
118 State& state) {
119 if (state == State::kNew) {
120 state = State::kOpen;
121 SendInitialMetadata(stream, call_name);
122 }
123}
124
125} // namespace ugrpc::server::impl
126
127USERVER_NAMESPACE_END