userver: userver/drivers/impl/connection_pool_base.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
connection_pool_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/drivers/impl/connection_pool_base.hpp
4/// @brief @copybrief drivers::impl::ConnectionPoolBase
5
6#include <atomic>
7#include <chrono>
8#include <memory>
9#include <stdexcept>
10#include <utility>
11#include <vector>
12
13#include <boost/lockfree/queue.hpp>
14
15#include <userver/engine/async.hpp>
16#include <userver/engine/deadline.hpp>
17#include <userver/engine/get_all.hpp>
18#include <userver/engine/semaphore.hpp>
19#include <userver/logging/log.hpp>
20#include <userver/utils/assert.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24namespace drivers::impl {
25
26/// @brief Thrown when no connection could be acquired from the pool within
27/// specified timeout.
28class PoolWaitLimitExceededError : public std::runtime_error {
29 public:
30 PoolWaitLimitExceededError();
31};
32
33/// @brief Base connection pool implementation to be derived in different
34/// drivers. Takes care of synchronization, pool limits (min/max, simultaneously
35/// connecting etc.) and provides hooks for metrics.
36template <class Connection, class Derived>
37class ConnectionPoolBase : public std::enable_shared_from_this<Derived> {
38 public:
39 protected:
40 using ConnectionRawPtr = Connection*;
41 using ConnectionUniquePtr = std::unique_ptr<Connection>;
42
43 struct ConnectionHolder final {
44 std::shared_ptr<Derived> pool_ptr;
45 ConnectionUniquePtr connection_ptr;
46 };
47
48 /// @brief Constructor, doesn't create any connections, one should call `Init`
49 /// to initialize the pool.
50 ConnectionPoolBase(std::size_t max_pool_size,
51 std::size_t max_simultaneously_connecting_clients);
52 /// @brief Destructor. One should call `Reset` before the destructor is
53 /// invoked.
54 ~ConnectionPoolBase();
55
56 /// @brief Initializes the pool with given limits.
57 /// Uses some methods that must or may be overridden in derived class, hence
58 /// a separate method and not called from base class constructor.
59 /// Derived class constructor is a good place to call this.
60 void Init(std::size_t initial_size,
61 std::chrono::milliseconds connection_setup_timeout);
62 /// @brief Resets the pool, destroying all managed connections.
63 /// Uses some methods that may be overridden in derived class, hence
64 /// a separate function and not called from base class constructor.
65 /// Derived class destructor is a good place to call this.
66 void Reset();
67
68 /// @brief Acquires a connection from the pool.
69 ConnectionHolder AcquireConnection(engine::Deadline deadline);
70 /// @brief Returns the connection to the pool.
71 /// If `connection_ptr->IsBroken()` is true, the connection is destroyed.
72 void ReleaseConnection(ConnectionUniquePtr connection_ptr);
73
74 /// @brief Pops a connection from the pool, might create a new connection if
75 /// there are no ready connections.
76 ConnectionUniquePtr Pop(engine::Deadline deadline);
77 /// @brief Tries to pop a ready connection from the pool,
78 /// may return `nullptr`.
79 ConnectionUniquePtr TryPop();
80
81 /// @brief Returns the connection to the pool, internal.
82 /// If `connection_ptr->IsBroken()` is true, the connection is destroyed.
83 /// Doesn't affect pool limits, so you shouldn't call this directly,
84 /// unless the connection is acquired from `TryPop` - then it's the only
85 /// correct way to return it back.
86 void DoRelease(ConnectionUniquePtr connection_ptr);
87
88 /// @brief Creates a new connection and tries to push it into the ready
89 /// queue. If the queue is full, the connection is dropped immediately.
90 void PushConnection(engine::Deadline deadline);
91 /// @brief Drops the connections - destroys the object and accounts for that.
92 void Drop(ConnectionRawPtr connection_ptr) noexcept;
93
94 /// @brief Returns the approximate count of alive connections (given away and
95 /// ready to use).
96 std::size_t AliveConnectionsCountApprox() const;
97
98 /// @brief Call this method if for some reason a connection previously
99 /// acquired from the pool won't be returned into it.
100 /// If one fails to do so pool limits might shrink until pool becomes
101 /// unusable.
102 void NotifyConnectionWontBeReleased();
103
104 private:
105 Derived& AsDerived() noexcept;
106 const Derived& AsDerived() const noexcept;
107
108 ConnectionUniquePtr CreateConnection(
109 const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline);
110
111 void CleanupQueue();
112
113 void EnsureInitialized() const;
114 void EnsureReset() const;
115
116 engine::Semaphore given_away_semaphore_;
117 engine::Semaphore connecting_semaphore_;
118
119 boost::lockfree::queue<ConnectionRawPtr> queue_;
120 std::atomic<std::size_t> alive_connections_{0};
121
122 bool initialized_{false};
123 bool reset_{false};
124};
125
126template <class Connection, class Derived>
127ConnectionPoolBase<Connection, Derived>::ConnectionPoolBase(
128 std::size_t max_pool_size,
129 std::size_t max_simultaneously_connecting_clients)
130 : given_away_semaphore_{max_pool_size},
131 connecting_semaphore_{max_simultaneously_connecting_clients},
132 queue_{max_pool_size} {}
133
134template <class Connection, class Derived>
135ConnectionPoolBase<Connection, Derived>::~ConnectionPoolBase() {
136 if (initialized_) {
137 // We don't call Reset here, because dropping a connection (when cleaning
138 // up the queue) might invoke virtual methods, and the derived class is
139 // already destroyed, so unexpected things could happen.
140 // However, we assert that derived class performed a cleanup itself.
141 EnsureReset();
142 }
143}
144
145template <class Connection, class Derived>
146void ConnectionPoolBase<Connection, Derived>::Init(
147 std::size_t initial_size,
148 std::chrono::milliseconds connection_setup_timeout) {
149 UASSERT_MSG(!initialized_, "Calling Init multiple times is a API misuse");
150 // We mark the pool as initialized even if this method throws, because
151 // this `initialized_` field is here just to ensure correct API usage,
152 // doesn't have much meaning aside from that.
153 initialized_ = true;
154
155 std::vector<engine::TaskWithResult<void>> init_tasks{};
156 init_tasks.reserve(initial_size);
157
158 for (std::size_t i = 0; i < initial_size; ++i) {
159 init_tasks.push_back(engine::AsyncNoSpan([this, connection_setup_timeout] {
160 PushConnection(engine::Deadline::FromDuration(connection_setup_timeout));
161 }));
162 }
163
164 try {
165 engine::GetAll(init_tasks);
166 } catch (const std::exception& ex) {
167 LOG_WARNING() << "Failed to properly setup connection pool: " << ex;
168 throw;
169 }
170}
171
172template <class Connection, class Derived>
173void ConnectionPoolBase<Connection, Derived>::Reset() {
174 UASSERT_MSG(!reset_, "Calling Reset multiple times is a API misuse");
175 reset_ = true;
176
177 CleanupQueue();
178}
179
180template <class Connection, class Derived>
181typename ConnectionPoolBase<Connection, Derived>::ConnectionHolder
182ConnectionPoolBase<Connection, Derived>::AcquireConnection(
183 engine::Deadline deadline) {
184 EnsureInitialized();
185
186 auto connection_ptr = Pop(deadline);
187 return {this->shared_from_this(), std::move(connection_ptr)};
188}
189
190template <class Connection, class Derived>
191void ConnectionPoolBase<Connection, Derived>::ReleaseConnection(
192 ConnectionUniquePtr connection_ptr) {
193 EnsureInitialized();
194 UASSERT(connection_ptr);
195
196 DoRelease(std::move(connection_ptr));
197
198 given_away_semaphore_.unlock_shared();
199 AsDerived().AccountConnectionReleased();
200}
201
202template <class Connection, class Derived>
203Derived& ConnectionPoolBase<Connection, Derived>::AsDerived() noexcept {
204 return *static_cast<Derived*>(this);
205}
206
207template <class Connection, class Derived>
208const Derived& ConnectionPoolBase<Connection, Derived>::AsDerived() const
209 noexcept {
210 return *static_cast<const Derived*>(this);
211}
212
213template <class Connection, class Derived>
214typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
215ConnectionPoolBase<Connection, Derived>::CreateConnection(
216 const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline) {
217 EnsureInitialized();
218
219 UASSERT(connecting_lock.OwnsLock());
220 auto connection_ptr = AsDerived().DoCreateConnection(deadline);
221
222 alive_connections_.fetch_add(1);
223 AsDerived().AccountConnectionCreated();
224
225 return connection_ptr;
226}
227
228template <class Connection, class Derived>
229typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
230ConnectionPoolBase<Connection, Derived>::Pop(engine::Deadline deadline) {
231 EnsureInitialized();
232
233 engine::SemaphoreLock given_away_lock{given_away_semaphore_, deadline};
234 if (!given_away_lock.OwnsLock()) {
235 AsDerived().AccountOverload();
236 throw PoolWaitLimitExceededError{};
237 }
238
239 auto connection_ptr = TryPop();
240 if (!connection_ptr) {
241 engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
242
243 connection_ptr = TryPop();
244 if (!connection_ptr) {
245 if (!connecting_lock.OwnsLock()) {
246 AsDerived().AccountOverload();
247 throw PoolWaitLimitExceededError{};
248 }
249 connection_ptr = CreateConnection(connecting_lock, deadline);
250 }
251 }
252
253 UASSERT(connection_ptr);
254
255 given_away_lock.Release();
256 AsDerived().AccountConnectionAcquired();
257
258 return connection_ptr;
259}
260
261template <class Connection, class Derived>
262typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
263ConnectionPoolBase<Connection, Derived>::TryPop() {
264 EnsureInitialized();
265
266 ConnectionRawPtr connection_ptr{nullptr};
267 if (!queue_.pop(connection_ptr)) {
268 return nullptr;
269 }
270
271 return ConnectionUniquePtr{connection_ptr};
272}
273
274template <class Connection, class Derived>
275void ConnectionPoolBase<Connection, Derived>::DoRelease(
276 ConnectionUniquePtr connection_ptr) {
277 EnsureInitialized();
278 UASSERT(connection_ptr);
279
280 const auto is_broken = connection_ptr->IsBroken();
281 ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
282 if (is_broken || !queue_.bounded_push(connection_raw_ptr)) {
283 Drop(connection_raw_ptr);
284 }
285}
286
287template <class Connection, class Derived>
288void ConnectionPoolBase<Connection, Derived>::PushConnection(
289 engine::Deadline deadline) {
290 EnsureInitialized();
291
292 engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
293 if (!connecting_lock.OwnsLock()) {
294 throw PoolWaitLimitExceededError{};
295 }
296
297 ConnectionUniquePtr connection_ptr =
298 CreateConnection(connecting_lock, deadline);
299 connecting_lock.Unlock();
300
301 ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
302 if (!queue_.bounded_push(connection_raw_ptr)) {
303 Drop(connection_raw_ptr);
304 }
305}
306
307template <class Connection, class Derived>
308void ConnectionPoolBase<Connection, Derived>::Drop(
309 ConnectionRawPtr connection_ptr) noexcept {
310 EnsureInitialized();
311 std::default_delete<Connection>{}(connection_ptr);
312
313 alive_connections_.fetch_sub(1);
314
315 static_assert(noexcept(AsDerived().AccountConnectionDestroyed()),
316 "Please make AccountConnectionDestroyed() noexcept, "
317 "because it might get called in pool destructor and is "
318 "expected to be noexcept");
319 AsDerived().AccountConnectionDestroyed();
320}
321
322template <class Connection, class Derived>
323std::size_t
324ConnectionPoolBase<Connection, Derived>::AliveConnectionsCountApprox() const {
325 EnsureInitialized();
326
327 return alive_connections_.load();
328}
329
330template <class Connection, class Derived>
331void ConnectionPoolBase<Connection, Derived>::NotifyConnectionWontBeReleased() {
332 EnsureInitialized();
333
334 given_away_semaphore_.unlock_shared();
335 alive_connections_.fetch_sub(1);
336}
337
338template <class Connection, class Derived>
339void ConnectionPoolBase<Connection, Derived>::CleanupQueue() {
340 EnsureInitialized();
341
342 ConnectionRawPtr connection_ptr{nullptr};
343
344 while (queue_.pop(connection_ptr)) {
345 Drop(connection_ptr);
346 }
347}
348
349template <class Connection, class Derived>
350void ConnectionPoolBase<Connection, Derived>::EnsureInitialized() const {
352 initialized_,
353 "Please call Init before invoking any other methods on connection pool.");
354}
355
356template <class Connection, class Derived>
357void ConnectionPoolBase<Connection, Derived>::EnsureReset() const {
358 UASSERT_MSG(reset_,
359 "Please call Reset before base class is destroyed, otherwise no "
360 "cleanup is performed.");
361}
362
363} // namespace drivers::impl
364
365USERVER_NAMESPACE_END