userver: userver/utils/statistics/recentperiod.hpp Source File
Loading...
Searching...
No Matches
recentperiod.hpp
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <tuple>
6#include <type_traits>
7#include <vector>
8
9#include <userver/utils/datetime.hpp>
10#include <userver/utils/statistics/fwd.hpp>
11#include <userver/utils/statistics/recentperiod_detail.hpp>
12
13USERVER_NAMESPACE_BEGIN
14
15namespace utils::statistics {
16
17/** \brief Class maintains circular buffer of Counters
18 *
19 * At any time current Counter is accessible for modification via
20 * GetCurrentCounter().
21 * Counter can provide a Reset() member function to clear contents.
22 * @see utils::statistics::Percentile
23 */
24template <typename Counter, typename Result,
25 typename Timer = utils::datetime::SteadyClock>
26class RecentPeriod {
27 public:
28 using Duration = typename Timer::duration;
29
30 static_assert(
31 (detail::kResultWantsAddFunction<Result, Counter, Duration> ||
32 detail::kResultCanUseAddAssign<Result, Counter>),
33 "The Result template type argument must provide either Add(Counter, "
34 "Duration, Duration) function or add assignment operator");
35
36 static constexpr bool kUseAddFunction =
37 detail::kResultWantsAddFunction<Result, Counter, Duration>;
38
39 /**
40 * @param epoch_duration duration of epoch.
41 * @param max_duration max duration to calculate statistics for
42 * must be multiple of epoch_duration.
43 */
44 RecentPeriod(Duration epoch_duration = std::chrono::seconds(5),
45 Duration max_duration = std::chrono::seconds(60))
46 : epoch_duration_(epoch_duration),
47 max_duration_(max_duration),
48 epoch_index_(0),
50
51 Counter& GetCurrentCounter() { return items_[get_current_index()].counter; }
52
53 Counter& GetPreviousCounter(int epochs_ago) {
54 return items_[get_previous_index(epochs_ago)].counter;
55 }
56
57 /** \brief Aggregates counters within given time range
58 *
59 * @param duration Time range. Special value Duration::min() -> use
60 * whole RecentPeriod range.
61 * @param with_current_epoch Include current (possibly unfinished) counter
62 * into aggregation
63 *
64 * Type Result must have method Add(Counter, Duration, Duration) or allow
65 * addition of counter values
66 */
67 // NOLINTNEXTLINE(readability-const-return-type)
68 const Result GetStatsForPeriod(Duration duration = Duration::min(),
69 bool with_current_epoch = false) const {
70 if (duration == Duration::min()) {
71 duration = max_duration_;
72 }
73
74 Result result{};
75 Duration now = Timer::now().time_since_epoch();
76 Duration current_epoch = get_epoch_for_duration(now);
77 Duration start_epoch = current_epoch - duration;
78 Duration first_epoch_duration = now - current_epoch;
79 size_t index = epoch_index_.load();
80
81 for (size_t i = 0; i < items_.size();
82 i++, index = (index + items_.size() - 1) % items_.size()) {
83 Duration epoch = items_[index].epoch;
84
85 if (epoch > current_epoch) continue;
86 if (epoch == current_epoch && !with_current_epoch) continue;
87 if (epoch < start_epoch) break;
88
89 if constexpr (kUseAddFunction) {
90 Duration this_epoch_duration =
91 (i == 0) ? first_epoch_duration : epoch_duration_;
92
93 Duration before_this_epoch_duration = epoch - start_epoch;
94 result.Add(items_[index].counter, this_epoch_duration,
95 before_this_epoch_duration);
96 } else {
97 result += items_[index].counter;
98 }
99 }
100
101 return result;
102 }
103
104 Duration GetEpochDuration() const { return epoch_duration_; }
105
106 Duration GetMaxDuration() const { return max_duration_; }
107
108 void UpdateEpochIfOld() { std::ignore = get_current_index(); }
109
110 void Reset() {
111 for (auto& item : items_) {
112 item.Reset();
113 }
114 }
115
116 private:
117 size_t get_current_index() const {
118 while (true) {
119 Duration now = Timer::now().time_since_epoch();
120 Duration epoch = get_epoch_for_duration(now);
121 size_t index = epoch_index_.load();
122 Duration bucket_epoch = items_[index].epoch.load();
123
124 if (epoch != bucket_epoch) {
125 size_t new_index = (index + 1) % items_.size();
126
127 if (epoch_index_.compare_exchange_weak(index, new_index)) {
128 items_[new_index].epoch = epoch;
129 items_[(new_index + 1) % items_.size()].Reset();
130 return new_index;
131 }
132 } else {
133 return index;
134 }
135 }
136 }
137
138 size_t get_previous_index(int epochs_ago) {
139 int index = static_cast<int>(get_current_index()) - epochs_ago;
140 while (index < 0) index += items_.size();
141 return index % items_.size();
142 }
143
144 Duration get_epoch_for_duration(Duration duration) const {
145 auto now = std::chrono::duration_cast<Duration>(duration);
146 return now - now % epoch_duration_;
147 }
148
149 static size_t get_size_for_duration(Duration epoch_duration,
150 Duration max_duration) {
151 /* 3 = current bucket, next zero bucket and extra one to handle
152 possible race. */
153 return max_duration.count() / epoch_duration.count() + 3;
154 }
155
156 struct EpochBucket {
157 static constexpr bool kUseReset = detail::kCanReset<Counter>;
158 std::atomic<Duration> epoch;
159 Counter counter;
160
161 EpochBucket() { Reset(); }
162
163 void Reset() {
164 epoch = Duration::min();
165 if constexpr (kUseReset) {
166 counter.Reset();
167 } else {
168 counter = 0;
169 }
170 }
171 };
172
173 const Duration epoch_duration_;
174 const Duration max_duration_;
175 mutable std::atomic_size_t epoch_index_;
176 mutable std::vector<EpochBucket> items_;
177};
178
179/// @brief @a Writer support for @a RecentPeriod. Forwards to `DumpMetric`
180/// overload for `Result`.
181///
182/// @param args if any, are forwarded to `DumpMetric` for `Result`
183template <typename Counter, typename Result, typename Timer>
184void DumpMetric(Writer& writer,
185 const RecentPeriod<Counter, Result, Timer>& recent_period) {
186 writer = recent_period.GetStatsForPeriod();
187}
188
189template <typename Counter, typename Result, typename Timer>
190void ResetMetric(RecentPeriod<Counter, Result, Timer>& recent_period) {
191 recent_period.Reset();
192}
193
194} // namespace utils::statistics
195
196USERVER_NAMESPACE_END