7#include <grpcpp/impl/codegen/async_stream.h>
8#include <grpcpp/impl/codegen/async_unary_call.h>
9#include <grpcpp/impl/codegen/status.h>
11#include <userver/ugrpc/server/exceptions.hpp>
12#include <userver/ugrpc/server/impl/async_method_invocation.hpp>
14USERVER_NAMESPACE_BEGIN
16namespace ugrpc::
server::impl {
22template <
typename Response>
23using RawResponseWriter = grpc::ServerAsyncResponseWriter<Response>;
25template <
typename Request,
typename Response>
26using RawReader = grpc::ServerAsyncReader<Response, Request>;
28template <
typename Response>
29using RawWriter = grpc::ServerAsyncWriter<Response>;
31template <
typename Request,
typename Response>
32using RawReaderWriter = grpc::ServerAsyncReaderWriter<Response, Request>;
35using ugrpc::impl::AsyncMethodInvocation;
37void ReportErrorWhileCancelling(std::string_view call_name)
noexcept;
39void ThrowOnError(impl::AsyncMethodInvocation::WaitStatus status,
40 std::string_view call_name, std::string_view stage_name);
42extern const grpc::Status kUnimplementedStatus;
43extern const grpc::Status kUnknownErrorStatus;
45template <
typename GrpcStream,
typename Response>
46void Finish(GrpcStream& stream,
const Response& response,
47 const grpc::Status& status, std::string_view call_name) {
48 AsyncMethodInvocation finish;
49 stream.Finish(response, status, finish.GetTag());
50 ThrowOnError(Wait(finish), call_name,
"Finish");
53template <
typename GrpcStream>
54void Finish(GrpcStream& stream,
const grpc::Status& status,
55 std::string_view call_name) {
56 AsyncMethodInvocation finish;
57 stream.Finish(status, finish.GetTag());
58 ThrowOnError(Wait(finish), call_name,
"Finish");
61template <
typename GrpcStream>
62void Cancel(GrpcStream& stream, std::string_view call_name)
noexcept {
63 AsyncMethodInvocation cancel;
64 stream.Finish(kUnknownErrorStatus, cancel.GetTag());
65 if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
66 ReportErrorWhileCancelling(call_name);
69template <
typename GrpcStream>
70void CancelWithError(GrpcStream& stream, std::string_view call_name)
noexcept {
71 AsyncMethodInvocation cancel;
72 stream.FinishWithError(kUnknownErrorStatus, cancel.GetTag());
73 if (Wait(cancel) != impl::AsyncMethodInvocation::WaitStatus::kOk)
74 ReportErrorWhileCancelling(call_name);
77template <
typename GrpcStream>
78void FinishWithError(GrpcStream& stream,
const grpc::Status& status,
79 std::string_view call_name) {
80 AsyncMethodInvocation finish;
81 stream.FinishWithError(status, finish.GetTag());
82 ThrowOnError(Wait(finish), call_name,
"FinishWithError");
85template <
typename GrpcStream>
86void SendInitialMetadata(GrpcStream& stream, std::string_view call_name) {
87 AsyncMethodInvocation metadata;
88 stream.SendInitialMetadata(metadata.GetTag());
89 ThrowOnError(Wait(metadata), call_name,
"SendInitialMetadata");
92template <
typename GrpcStream,
typename Request>
93bool Read(GrpcStream& stream, Request& request) {
94 AsyncMethodInvocation read;
95 stream.Read(&request, read.GetTag());
96 return Wait(read) == impl::AsyncMethodInvocation::WaitStatus::kOk;
99template <
typename GrpcStream,
typename Response>
100void Write(GrpcStream& stream,
const Response& response,
101 grpc::WriteOptions options, std::string_view call_name) {
102 AsyncMethodInvocation write;
103 stream.Write(response, options, write.GetTag());
104 ThrowOnError(Wait(write), call_name,
"Write");
107template <
typename GrpcStream,
typename Response>
108void WriteAndFinish(GrpcStream& stream,
const Response& response,
109 grpc::WriteOptions options,
const grpc::Status& status,
110 std::string_view call_name) {
111 AsyncMethodInvocation write_and_finish;
112 stream.WriteAndFinish(response, options, status, write_and_finish.GetTag());
113 ThrowOnError(Wait(write_and_finish), call_name,
"WriteAndFinish");
116template <
typename GrpcStream,
typename State>
117void SendInitialMetadataIfNew(GrpcStream& stream, std::string_view call_name,
119 if (state == State::kNew) {
120 state = State::kOpen;
121 SendInitialMetadata(stream, call_name);