跳至内容 跳至搜索

Active Record 连接池

用于管理 Active Record 数据库连接的连接池基类。

简介

连接池会同步线程对有限数量的数据库连接的访问。基本思想是每个线程从池中检出(checkout)一个数据库连接,使用该连接,然后将连接归还(checkin)回池中。 ConnectionPool 是完全线程安全的,只要正确遵循 ConnectionPool 的约定,它就能确保一个连接不会同时被两个线程使用。它还会处理连接数多于线程数的情况:如果所有连接都已被检出,而某个线程仍然尝试检出连接,那么 ConnectionPool 将会等待直到有其他线程归还连接,或者 checkout_timeout 超时。

获取(检出)连接

可以通过几种方式从连接池获取并使用连接

  1. 直接使用 ActiveRecord::Base.lease_connection。当你完成连接(们)的使用并希望将其返回到池中时,调用 ActiveRecord::Base.connection_handler.clear_active_connections!。这是 Active Record 在与 Action Pack 的请求处理周期结合使用时的默认行为。

  2. 通过 ActiveRecord::Base.connection_pool.checkout 手动从池中检出连接。完成使用后,你需要负责通过调用 ActiveRecord::Base.connection_pool.checkin(connection) 将连接归还到池中。

  3. 使用 ActiveRecord::Base.connection_pool.with_connection(&block),它会获取一个连接,将该连接作为唯一参数传递给块,并在块完成后将其返回到池中。

池中的连接实际上是 AbstractAdapter 对象(或与 AbstractAdapter 接口兼容的对象)。

当一个线程使用以上三种方法之一从池中检出一个连接时,该连接将自动成为该线程上执行的 ActiveRecord 查询所使用的连接。例如,不必显式地将检出的连接传递给 Rails 模型或查询。

选项

你可以在数据库连接配置中添加几个与连接池相关的选项

  • checkout_timeout:等待连接可用之前的秒数,超过此时间将放弃并引发超时错误(默认为 5 秒)。

  • idle_timeout:连接在池中闲置的秒数,超过此时间将被自动断开(默认为 300 秒)。设置为零表示永久保留连接。

  • keepalive:在连接闲置期间进行保持连接检查的秒数(默认为 600 秒)。

  • max_age:池允许连接存在的最长秒数,之后会在下次检入时进行淘汰(默认为 Float::INFINITY)。

  • max_connections:池可以管理的连接的最大数量(默认为 5)。设置为 nil 或 -1 表示无限连接。

  • min_connections:池将打开并维护的连接的最小数量(默认为 0)。

  • pool_jitter:应用于 max_agekeepalive 时间间隔的最大减少因子(默认为 0.2;范围 0.0-1.0)。

命名空间
方法
A
C
D
F
I
K
L
N
P
R
S
W
包含的模块

常量

WeakThreadKeyMap = ObjectSpace::WeakKeyMap
 

Attributes

[R] async_executor
[RW] automatic_reconnect
[RW] checkout_timeout
[R] db_config
[R] keepalive
[R] max_age
[R] max_connections
[R] min_connections
[R] pool_config
[R] reaper
[R] role
[R] shard
[R] size

类公共方法

install_executor_hooks(executor = ActiveSupport::Executor)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 231
def install_executor_hooks(executor = ActiveSupport::Executor)
  executor.register_hook(ExecutorHooks)
end

new(pool_config)

创建一个新的 ConnectionPool 对象。 pool_config 是一个 PoolConfig 对象,它描述了数据库连接信息(例如,适配器、主机名、用户名、密码等),以及此 ConnectionPool 的最大大小。

默认的 ConnectionPool 最大大小为 5。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 251
def initialize(pool_config)
  super()

  @pool_config = pool_config
  @db_config = pool_config.db_config
  @role = pool_config.role
  @shard = pool_config.shard

  @checkout_timeout = db_config.checkout_timeout
  @idle_timeout = db_config.idle_timeout
  @max_connections = db_config.max_connections
  @min_connections = db_config.min_connections
  @max_age = db_config.max_age
  @keepalive = db_config.keepalive

  # This variable tracks the cache of threads mapped to reserved connections, with the
  # sole purpose of speeding up the +connection+ method. It is not the authoritative
  # registry of which thread owns which connection. Connection ownership is tracked by
  # the +connection.owner+ attr on each +connection+ instance.
  # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
  # then that +thread+ does indeed own that +conn+. However, an absence of such
  # mapping does not mean that the +thread+ doesn't own the said connection. In
  # that case +conn.owner+ attr should be consulted.
  # Access and modification of <tt>@leases</tt> does not require
  # synchronization.
  @leases = LeaseRegistry.new

  @connections         = []
  @automatic_reconnect = true

  # Connection pool allows for concurrent (outside the main +synchronize+ section)
  # establishment of new connections. This variable tracks the number of threads
  # currently in the process of independently establishing connections to the DB.
  @now_connecting = 0

  # Sometimes otherwise-idle connections are temporarily held by the Reaper for
  # maintenance. This variable tracks the number of connections currently in that
  # state -- if a thread requests a connection and there are none available, it
  # will await any in-maintenance connections in preference to creating a new one.
  @maintaining = 0

  @threads_blocking_new_connections = 0

  @available = ConnectionLeasingQueue.new self
  @pinned_connection = nil
  @pinned_connections_depth = 0

  @async_executor = build_async_executor

  @schema_cache = nil

  @activated = false
  @original_context = ActiveSupport::IsolatedExecutionState.context

  @reaper_lock = Monitor.new
  @reaper = Reaper.new(self, db_config.reaping_frequency)
  @reaper.run
end

实例公共方法

activate()

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 342
def activate
  @activated = true
end

activated?()

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 346
def activated?
  @activated
end

active_connection?()

如果当前线程有一个正在使用的开放连接,则返回 true。

此方法仅适用于通过 lease_connectionwith_connection 方法获得的连接。通过 checkout 获得的连接不会被 active_connection? 检测到。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 419
def active_connection?
  connection_lease.connection
end

checkin(conn)

将数据库连接归还到池中,表示你不再需要此连接。

conn:一个 AbstractAdapter 对象,该对象之前是通过在此池上调用 checkout 获取的。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 658
def checkin(conn)
  return if @pinned_connection.equal?(conn)

  conn.lock.synchronize do
    synchronize do
      connection_lease.clear(conn)
      conn.expire
      @available.add conn
    end
  end
end

checkout(checkout_timeout = @checkout_timeout)

从池中检出一个数据库连接,表示你希望使用它。完成后你应该调用 checkin

这可以通过返回并租用现有连接,或创建新连接并租用它来完成。

如果所有连接都已被租用且池已满(意味着当前租用连接的数量大于或等于设定的容量限制),将引发 ActiveRecord::ConnectionTimeoutError 异常。

返回: 一个 AbstractAdapter 对象。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 630
def checkout(checkout_timeout = @checkout_timeout)
  return checkout_and_verify(acquire_connection(checkout_timeout)) unless @pinned_connection

  @pinned_connection.lock.synchronize do
    synchronize do
      # The pinned connection may have been cleaned up before we synchronized, so check if it is still present
      if @pinned_connection
        @pinned_connection.verify!

        # Any leased connection must be in @connections otherwise
        # some methods like #connected? won't behave correctly
        unless @connections.include?(@pinned_connection)
          @connections << @pinned_connection
        end

        @pinned_connection
      else
        checkout_and_verify(acquire_connection(checkout_timeout))
      end
    end
  end
end

clear_reloadable_connections(raise_on_acquisition_timeout = true)

从池中清除可重载的连接,并重新连接需要重载的连接。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 588
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |conn|
        if conn.in_use?
          conn.steal!
          checkin conn
        end
        conn.disconnect! if conn.requires_reloading?
      end
      @connections.delete_if(&:requires_reloading?)
      @available.clear
    end
  end
end

clear_reloadable_connections!()

从池中清除可重载的连接,并重新连接需要重载的连接。

池首先尝试获得所有连接的所有权。如果在超时间隔内无法做到这一点(默认持续时间为 spec.db_config.checkout_timeout * 2 秒),则池将强制清除缓存并重新加载连接,而不考虑其他拥有连接的线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 612
def clear_reloadable_connections!
  clear_reloadable_connections(false)
end

connected?()

如果连接已打开,则返回 true。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 490
def connected?
  synchronize { @connections.any?(&:connected?) }
end

connections()

返回一个包含池中当前连接的数组。对数组的访问不需要对池进行同步,因为数组是新创建的,不会被池保留。

但是;此方法绕过了 ConnectionPool 的线程安全连接访问模式。返回的连接可能属于另一个线程、未拥有,或者碰巧属于调用线程。

在连接上调用方法而不拥有它,受底层方法线程安全性的保证。连接适配器类上的许多方法本质上都是多线程不安全的。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 505
def connections
  synchronize { @connections.dup }
end

disconnect(raise_on_acquisition_timeout = true)

断开池中的所有连接,并清空池。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 515
def disconnect(raise_on_acquisition_timeout = true)
  @reaper_lock.synchronize do
    return if self.discarded?

    with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
      synchronize do
        return if self.discarded?
        @connections.each do |conn|
          if conn.in_use?
            conn.steal!
            checkin conn
          end
          conn.disconnect!
        end
        @connections = []
        @leases.clear
        @available.clear

        # Stop maintaining the minimum size until reactivated
        @activated = false
      end
    end
  end
end

disconnect!()

断开池中的所有连接,并清空池。

池首先尝试获得所有连接的所有权。如果在超时间隔内无法做到这一点(默认持续时间为 spec.db_config.checkout_timeout * 2 秒),则池将被强制断开,而不考虑其他拥有连接的线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 546
def disconnect!
  disconnect(false)
end

flush(minimum_idle = @idle_timeout)

断开所有已闲置至少 minimum_idle 秒的连接。当前已被检出或在 minimum_idle 秒内才被检入的连接不受影响。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 727
def flush(minimum_idle = @idle_timeout)
  return if minimum_idle.nil?

  removed_connections = synchronize do
    return if self.discarded?

    idle_connections = @connections.select do |conn|
      !conn.in_use? && conn.seconds_idle >= minimum_idle
    end.sort_by { |conn| -conn.seconds_idle } # sort longest idle first

    # Don't go below our configured pool minimum unless we're flushing
    # everything
    idles_to_retain =
      if minimum_idle > 0
        @min_connections - (@connections.size - idle_connections.size)
      else
        0
      end

    if idles_to_retain > 0
      idle_connections.pop idles_to_retain
    end

    idle_connections.each do |conn|
      conn.lease

      @available.delete conn
      @connections.delete conn
    end
  end

  removed_connections.each do |conn|
    conn.disconnect!
  end
end

flush!()

断开所有当前空闲的连接。当前已被检出的连接不受影响。池将停止维护其最小连接数,直到它被重新激活(例如,通过后续的检出)。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 766
def flush!
  reap
  flush(-1)

  # Stop maintaining the minimum size until reactivated
  @activated = false
end

keep_alive(threshold = @keepalive)

检查是否有超过配置的保持连接时间的闲置连接。这会间接验证连接是否仍然有效,但主要目的是告知服务器(以及任何中间网络节点),我们仍在活跃使用连接。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 825
def keep_alive(threshold = @keepalive)
  return if threshold.nil?

  sequential_maintenance -> c { (c.seconds_since_last_activity || 0) > c.pool_jitter(threshold) } do |conn|
    # conn.active? will cause some amount of network activity, which is all
    # we need to provide a keepalive signal.
    #
    # If it returns false, the connection is already broken; disconnect,
    # so it can be found and repaired.
    conn.disconnect! unless conn.active?
  end
end

lease_connection()

检索与当前线程关联的连接,如果需要,则调用 checkout 来获取一个。

lease_connection 可以被调用任意多次;连接被缓存,缓存的键是线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 355
def lease_connection
  lease = connection_lease
  lease.connection ||= checkout
  lease.sticky = true
  lease.connection
end

pool_transaction_isolation_level()

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 891
def pool_transaction_isolation_level
  isolation_level_key = "activerecord_pool_transaction_isolation_level_#{db_config.name}"
  ActiveSupport::IsolatedExecutionState[isolation_level_key]
end

pool_transaction_isolation_level=(isolation_level)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 896
def pool_transaction_isolation_level=(isolation_level)
  isolation_level_key = "activerecord_pool_transaction_isolation_level_#{db_config.name}"
  ActiveSupport::IsolatedExecutionState[isolation_level_key] = isolation_level
end

preconnect()

预先建立池中的所有连接。这可以避免池用户在首次使用连接(检出后)时等待连接建立。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 810
def preconnect
  sequential_maintenance -> c { (!c.connected? || !c.verified?) && c.allow_preconnect } do |conn|
    conn.connect!
  rescue
    # Wholesale rescue: there's nothing we can do but move on. The
    # connection will go back to the pool, and the next consumer will
    # presumably try to connect again -- which will either work, or
    # fail and they'll be able to report the exception.
  end
end

prepopulate()

确保池至少包含配置的最小数量的连接。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 776
def prepopulate
  need_new_connections = nil

  synchronize do
    return if self.discarded?

    # We don't want to start prepopulating until we know the pool is wanted,
    # so we can avoid maintaining full pools in one-off scripts etc.
    return unless @activated

    need_new_connections = @connections.size < @min_connections
  end

  if need_new_connections
    while new_conn = try_to_checkout_new_connection { @connections.size < @min_connections }
      new_conn.allow_preconnect = true
      checkin(new_conn)
    end
  end
end

reap()

恢复池中丢失的连接。当程序员忘记在线程结束时检入连接或线程意外死亡时,可能会发生连接丢失。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 704
def reap
  stale_connections = synchronize do
    return if self.discarded?
    @connections.select do |conn|
      conn.in_use? && !conn.owner.alive?
    end.each do |conn|
      conn.steal!
    end
  end

  stale_connections.each do |conn|
    if conn.active?
      conn.reset!
      checkin conn
    else
      remove conn
    end
  end
end

recycle!()

立即将所有当前连接标记为待替换,相当于它们已达到 max_age——即使没有配置 max_age

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 841
def recycle!
  synchronize do
    return if self.discarded?

    @connections.each do |conn|
      conn.force_retirement
    end
  end

  retire_old_connections
end

release_connection(existing_lease = nil)

指示线程已完成对当前连接的使用。 release_connection 会释放连接-线程关联,并将连接返回到池中。

此方法仅适用于通过 lease_connectionwith_connection 方法获得的连接,通过 checkout 获得的连接不会被自动释放。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 431
def release_connection(existing_lease = nil)
  return if self.discarded?

  if conn = connection_lease.release
    checkin conn
    return true
  end
  false
end

remove(conn)

从连接池中移除一个连接。该连接将保持打开和活跃状态,但将不再由此池管理。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 672
def remove(conn)
  needs_new_connection = false

  synchronize do
    remove_connection_from_thread_cache conn

    @connections.delete conn
    @available.delete conn

    # @available.any_waiting? => true means that prior to removing this
    # conn, the pool was at its max size (@connections.size == @max_connections).
    # This would mean that any threads stuck waiting in the queue wouldn't
    # know they could checkout_new_connection, so let's do it for them.
    # Because condition-wait loop is encapsulated in the Queue class
    # (that in turn is oblivious to ConnectionPool implementation), threads
    # that are "stuck" there are helpless. They have no way of creating
    # new connections and are completely reliant on us feeding available
    # connections into the Queue.
    needs_new_connection = @available.num_waiting > @maintaining
  end

  # This is intentionally done outside of the synchronized section as we
  # would like not to hold the main mutex while checking out new connections.
  # Thus there is some chance that needs_new_connection information is now
  # stale, we can live with that (bulk_make_new_connections will make
  # sure not to exceed the pool's @max_connections limit).
  bulk_make_new_connections(1) if needs_new_connection
end

retire_old_connections(max_age = @max_age)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 797
def retire_old_connections(max_age = @max_age)
  max_age ||= Float::INFINITY

  sequential_maintenance -> c { c.connection_age&.>= c.pool_jitter(max_age) } do |conn|
    # Disconnect, then return the adapter to the pool. Preconnect will
    # handle the rest.
    conn.disconnect!
  end
end

schema_cache()

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 317
def schema_cache
  @schema_cache ||= BoundSchemaReflection.new(schema_reflection, self)
end

schema_reflection=(schema_reflection)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 321
def schema_reflection=(schema_reflection)
  pool_config.schema_reflection = schema_reflection
  @schema_cache = nil
end

stat()

返回连接池的使用统计信息。

ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 864
def stat
  synchronize do
    {
      size: size,
      connections: @connections.size,
      busy: @connections.count { |c| c.in_use? && c.owner.alive? },
      dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
      idle: @connections.count { |c| !c.in_use? },
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end

with_connection(prevent_permanent_checkout: false)

将一个连接从连接池传递给块。如果当前线程还没有检出连接,则会从池中检出一个连接,传递给块,并在块完成后返回到池中。如果当前线程已经检出连接(例如通过 lease_connectionwith_connection),则会传递该现有连接,并且该连接不会在块结束时自动返回到池中;预计该现有连接将被检出它的代码正确地返回到池中。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 450
def with_connection(prevent_permanent_checkout: false)
  lease = connection_lease
  sticky_was = lease.sticky
  lease.sticky = false if prevent_permanent_checkout

  if lease.connection
    begin
      yield lease.connection
    ensure
      lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was
    end
  else
    begin
      yield lease.connection = checkout
    ensure
      lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was
      release_connection(lease) unless lease.sticky
    end
  end
end