userver: userver/server/request/response_base.hpp Source File
Loading...
Searching...
No Matches
response_base.hpp
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <functional>
6#include <limits>
7#include <optional>
8#include <string>
9
10#include <userver/concurrent/queue.hpp>
11#include <userver/concurrent/striped_counter.hpp>
12#include <userver/engine/single_consumer_event.hpp>
13#include <userver/utils/fast_pimpl.hpp>
14
15USERVER_NAMESPACE_BEGIN
16
17/// @cond
18// TODO: server internals. remove from a public interface
19namespace server::http::impl {
20
21struct Http2StreamEvent {
22 std::int32_t stream_id{-1};
23 std::string body_part{};
24 bool is_end{false};
25};
26
27// The order is fifo in the context of a single producer. So we are tolerant to
28// reordering between producers
29using Http2StreamEventQueue = concurrent::NonFifoMpscQueue<Http2StreamEvent>;
30
31class Http2StreamEventProducer final {
32public:
33 Http2StreamEventProducer(Http2StreamEventQueue& queue, engine::SingleConsumerEvent& event);
34
35 void PushEvent(Http2StreamEvent event, engine::Deadline deadline = {});
36
37 void CloseStream(std::int32_t id);
38
39private:
40 Http2StreamEventQueue::Producer producer_;
41 engine::SingleConsumerEvent& event_;
42};
43
44} // namespace server::http::impl
45/// @endcond
46
47namespace engine::io {
48class RwBase;
49} // namespace engine::io
50
51namespace server::request {
52
53class ResponseDataAccounter final {
54public:
55 void StartRequest(size_t size, std::chrono::steady_clock::time_point create_time);
56
57 void StopRequest(size_t size, std::chrono::steady_clock::time_point create_time);
58
59 size_t GetCurrentLevel() const { return current_; }
60
61 size_t GetMaxLevel() const { return max_; }
62
63 void SetMaxLevel(size_t size) { max_ = size; }
64
65 std::chrono::milliseconds GetAvgRequestTime() const;
66
67private:
68 std::atomic<size_t> current_{0};
69 std::atomic<size_t> max_{std::numeric_limits<size_t>::max()};
70 concurrent::StripedCounter count_;
71 concurrent::StripedCounter time_sum_;
72};
73
74/// @brief Base class for all the server responses.
75class ResponseBase {
76public:
77 explicit ResponseBase(ResponseDataAccounter& data_accounter);
78 ResponseBase(const ResponseBase&) = delete;
79 ResponseBase(ResponseBase&&) = delete;
80 virtual ~ResponseBase() noexcept;
81
82 void SetData(std::string data);
83 const std::string& GetData() const { return data_; }
84 std::string&& ExtractData() { return std::move(data_); }
85
86 virtual bool IsBodyStreamed() const = 0;
87 virtual bool WaitForHeadersEnd() = 0;
88 virtual void SetHeadersEnd() = 0;
89
90 /// @cond
91 // TODO: server internals. remove from a public interface
92 void SetReady();
93 void SetReady(std::chrono::steady_clock::time_point now);
94 virtual void SetSendFailed(std::chrono::steady_clock::time_point failure_time);
95 bool IsLimitReached() const;
96
97 bool IsReady() const { return is_ready_; }
98 bool IsSent() const { return is_sent_; }
99 size_t BytesSent() const { return bytes_sent_; }
100 std::chrono::steady_clock::time_point ReadyTime() const { return ready_time_; }
101 std::chrono::steady_clock::time_point SentTime() const { return sent_time_; }
102 virtual void SendResponse(engine::io::RwBase& socket) = 0;
103
104 virtual void SetStatusServiceUnavailable() = 0;
105 virtual void SetStatusOk() = 0;
106 virtual void SetStatusNotFound() = 0;
107
108 // HTTP/2.0 only
109 void SetStreamId(std::int32_t stream_id);
110 std::optional<std::int32_t> GetStreamId() const { return stream_id_; }
111 void SetStreamProdicer(http::impl::Http2StreamEventProducer&& producer);
112 http::impl::Http2StreamEventProducer GetStreamProducer();
113 /// @endcond
114
115protected:
116 ResponseBase(ResponseDataAccounter& data_account, std::chrono::steady_clock::time_point now);
117
118 void SetSent(std::size_t bytes_sent, std::chrono::steady_clock::time_point sent_time);
119
120private:
121 class Guard final {
122 public:
123 Guard(ResponseDataAccounter& accounter, std::chrono::steady_clock::time_point create_time, size_t size)
124 : accounter_(accounter), create_time_(create_time), size_(size) {
125 accounter_.StartRequest(size_, create_time_);
126 }
127
128 ~Guard() { accounter_.StopRequest(size_, create_time_); }
129
130 private:
131 ResponseDataAccounter& accounter_;
132 std::chrono::steady_clock::time_point create_time_;
133 size_t size_;
134 };
135
136 ResponseDataAccounter& accounter_;
137 std::optional<Guard> guard_;
138 std::string data_;
139 std::chrono::steady_clock::time_point create_time_;
140 std::chrono::steady_clock::time_point ready_time_;
141 std::chrono::steady_clock::time_point sent_time_;
142 size_t bytes_sent_ = 0;
143 bool is_ready_ = false;
144 bool is_sent_ = false;
145 std::optional<std::int32_t> stream_id_;
146 std::optional<http::impl::Http2StreamEventProducer> producer_{};
147};
148
149} // namespace server::request
150
151USERVER_NAMESPACE_END