10#include <userver/concurrent/queue.hpp>
11#include <userver/concurrent/striped_counter.hpp>
12#include <userver/engine/single_consumer_event.hpp>
13#include <userver/utils/fast_pimpl.hpp>
15USERVER_NAMESPACE_BEGIN
19namespace server::
http::impl {
/// Event record for a single HTTP/2 stream (per the type name); this is the
/// element type of Http2StreamEventQueue.
// NOTE(review): the end of this struct is not visible in this excerpt, so
// further fields may follow `body_part`.
21struct Http2StreamEvent {
/// Stream identifier; default-initialized to -1 (presumably "not set" - confirm).
22 std::int32_t stream_id{-1};
/// String payload of the event, empty by default (presumably a chunk of the
/// response body - confirm against the producer/consumer code).
23 std::string body_part{};
/// Queue of Http2StreamEvent items backed by concurrent::NonFifoMpscQueue.
// NOTE(review): going by the type name this is a multi-producer /
// single-consumer queue without a global FIFO guarantee - confirm against the
// userver documentation.
29using Http2StreamEventQueue = concurrent::NonFifoMpscQueue<Http2StreamEvent>;
/// Producer-side handle for Http2StreamEventQueue, paired with an
/// engine::SingleConsumerEvent.
// NOTE(review): the member functions are only declared here; presumably the
// event is signalled after a push so that the consumer wakes up - confirm in
// the implementation file.
31class Http2StreamEventProducer
final {
/// @param queue queue to produce into (presumably the source of `producer_`)
/// @param event event kept by reference in `event_` (presumably)
33 Http2StreamEventProducer(Http2StreamEventQueue& queue, engine::SingleConsumerEvent& event);
/// Pushes `event`, which is taken by value.
/// @param deadline defaults to a default-constructed engine::Deadline
///        (presumably "no deadline" - confirm)
35 void PushEvent(Http2StreamEvent event, engine::Deadline deadline = {});
/// Declared here, defined elsewhere. Presumably tells the consumer that the
/// stream `id` is finished - confirm.
37 void CloseStream(std::int32_t id);
/// Producer handle of the event queue.
40 Http2StreamEventQueue::Producer producer_;
/// Held by reference, so the referenced event must outlive this object.
41 engine::SingleConsumerEvent& event_;
/// Holds two atomic size_t values - a current level and a maximum level - and
/// declares request start/stop hooks plus an average-request-time query.
// NOTE(review): StartRequest/StopRequest/GetAvgRequestTime are only declared
// here, and access specifiers and possibly other data members are not visible
// in this excerpt.
53class ResponseDataAccounter
final {
/// Hook for the start of a request.
/// @param size presumably the response data size in bytes - confirm
/// @param create_time steady-clock time point associated with the request
55 void StartRequest(size_t size, std::chrono::steady_clock::time_point create_time);
/// Hook for the end of a request; takes the same arguments as StartRequest().
57 void StopRequest(size_t size, std::chrono::steady_clock::time_point create_time);
/// Returns the value of `current_` (atomic load through the implicit
/// conversion of std::atomic).
59 size_t GetCurrentLevel()
const {
return current_; }
/// Returns the value of `max_` (atomic load through the implicit conversion
/// of std::atomic).
61 size_t GetMaxLevel()
const {
return max_; }
/// Atomically stores `size` into `max_`.
63 void SetMaxLevel(size_t size) { max_ = size; }
/// Returns an average request time in milliseconds; the computation is
/// defined elsewhere.
65 std::chrono::milliseconds GetAvgRequestTime()
const;
/// Current level; starts at 0.
68 std::atomic<size_t> current_{0};
/// Maximum level; starts at the largest size_t value (presumably meaning
/// "no limit" until SetMaxLevel() is called - confirm).
69 std::atomic<size_t> max_{std::numeric_limits<size_t>::max()};
/// Constructs a response associated with `data_accounter`.
// NOTE(review): definition not visible; presumably the reference is kept in
// `accounter_` - confirm.
77 explicit ResponseBase(ResponseDataAccounter& data_accounter);
/// Virtual destructor, declared noexcept and defined out of line, so derived
/// responses can be destroyed through a ResponseBase pointer.
80 virtual ~ResponseBase()
noexcept;
/// Sets the response data from `data`, which is taken by value.
// NOTE(review): definition not visible; presumably it also updates the
// accounting held in `guard_` - confirm.
82 void SetData(std::string data);
/// Returns a const reference to the stored `data_`.
83 const std::string& GetData()
const {
return data_; }
/// Returns `data_` cast to an rvalue reference so that the caller can move
/// from it.
// The string itself is left untouched unless the caller actually
// move-constructs or move-assigns from the returned reference.
84 std::string&& ExtractData() {
return std::move(data_); }
/// Pure virtual; derived classes report whether the body is streamed (per the
/// function name).
86 virtual bool IsBodyStreamed()
const = 0;
/// Pure virtual; returns a bool.
// NOTE(review): presumably blocks until SetHeadersEnd() has been called and
// reports whether the wait succeeded - confirm in the derived classes.
87 virtual bool WaitForHeadersEnd() = 0;
/// Pure virtual; presumably marks the headers as complete (counterpart of
/// WaitForHeadersEnd()) - confirm in the derived classes.
88 virtual void SetHeadersEnd() = 0;
/// Marks the response as ready at the time point `now`.
// NOTE(review): definition not visible; presumably sets `is_ready_` and
// `ready_time_` - confirm.
93 void SetReady(std::chrono::steady_clock::time_point now);
/// Virtual (not pure) hook that reports a send failure at `failure_time`;
/// the base implementation is defined elsewhere and may be overridden.
94 virtual void SetSendFailed(std::chrono::steady_clock::time_point failure_time);
/// Reports whether a limit has been reached.
// NOTE(review): definition not visible; presumably compares the accounter's
// current level with its maximum level - confirm.
95 bool IsLimitReached()
const;
/// Returns the `is_ready_` flag (false until something sets it).
97 bool IsReady()
const {
return is_ready_; }
/// Returns the `is_sent_` flag (false until something sets it).
98 bool IsSent()
const {
return is_sent_; }
/// Returns the `bytes_sent_` counter (0 until something updates it).
99 size_t BytesSent()
const {
return bytes_sent_; }
/// Returns the stored `ready_time_` time point.
100 std::chrono::steady_clock::time_point ReadyTime()
const {
return ready_time_; }
/// Returns the stored `sent_time_` time point.
101 std::chrono::steady_clock::time_point SentTime()
const {
return sent_time_; }
/// Pure virtual; derived classes send the response through `socket`, which is
/// passed as a non-const engine::io::RwBase reference.
102 virtual void SendResponse(engine::
io::
RwBase& socket) = 0;
/// Pure virtual; derived classes set their protocol-specific "service
/// unavailable" status (per the function name).
104 virtual void SetStatusServiceUnavailable() = 0;
/// Pure virtual; derived classes set their protocol-specific "ok" status (per
/// the function name).
105 virtual void SetStatusOk() = 0;
/// Pure virtual; derived classes set their protocol-specific "not found"
/// status (per the function name).
106 virtual void SetStatusNotFound() = 0;
/// Sets the stream id of this response.
// NOTE(review): definition not visible; presumably stores the value in
// `stream_id_` - confirm.
109 void SetStreamId(std::int32_t stream_id);
/// Returns a copy of `stream_id_`; the optional is empty while no id has been
/// stored.
110 std::optional<std::int32_t> GetStreamId()
const {
return stream_id_; }
/// Takes a stream event producer by rvalue reference.
// NOTE(review): the name is misspelled ("Prodicer" instead of "Producer"); it
// is kept as is because existing callers depend on it. Definition not
// visible; presumably the producer is moved into `producer_` - confirm.
111 void SetStreamProdicer(
http::impl::Http2StreamEventProducer&& producer);
/// Returns a stream event producer by value.
// NOTE(review): `producer_` is an optional; what happens when it is empty is
// decided by the definition, which is not visible here - confirm.
112 http::impl::Http2StreamEventProducer GetStreamProducer();
/// Constructor overload that additionally receives the time point `now`.
// NOTE(review): definition and access level are not visible in this excerpt;
// presumably `now` becomes `create_time_` - confirm.
116 ResponseBase(ResponseDataAccounter& data_account, std::chrono::steady_clock::time_point now);
/// Records that the response was sent.
/// @param bytes_sent number of bytes sent
/// @param sent_time time point of the send
// NOTE(review): definition not visible; presumably updates `bytes_sent_`,
// `sent_time_` and `is_sent_` - confirm.
118 void SetSent(std::size_t bytes_sent, std::chrono::steady_clock::time_point sent_time);
/// Stores the accounter, creation time and size, then immediately calls
/// StartRequest() on the accounter with the stored size and creation time.
// NOTE(review): the enclosing class header and the closing brace of this
// constructor are not visible in this excerpt.
123 Guard(ResponseDataAccounter& accounter, std::chrono::steady_clock::time_point create_time, size_t size)
124 : accounter_(accounter), create_time_(create_time), size_(size) {
125 accounter_.StartRequest(size_, create_time_);
/// Calls StopRequest() on the accounter with the same size and creation time
/// that the constructor passed to StartRequest().
128 ~Guard() { accounter_.StopRequest(size_, create_time_); }
/// Accounter notified by the constructor and destructor; held by reference,
/// so it must outlive the guard.
131 ResponseDataAccounter& accounter_;
/// Creation time passed to both StartRequest() and StopRequest().
132 std::chrono::steady_clock::time_point create_time_;
// Data members of the response.
// NOTE(review): some members used by the accessors (e.g. `data_`) are
// declared on lines that are not visible in this excerpt.
/// Accounter held by reference; it must outlive the response.
136 ResponseDataAccounter& accounter_;
/// Optional accounting guard; empty until one is emplaced.
137 std::optional<Guard> guard_;
/// Time point associated with the creation of the response.
139 std::chrono::steady_clock::time_point create_time_;
/// Value returned by ReadyTime().
140 std::chrono::steady_clock::time_point ready_time_;
/// Value returned by SentTime().
141 std::chrono::steady_clock::time_point sent_time_;
/// Value returned by BytesSent(); starts at 0.
142 size_t bytes_sent_ = 0;
/// Value returned by IsReady(); starts as false.
143 bool is_ready_ =
false;
/// Value returned by IsSent(); starts as false.
144 bool is_sent_ =
false;
/// Value returned by GetStreamId(); empty by default.
145 std::optional<std::int32_t> stream_id_;
/// Optional stream event producer; empty by default.
146 std::optional<
http::impl::Http2StreamEventProducer> producer_{};