10#include <userver/concurrent/queue.hpp>
11#include <userver/concurrent/striped_counter.hpp>
12#include <userver/engine/single_consumer_event.hpp>
13#include <userver/utils/fast_pimpl.hpp>
15USERVER_NAMESPACE_BEGIN
19namespace server::
http::impl {
21struct Http2StreamEvent {
/// Identifier of the stream this event refers to. Defaults to -1
/// (presumably "no stream assigned" - confirm against the users of the type).
std::int32_t stream_id = -1;
/// Payload carried by the event; empty by default.
std::string body_part = {};
29using Http2StreamEventQueue = concurrent::NonFifoMpscQueue<Http2StreamEvent>;
31class Http2StreamEventProducer
final {
33 Http2StreamEventProducer(Http2StreamEventQueue& queue, engine::SingleConsumerEvent& event);
35 void PushEvent(Http2StreamEvent event, engine::Deadline deadline
= {});
/// Takes the identifier of a stream. Declaration only; presumably signals
/// that the stream with this id is finished - confirm in the definition.
void CloseStream(std::int32_t id);
40 Http2StreamEventQueue::Producer producer_;
41 engine::SingleConsumerEvent& event_;
53class ResponseDataAccounter
final {
/// Registers the start of a request of the given size and creation time.
/// Declaration only; Guard's constructor calls it with the values that the
/// Guard destructor later passes to StopRequest().
void StartRequest(
    size_t size,
    std::chrono::steady_clock::time_point create_time
);
/// Counterpart of StartRequest(): Guard's destructor calls it with the same
/// size and creation time that were passed to StartRequest().
/// Declaration only; the definition is not in this header excerpt.
void StopRequest(
    size_t size,
    std::chrono::steady_clock::time_point create_time
);
59 size_t GetCurrentLevel()
const {
return current_; }
61 size_t GetMaxLevel()
const {
return max_; }
63 void SetMaxLevel(size_t size) { max_ = size; }
65 std::chrono::milliseconds GetAvgRequestTime()
const;
/// Level reported by GetCurrentLevel(); starts at zero.
std::atomic<std::size_t> current_{0};
/// Limit read by GetMaxLevel() and replaced by SetMaxLevel(). Starts at the
/// largest representable value (presumably meaning "unlimited" - confirm).
std::atomic<std::size_t> max_{std::numeric_limits<std::size_t>::max()};
79 explicit ResponseBase(ResponseDataAccounter& data_accounter);
82 virtual ~ResponseBase()
noexcept;
/// Takes the response data by value, so callers may move a string in.
/// Declaration only; presumably stores it in `data_` - confirm in the
/// definition.
void SetData(std::string data);
85 const std::string& GetData()
const {
return data_; }
86 std::string&& ExtractData() {
return std::move(data_); }
88 virtual bool IsBodyStreamed()
const = 0;
89 virtual bool WaitForHeadersEnd() = 0;
90 virtual void SetHeadersEnd() = 0;
/// Takes a steady-clock time point. Declaration only; presumably records it
/// as the ready time and raises the flag reported by IsReady() - confirm in
/// the definition.
void SetReady(std::chrono::steady_clock::time_point now);
96 virtual void SetSendFailed(std::chrono::steady_clock::time_point failure_time);
97 bool IsLimitReached()
const;
99 bool IsReady()
const {
return is_ready_; }
100 bool IsSent()
const {
return is_sent_; }
101 size_t BytesSent()
const {
return bytes_sent_; }
102 std::chrono::steady_clock::time_point ReadyTime()
const {
return ready_time_; }
103 std::chrono::steady_clock::time_point SentTime()
const {
return sent_time_; }
104 virtual void SendResponse(engine::
io::
RwBase& socket) = 0;
106 virtual void SetStatusServiceUnavailable() = 0;
107 virtual void SetStatusOk() = 0;
108 virtual void SetStatusNotFound() = 0;
/// Takes a stream identifier. Declaration only; presumably stores it in
/// `stream_id_`, which GetStreamId() returns - confirm in the definition.
void SetStreamId(std::int32_t stream_id);
112 std::optional<std::int32_t> GetStreamId()
const {
return stream_id_; }
113 void SetStreamProdicer(
http::impl::Http2StreamEventProducer&& producer);
114 http::impl::Http2StreamEventProducer GetStreamProducer();
118 ResponseBase(ResponseDataAccounter& data_account, std::chrono::steady_clock::time_point now);
/// Takes the number of bytes sent and the time of sending. Declaration
/// only; presumably feeds BytesSent(), SentTime() and IsSent() - confirm
/// in the definition.
void SetSent(
    std::size_t bytes_sent,
    std::chrono::steady_clock::time_point sent_time
);
125 Guard(ResponseDataAccounter& accounter, std::chrono::steady_clock::time_point create_time, size_t size)
126 : accounter_(accounter), create_time_(create_time), size_(size) {
127 accounter_.StartRequest(size_, create_time_);
130 ~Guard() { accounter_.StopRequest(size_, create_time_); }
133 ResponseDataAccounter& accounter_;
134 std::chrono::steady_clock::time_point create_time_;
138 ResponseDataAccounter& accounter_;
139 std::optional<Guard> guard_;
141 std::chrono::steady_clock::time_point create_time_;
142 std::chrono::steady_clock::time_point ready_time_;
143 std::chrono::steady_clock::time_point sent_time_;
144 size_t bytes_sent_ = 0;
145 bool is_ready_ =
false;
146 bool is_sent_ =
false;
147 std::optional<std::int32_t> stream_id_;
148 std::optional<
http::impl::Http2StreamEventProducer> producer_{};