13#include <boost/lockfree/queue.hpp> 
   15#include <userver/engine/async.hpp> 
   16#include <userver/engine/deadline.hpp> 
   17#include <userver/engine/get_all.hpp> 
   18#include <userver/engine/semaphore.hpp> 
   19#include <userver/logging/log.hpp> 
   20#include <userver/utils/assert.hpp> 
   22USERVER_NAMESPACE_BEGIN
 
   28class PoolWaitLimitExceededError : 
public std::runtime_error {
 
   30  PoolWaitLimitExceededError();
 
   36template <
class Connection, 
class Derived>
 
   37class ConnectionPoolBase : 
public std::enable_shared_from_this<Derived> {
 
   40  using ConnectionRawPtr = Connection*;
 
   41  using ConnectionUniquePtr = std::unique_ptr<Connection>;
 
   43  struct ConnectionHolder 
final {
 
   44    std::shared_ptr<Derived> pool_ptr;
 
   45    ConnectionUniquePtr connection_ptr;
 
   50  ConnectionPoolBase(std::size_t max_pool_size,
 
   51                     std::size_t max_simultaneously_connecting_clients);
 
   54  ~ConnectionPoolBase();
 
   60  void Init(std::size_t initial_size,
 
   61            std::chrono::milliseconds connection_setup_timeout);
 
   69  ConnectionHolder AcquireConnection(engine::Deadline deadline);
 
   72  void ReleaseConnection(ConnectionUniquePtr connection_ptr);
 
   76  ConnectionUniquePtr Pop(engine::Deadline deadline);
 
   79  ConnectionUniquePtr TryPop();
 
   86  void DoRelease(ConnectionUniquePtr connection_ptr);
 
   90  void PushConnection(engine::Deadline deadline);
 
   92  void Drop(ConnectionRawPtr connection_ptr) 
noexcept;
 
   96  std::size_t AliveConnectionsCountApprox() 
const;
 
  102  void NotifyConnectionWontBeReleased();
 
  105  Derived& AsDerived() 
noexcept;
 
  106  const Derived& AsDerived() 
const noexcept;
 
  108  ConnectionUniquePtr CreateConnection(
 
  109      const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline);
 
  113  void EnsureInitialized() 
const;
 
  114  void EnsureReset() 
const;
 
  116  engine::Semaphore given_away_semaphore_;
 
  117  engine::Semaphore connecting_semaphore_;
 
  119  boost::lockfree::queue<ConnectionRawPtr> queue_;
 
  120  std::atomic<std::size_t> alive_connections_{0};
 
  122  bool initialized_{
false};
 
  126template <
class Connection, 
class Derived>
 
  127ConnectionPoolBase<Connection, Derived>::ConnectionPoolBase(
 
  128    std::size_t max_pool_size,
 
  129    std::size_t max_simultaneously_connecting_clients)
 
  130    : given_away_semaphore_{max_pool_size},
 
  131      connecting_semaphore_{max_simultaneously_connecting_clients},
 
  132      queue_{max_pool_size} {}
 
  134template <
class Connection, 
class Derived>
 
  135ConnectionPoolBase<Connection, Derived>::~ConnectionPoolBase() {
 
  145template <
class Connection, 
class Derived>
 
  146void ConnectionPoolBase<Connection, Derived>::Init(
 
  147    std::size_t initial_size,
 
  148    std::chrono::milliseconds connection_setup_timeout) {
 
  149  UASSERT_MSG(!initialized_, 
"Calling Init multiple times is a API misuse");
 
  155  std::vector<engine::TaskWithResult<
void>> init_tasks{};
 
  156  init_tasks.reserve(initial_size);
 
  158  for (std::size_t i = 0; i < initial_size; ++i) {
 
  159    init_tasks.push_back(engine::AsyncNoSpan([
this, connection_setup_timeout] {
 
  160      PushConnection(engine::Deadline::FromDuration(connection_setup_timeout));
 
  165    engine::GetAll(init_tasks);
 
  166  } 
catch (
const std::exception& ex) {
 
  167    LOG_WARNING() << 
"Failed to properly setup connection pool: " << ex;
 
  172template <
class Connection, 
class Derived>
 
  173void ConnectionPoolBase<Connection, Derived>::Reset() {
 
  174  UASSERT_MSG(!reset_, 
"Calling Reset multiple times is a API misuse");
 
  180template <
class Connection, 
class Derived>
 
  181typename ConnectionPoolBase<Connection, Derived>::ConnectionHolder
 
  182ConnectionPoolBase<Connection, Derived>::AcquireConnection(
 
  183    engine::Deadline deadline) {
 
  186  auto connection_ptr = Pop(deadline);
 
  187  return {
this->shared_from_this(), std::move(connection_ptr)};
 
  190template <
class Connection, 
class Derived>
 
  191void ConnectionPoolBase<Connection, Derived>::ReleaseConnection(
 
  192    ConnectionUniquePtr connection_ptr) {
 
  196  DoRelease(std::move(connection_ptr));
 
  198  given_away_semaphore_.unlock_shared();
 
  199  AsDerived().AccountConnectionReleased();
 
  202template <
class Connection, 
class Derived>
 
  203Derived& ConnectionPoolBase<Connection, Derived>::AsDerived() 
noexcept {
 
  204  return *
static_cast<Derived*>(
this);
 
  207template <
class Connection, 
class Derived>
 
  208const Derived& ConnectionPoolBase<Connection, Derived>::AsDerived() 
const 
  210  return *
static_cast<
const Derived*>(
this);
 
  213template <
class Connection, 
class Derived>
 
  214typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
 
  215ConnectionPoolBase<Connection, Derived>::CreateConnection(
 
  216    const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline) {
 
  219  UASSERT(connecting_lock.OwnsLock());
 
  220  auto connection_ptr = AsDerived().DoCreateConnection(deadline);
 
  222  alive_connections_.fetch_add(1);
 
  223  AsDerived().AccountConnectionCreated();
 
  225  return connection_ptr;
 
  228template <
class Connection, 
class Derived>
 
  229typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
 
  230ConnectionPoolBase<Connection, Derived>::Pop(engine::Deadline deadline) {
 
  233  engine::SemaphoreLock given_away_lock{given_away_semaphore_, deadline};
 
  234  if (!given_away_lock.OwnsLock()) {
 
  235    AsDerived().AccountOverload();
 
  236    throw PoolWaitLimitExceededError{};
 
  239  auto connection_ptr = TryPop();
 
  240  if (!connection_ptr) {
 
  241    engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
 
  243    connection_ptr = TryPop();
 
  244    if (!connection_ptr) {
 
  245      if (!connecting_lock.OwnsLock()) {
 
  246        AsDerived().AccountOverload();
 
  247        throw PoolWaitLimitExceededError{};
 
  249      connection_ptr = CreateConnection(connecting_lock, deadline);
 
  255  given_away_lock.Release();
 
  256  AsDerived().AccountConnectionAcquired();
 
  258  return connection_ptr;
 
  261template <
class Connection, 
class Derived>
 
  262typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
 
  263ConnectionPoolBase<Connection, Derived>::TryPop() {
 
  266  ConnectionRawPtr connection_ptr{
nullptr};
 
  267  if (!queue_.pop(connection_ptr)) {
 
  271  return ConnectionUniquePtr{connection_ptr};
 
  274template <
class Connection, 
class Derived>
 
  275void ConnectionPoolBase<Connection, Derived>::DoRelease(
 
  276    ConnectionUniquePtr connection_ptr) {
 
  280  const auto is_broken = connection_ptr->IsBroken();
 
  281  ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
 
  282  if (is_broken || !queue_.bounded_push(connection_raw_ptr)) {
 
  283    Drop(connection_raw_ptr);
 
  287template <
class Connection, 
class Derived>
 
  288void ConnectionPoolBase<Connection, Derived>::PushConnection(
 
  289    engine::Deadline deadline) {
 
  292  engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
 
  293  if (!connecting_lock.OwnsLock()) {
 
  294    throw PoolWaitLimitExceededError{};
 
  297  ConnectionUniquePtr connection_ptr =
 
  298      CreateConnection(connecting_lock, deadline);
 
  299  connecting_lock.Unlock();
 
  301  ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
 
  302  if (!queue_.bounded_push(connection_raw_ptr)) {
 
  303    Drop(connection_raw_ptr);
 
  307template <
class Connection, 
class Derived>
 
  308void ConnectionPoolBase<Connection, Derived>::Drop(
 
  309    ConnectionRawPtr connection_ptr) 
noexcept {
 
  311  std::default_delete<Connection>{}(connection_ptr);
 
  313  alive_connections_.fetch_sub(1);
 
  315  static_assert(
noexcept(AsDerived().AccountConnectionDestroyed()),
 
  316                "Please make AccountConnectionDestroyed() noexcept, " 
  317                "because it might get called in pool destructor and is " 
  318                "expected to be noexcept");
 
  319  AsDerived().AccountConnectionDestroyed();
 
  322template <
class Connection, 
class Derived>
 
  324ConnectionPoolBase<Connection, Derived>::AliveConnectionsCountApprox() 
const {
 
  327  return alive_connections_.load();
 
  330template <
class Connection, 
class Derived>
 
  331void ConnectionPoolBase<Connection, Derived>::NotifyConnectionWontBeReleased() {
 
  334  given_away_semaphore_.unlock_shared();
 
  335  alive_connections_.fetch_sub(1);
 
  338template <
class Connection, 
class Derived>
 
  339void ConnectionPoolBase<Connection, Derived>::CleanupQueue() {
 
  342  ConnectionRawPtr connection_ptr{
nullptr};
 
  344  while (queue_.pop(connection_ptr)) {
 
  345    Drop(connection_ptr);
 
  349template <
class Connection, 
class Derived>
 
  350void ConnectionPoolBase<Connection, Derived>::EnsureInitialized() 
const {
 
  353      "Please call Init before invoking any other methods on connection pool.");
 
  356template <
class Connection, 
class Derived>
 
  357void ConnectionPoolBase<Connection, Derived>::EnsureReset() 
const {
 
  359              "Please call Reset before base class is destroyed, otherwise no " 
  360              "cleanup is performed.");