// userver: userver/server/request/response_base.hpp (source listing)
1#pragma once
2
3/// @file userver/server/request/response_base.hpp
4/// @brief @copybrief server::request::ResponseBase
5
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <optional>
#include <string>
#include <utility>

#include <userver/concurrent/queue.hpp>
#include <userver/concurrent/striped_counter.hpp>
#include <userver/engine/single_consumer_event.hpp>
#include <userver/utils/fast_pimpl.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20/// @cond
21// TODO: server internals. remove from a public interface
22namespace server::http::impl {
23
/// A single event placed into the HTTP/2 stream event queue.
struct Http2StreamEvent {
    /// Identifier of the stream the event belongs to; defaults to -1.
    /// NOTE(review): -1 presumably means "no stream assigned" - confirm.
    std::int32_t stream_id{-1};
    /// Payload carried by the event; empty by default.
    /// NOTE(review): presumably a chunk of the response body - confirm.
    std::string body_part{};
    /// False by default.
    /// NOTE(review): presumably marks the last event of the stream - confirm.
    bool is_end{false};
};
29
30// The order is fifo in the context of a single producer. So we are tolerant to
31// reordering between producers
32using Http2StreamEventQueue = concurrent::NonFifoMpscQueue<Http2StreamEvent>;
33
34class Http2StreamEventProducer final {
35public:
36 Http2StreamEventProducer(Http2StreamEventQueue& queue, engine::SingleConsumerEvent& event);
37
38 void PushEvent(Http2StreamEvent event, engine::Deadline deadline = {});
39
40 void CloseStream(std::int32_t id);
41
42private:
43 Http2StreamEventQueue::Producer producer_;
44 engine::SingleConsumerEvent& event_;
45};
46
47} // namespace server::http::impl
48/// @endcond
49
namespace engine::io {
// Forward declaration only: this header refers to RwBase solely by reference
// (see ResponseBase::SendResponse), so the full definition is not required.
class RwBase;
}  // namespace engine::io
53
54namespace server::request {
55
56class ResponseDataAccounter final {
57public:
58 void StartRequest(std::size_t size, std::chrono::steady_clock::time_point create_time);
59
60 void StopRequest(std::size_t size, std::chrono::steady_clock::time_point create_time);
61
62 std::size_t GetPendingResponsesSizeInBytes() const { return pending_responses_size_in_bytes_; }
63
64 std::size_t GetMaxPendingResponsesSizeInBytes() const { return max_pending_responses_size_in_bytes_; }
65
66 void SetMaxPendingResponsesSizeInBytes(size_t size) { max_pending_responses_size_in_bytes_ = size; }
67
68 std::chrono::milliseconds GetAvgRequestTime() const;
69
70private:
71 std::atomic<std::size_t> pending_responses_size_in_bytes_{0};
72 std::atomic<std::size_t> max_pending_responses_size_in_bytes_{std::numeric_limits<std::size_t>::max()};
73 concurrent::StripedCounter pending_responses_count_{};
74 concurrent::StripedCounter time_sum_{};
75};
76
77// TODO: merge with HttpResponse
78
79/// @brief Base class for all the server responses.
81public:
82 explicit ResponseBase(ResponseDataAccounter& data_accounter);
83 ResponseBase(const ResponseBase&) = delete;
84 ResponseBase(ResponseBase&&) = delete;
85 virtual ~ResponseBase() noexcept;
86
87 void SetData(std::string data);
88 const std::string& GetData() const { return data_; }
89 std::string&& ExtractData() { return std::move(data_); }
90
91 virtual bool IsBodyStreamed() const = 0;
92 virtual bool WaitForHeadersEnd() = 0;
93 virtual void SetHeadersEnd() = 0;
94
95 /// @cond
96 // TODO: server internals. remove from a public interface
97 void SetReady();
98 void SetReady(std::chrono::steady_clock::time_point now);
99 virtual void SetSendFailed(std::chrono::steady_clock::time_point failure_time);
100 bool IsLimitReached() const;
101
102 bool IsReady() const { return is_ready_; }
103 bool IsSent() const noexcept { return is_sent_; }
104 size_t BytesSent() const { return bytes_sent_; }
105 std::chrono::steady_clock::time_point ReadyTime() const { return ready_time_; }
106 std::chrono::steady_clock::time_point SentTime() const { return sent_time_; }
107 virtual void SendResponse(engine::io::RwBase& socket) = 0;
108
109 virtual void SetStatusServiceUnavailable() = 0;
110 virtual void SetStatusOk() = 0;
111 virtual void SetStatusNotFound() = 0;
112
113 // HTTP/2.0 only
114 void SetStreamId(std::int32_t stream_id);
115 std::optional<std::int32_t> GetStreamId() const { return stream_id_; }
116 void SetStreamProdicer(http::impl::Http2StreamEventProducer&& producer);
117 http::impl::Http2StreamEventProducer GetStreamProducer();
118 /// @endcond
119
120protected:
121 ResponseBase(ResponseDataAccounter& data_account, std::chrono::steady_clock::time_point now);
122
123 void SetSent(std::size_t bytes_sent, std::chrono::steady_clock::time_point sent_time);
124
125private:
126 class Guard final {
127 public:
128 Guard(ResponseDataAccounter& accounter, std::chrono::steady_clock::time_point create_time, size_t size)
129 : accounter_(accounter),
130 create_time_(create_time),
131 size_(size)
132 {
133 accounter_.StartRequest(size_, create_time_);
134 }
135
136 ~Guard() { accounter_.StopRequest(size_, create_time_); }
137
138 private:
139 ResponseDataAccounter& accounter_;
140 std::chrono::steady_clock::time_point create_time_;
141 size_t size_;
142 };
143
144 ResponseDataAccounter& accounter_;
145 std::optional<Guard> guard_;
146 std::string data_;
147 std::chrono::steady_clock::time_point create_time_;
148 std::chrono::steady_clock::time_point ready_time_;
149 std::chrono::steady_clock::time_point sent_time_;
150 size_t bytes_sent_ = 0;
151 bool is_ready_ = false;
152 bool is_sent_ = false;
153 std::optional<std::int32_t> stream_id_;
154 std::optional<http::impl::Http2StreamEventProducer> producer_{};
155};
156
157} // namespace server::request
158
159USERVER_NAMESPACE_END