userver: userver/server/request/response_base.hpp Source File
Loading...
Searching...
No Matches
response_base.hpp
1#pragma once
2
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <optional>
#include <string>
#include <utility>

#include <userver/concurrent/queue.hpp>
#include <userver/concurrent/striped_counter.hpp>
#include <userver/engine/deadline.hpp>
#include <userver/engine/single_consumer_event.hpp>
#include <userver/utils/fast_pimpl.hpp>
14
15USERVER_NAMESPACE_BEGIN
16
17/// @cond
18// TODO: server internals. remove from a public interface
19namespace server::http::impl {
20
/// One event travelling through the HTTP/2 stream event queue.
///
/// A plain aggregate: every field has a default, so `Http2StreamEvent{}` is a
/// valid "empty" event.
struct Http2StreamEvent {
    /// Id of the stream this event refers to; defaults to -1.
    std::int32_t stream_id = -1;

    /// Piece of body data carried by the event; empty by default.
    std::string body_part{};

    /// Defaults to false. Presumably set on the last event of a stream --
    /// confirm against the queue consumer.
    bool is_end = false;
};
26
27// The order is fifo in the context of a single producer. So we are tolerant to
28// reordering between producers
29using Http2StreamEventQueue = concurrent::NonFifoMpscQueue<Http2StreamEvent>;
30
31class Http2StreamEventProducer final {
32public:
33 Http2StreamEventProducer(Http2StreamEventQueue& queue, engine::SingleConsumerEvent& event);
34
35 void PushEvent(Http2StreamEvent event, engine::Deadline deadline = {});
36
37 void CloseStream(std::int32_t id);
38
39private:
40 Http2StreamEventQueue::Producer producer_;
41 engine::SingleConsumerEvent& event_;
42};
43
44} // namespace server::http::impl
45/// @endcond
46
47namespace engine::io {
48class RwBase;
49} // namespace engine::io
50
51namespace server::request {
52
53class ResponseDataAccounter final {
54public:
55 void StartRequest(size_t size, std::chrono::steady_clock::time_point create_time);
56
57 void StopRequest(size_t size, std::chrono::steady_clock::time_point create_time);
58
59 size_t GetCurrentLevel() const { return current_; }
60
61 size_t GetMaxLevel() const { return max_; }
62
63 void SetMaxLevel(size_t size) { max_ = size; }
64
65 std::chrono::milliseconds GetAvgRequestTime() const;
66
67private:
68 std::atomic<size_t> current_{0};
69 std::atomic<size_t> max_{std::numeric_limits<size_t>::max()};
70 concurrent::StripedCounter count_;
71 concurrent::StripedCounter time_sum_;
72};
73
74// TODO: merge with HttpResponse
75
76/// @brief Base class for all the server responses.
77class ResponseBase {
78public:
79 explicit ResponseBase(ResponseDataAccounter& data_accounter);
80 ResponseBase(const ResponseBase&) = delete;
81 ResponseBase(ResponseBase&&) = delete;
82 virtual ~ResponseBase() noexcept;
83
84 void SetData(std::string data);
85 const std::string& GetData() const { return data_; }
86 std::string&& ExtractData() { return std::move(data_); }
87
88 virtual bool IsBodyStreamed() const = 0;
89 virtual bool WaitForHeadersEnd() = 0;
90 virtual void SetHeadersEnd() = 0;
91
92 /// @cond
93 // TODO: server internals. remove from a public interface
94 void SetReady();
95 void SetReady(std::chrono::steady_clock::time_point now);
96 virtual void SetSendFailed(std::chrono::steady_clock::time_point failure_time);
97 bool IsLimitReached() const;
98
99 bool IsReady() const { return is_ready_; }
100 bool IsSent() const { return is_sent_; }
101 size_t BytesSent() const { return bytes_sent_; }
102 std::chrono::steady_clock::time_point ReadyTime() const { return ready_time_; }
103 std::chrono::steady_clock::time_point SentTime() const { return sent_time_; }
104 virtual void SendResponse(engine::io::RwBase& socket) = 0;
105
106 virtual void SetStatusServiceUnavailable() = 0;
107 virtual void SetStatusOk() = 0;
108 virtual void SetStatusNotFound() = 0;
109
110 // HTTP/2.0 only
111 void SetStreamId(std::int32_t stream_id);
112 std::optional<std::int32_t> GetStreamId() const { return stream_id_; }
113 void SetStreamProdicer(http::impl::Http2StreamEventProducer&& producer);
114 http::impl::Http2StreamEventProducer GetStreamProducer();
115 /// @endcond
116
117protected:
118 ResponseBase(ResponseDataAccounter& data_account, std::chrono::steady_clock::time_point now);
119
120 void SetSent(std::size_t bytes_sent, std::chrono::steady_clock::time_point sent_time);
121
122private:
123 class Guard final {
124 public:
125 Guard(ResponseDataAccounter& accounter, std::chrono::steady_clock::time_point create_time, size_t size)
126 : accounter_(accounter), create_time_(create_time), size_(size) {
127 accounter_.StartRequest(size_, create_time_);
128 }
129
130 ~Guard() { accounter_.StopRequest(size_, create_time_); }
131
132 private:
133 ResponseDataAccounter& accounter_;
134 std::chrono::steady_clock::time_point create_time_;
135 size_t size_;
136 };
137
138 ResponseDataAccounter& accounter_;
139 std::optional<Guard> guard_;
140 std::string data_;
141 std::chrono::steady_clock::time_point create_time_;
142 std::chrono::steady_clock::time_point ready_time_;
143 std::chrono::steady_clock::time_point sent_time_;
144 size_t bytes_sent_ = 0;
145 bool is_ready_ = false;
146 bool is_sent_ = false;
147 std::optional<std::int32_t> stream_id_;
148 std::optional<http::impl::Http2StreamEventProducer> producer_{};
149};
150
151} // namespace server::request
152
153USERVER_NAMESPACE_END