13#include <boost/lockfree/queue.hpp>
15#include <userver/engine/async.hpp>
16#include <userver/engine/deadline.hpp>
17#include <userver/engine/get_all.hpp>
18#include <userver/engine/semaphore.hpp>
19#include <userver/logging/log.hpp>
20#include <userver/utils/assert.hpp>
22USERVER_NAMESPACE_BEGIN
/// @brief Error thrown by the pool when a semaphore slot could not be
/// obtained before the supplied deadline (see the `throw` sites in
/// ConnectionPoolBase::Pop and ConnectionPoolBase::PushConnection).
class PoolWaitLimitExceededError : public std::runtime_error {
 public:
  /// Only declared here; the definition (and therefore the message passed to
  /// std::runtime_error) is not part of this section of the file.
  PoolWaitLimitExceededError();
};
36template <
class Connection,
class Derived>
37class ConnectionPoolBase :
public std::enable_shared_from_this<Derived> {
40 using ConnectionRawPtr = Connection*;
41 using ConnectionUniquePtr = std::unique_ptr<Connection>;
43 struct ConnectionHolder
final {
44 std::shared_ptr<Derived> pool_ptr;
45 ConnectionUniquePtr connection_ptr;
50 ConnectionPoolBase(std::size_t max_pool_size,
51 std::size_t max_simultaneously_connecting_clients);
54 ~ConnectionPoolBase();
60 void Init(std::size_t initial_size,
61 std::chrono::milliseconds connection_setup_timeout);
69 ConnectionHolder AcquireConnection(engine::Deadline deadline);
72 void ReleaseConnection(ConnectionUniquePtr connection_ptr);
76 ConnectionUniquePtr Pop(engine::Deadline deadline);
79 ConnectionUniquePtr TryPop();
86 void DoRelease(ConnectionUniquePtr connection_ptr);
90 void PushConnection(engine::Deadline deadline);
92 void Drop(ConnectionRawPtr connection_ptr)
noexcept;
96 std::size_t AliveConnectionsCountApprox()
const;
102 void NotifyConnectionWontBeReleased();
105 Derived& AsDerived()
noexcept;
106 const Derived& AsDerived()
const noexcept;
108 ConnectionUniquePtr CreateConnection(
109 const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline);
113 void EnsureInitialized()
const;
114 void EnsureReset()
const;
116 engine::Semaphore given_away_semaphore_;
117 engine::Semaphore connecting_semaphore_;
119 boost::lockfree::queue<ConnectionRawPtr> queue_;
120 std::atomic<std::size_t> alive_connections_{0};
122 bool initialized_{
false};
126template <
class Connection,
class Derived>
127ConnectionPoolBase<Connection, Derived>::ConnectionPoolBase(
128 std::size_t max_pool_size,
129 std::size_t max_simultaneously_connecting_clients)
130 : given_away_semaphore_{max_pool_size},
131 connecting_semaphore_{max_simultaneously_connecting_clients},
132 queue_{max_pool_size} {}
134template <
class Connection,
class Derived>
135ConnectionPoolBase<Connection, Derived>::~ConnectionPoolBase() {
145template <
class Connection,
class Derived>
146void ConnectionPoolBase<Connection, Derived>::Init(
147 std::size_t initial_size,
148 std::chrono::milliseconds connection_setup_timeout) {
149 UASSERT_MSG(!initialized_,
"Calling Init multiple times is a API misuse");
155 std::vector<engine::TaskWithResult<
void>> init_tasks{};
156 init_tasks.reserve(initial_size);
158 for (std::size_t i = 0; i < initial_size; ++i) {
159 init_tasks.push_back(engine::AsyncNoSpan([
this, connection_setup_timeout] {
160 PushConnection(engine::Deadline::FromDuration(connection_setup_timeout));
165 engine::GetAll(init_tasks);
166 }
catch (
const std::exception& ex) {
167 LOG_WARNING() <<
"Failed to properly setup connection pool: " << ex;
172template <
class Connection,
class Derived>
173void ConnectionPoolBase<Connection, Derived>::Reset() {
174 UASSERT_MSG(!reset_,
"Calling Reset multiple times is a API misuse");
180template <
class Connection,
class Derived>
181typename ConnectionPoolBase<Connection, Derived>::ConnectionHolder
182ConnectionPoolBase<Connection, Derived>::AcquireConnection(
183 engine::Deadline deadline) {
186 auto connection_ptr = Pop(deadline);
187 return {
this->shared_from_this(), std::move(connection_ptr)};
190template <
class Connection,
class Derived>
191void ConnectionPoolBase<Connection, Derived>::ReleaseConnection(
192 ConnectionUniquePtr connection_ptr) {
196 DoRelease(std::move(connection_ptr));
198 given_away_semaphore_.unlock_shared();
199 AsDerived().AccountConnectionReleased();
202template <
class Connection,
class Derived>
203Derived& ConnectionPoolBase<Connection, Derived>::AsDerived()
noexcept {
204 return *
static_cast<Derived*>(
this);
207template <
class Connection,
class Derived>
208const Derived& ConnectionPoolBase<Connection, Derived>::AsDerived()
const
210 return *
static_cast<
const Derived*>(
this);
213template <
class Connection,
class Derived>
214typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
215ConnectionPoolBase<Connection, Derived>::CreateConnection(
216 const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline) {
219 UASSERT(connecting_lock.OwnsLock());
220 auto connection_ptr = AsDerived().DoCreateConnection(deadline);
222 alive_connections_.fetch_add(1);
223 AsDerived().AccountConnectionCreated();
225 return connection_ptr;
228template <
class Connection,
class Derived>
229typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
230ConnectionPoolBase<Connection, Derived>::Pop(engine::Deadline deadline) {
233 engine::SemaphoreLock given_away_lock{given_away_semaphore_, deadline};
234 if (!given_away_lock.OwnsLock()) {
235 AsDerived().AccountOverload();
236 throw PoolWaitLimitExceededError{};
239 auto connection_ptr = TryPop();
240 if (!connection_ptr) {
241 engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
243 connection_ptr = TryPop();
244 if (!connection_ptr) {
245 if (!connecting_lock.OwnsLock()) {
246 AsDerived().AccountOverload();
247 throw PoolWaitLimitExceededError{};
249 connection_ptr = CreateConnection(connecting_lock, deadline);
255 given_away_lock.Release();
256 AsDerived().AccountConnectionAcquired();
258 return connection_ptr;
261template <
class Connection,
class Derived>
262typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
263ConnectionPoolBase<Connection, Derived>::TryPop() {
266 ConnectionRawPtr connection_ptr{
nullptr};
267 if (!queue_.pop(connection_ptr)) {
271 return ConnectionUniquePtr{connection_ptr};
274template <
class Connection,
class Derived>
275void ConnectionPoolBase<Connection, Derived>::DoRelease(
276 ConnectionUniquePtr connection_ptr) {
280 const auto is_broken = connection_ptr->IsBroken();
281 ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
282 if (is_broken || !queue_.bounded_push(connection_raw_ptr)) {
283 Drop(connection_raw_ptr);
287template <
class Connection,
class Derived>
288void ConnectionPoolBase<Connection, Derived>::PushConnection(
289 engine::Deadline deadline) {
292 engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
293 if (!connecting_lock.OwnsLock()) {
294 throw PoolWaitLimitExceededError{};
297 ConnectionUniquePtr connection_ptr =
298 CreateConnection(connecting_lock, deadline);
299 connecting_lock.Unlock();
301 ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
302 if (!queue_.bounded_push(connection_raw_ptr)) {
303 Drop(connection_raw_ptr);
307template <
class Connection,
class Derived>
308void ConnectionPoolBase<Connection, Derived>::Drop(
309 ConnectionRawPtr connection_ptr)
noexcept {
311 std::default_delete<Connection>{}(connection_ptr);
313 alive_connections_.fetch_sub(1);
315 static_assert(
noexcept(AsDerived().AccountConnectionDestroyed()),
316 "Please make AccountConnectionDestroyed() noexcept, "
317 "because it might get called in pool destructor and is "
318 "expected to be noexcept");
319 AsDerived().AccountConnectionDestroyed();
322template <
class Connection,
class Derived>
324ConnectionPoolBase<Connection, Derived>::AliveConnectionsCountApprox()
const {
327 return alive_connections_.load();
330template <
class Connection,
class Derived>
331void ConnectionPoolBase<Connection, Derived>::NotifyConnectionWontBeReleased() {
334 given_away_semaphore_.unlock_shared();
335 alive_connections_.fetch_sub(1);
338template <
class Connection,
class Derived>
339void ConnectionPoolBase<Connection, Derived>::CleanupQueue() {
342 ConnectionRawPtr connection_ptr{
nullptr};
344 while (queue_.pop(connection_ptr)) {
345 Drop(connection_ptr);
349template <
class Connection,
class Derived>
350void ConnectionPoolBase<Connection, Derived>::EnsureInitialized()
const {
353 "Please call Init before invoking any other methods on connection pool.");
356template <
class Connection,
class Derived>
357void ConnectionPoolBase<Connection, Derived>::EnsureReset()
const {
359 "Please call Reset before base class is destroyed, otherwise no "
360 "cleanup is performed.");