Active Record 连接池¶ ↑
用于管理 Active Record 数据库连接的连接池基类。
简介¶ ↑
连接池会同步线程对有限数量的数据库连接的访问。基本思想是每个线程从池中检出(checkout)一个数据库连接,使用该连接,然后将连接归还(checkin)回池中。 ConnectionPool 是完全线程安全的,只要正确遵循 ConnectionPool 的约定,它就能确保一个连接不会同时被两个线程使用。它还会处理连接数多于线程数的情况:如果所有连接都已被检出,而某个线程仍然尝试检出连接,那么 ConnectionPool 将会等待直到有其他线程归还连接,或者 checkout_timeout 超时。
获取(检出)连接¶ ↑
可以通过几种方式从连接池获取并使用连接
-
直接使用 ActiveRecord::Base.lease_connection。当你完成连接(们)的使用并希望将其返回到池中时,调用 ActiveRecord::Base.connection_handler.clear_active_connections!。这是 Active Record 在与 Action Pack 的请求处理周期结合使用时的默认行为。
-
通过 ActiveRecord::Base.connection_pool.checkout 手动从池中检出连接。完成使用后,你需要负责通过调用 ActiveRecord::Base.connection_pool.checkin(connection) 将连接归还到池中。
-
使用 ActiveRecord::Base.connection_pool.with_connection(&block),它会获取一个连接,将该连接作为唯一参数传递给块,并在块完成后将其返回到池中。
池中的连接实际上是 AbstractAdapter 对象(或与 AbstractAdapter 接口兼容的对象)。
当一个线程使用以上三种方法之一从池中检出一个连接时,该连接将自动成为该线程上执行的 ActiveRecord 查询所使用的连接。例如,不必显式地将检出的连接传递给 Rails 模型或查询。
选项¶ ↑
你可以在数据库连接配置中添加几个与连接池相关的选项
-
checkout_timeout:等待连接可用之前的秒数,超过此时间将放弃并引发超时错误(默认为 5 秒)。 -
idle_timeout:连接在池中闲置的秒数,超过此时间将被自动断开(默认为 300 秒)。设置为零表示永久保留连接。 -
keepalive:在连接闲置期间进行保持连接检查的秒数(默认为 600 秒)。 -
max_age:池允许连接存在的最长秒数,之后会在下次检入时进行淘汰(默认为 Float::INFINITY)。 -
max_connections:池可以管理的连接的最大数量(默认为 5)。设置为nil或 -1 表示无限连接。 -
min_connections:池将打开并维护的连接的最小数量(默认为 0)。 -
pool_jitter:应用于max_age和keepalive时间间隔的最大减少因子(默认为 0.2;范围 0.0-1.0)。
- 类 ActiveRecord::ConnectionAdapters::ConnectionPool::Queue
- 类 ActiveRecord::ConnectionAdapters::ConnectionPool::Reaper
- A
- C
- D
- F
- I
- K
- L
- N
- P
- R
- S
- W
- MonitorMixin
常量
| WeakThreadKeyMap | = | ObjectSpace::WeakKeyMap |
Attributes
| [R] | async_executor | |
| [RW] | automatic_reconnect | |
| [RW] | checkout_timeout | |
| [R] | db_config | |
| [R] | keepalive | |
| [R] | max_age | |
| [R] | max_connections | |
| [R] | min_connections | |
| [R] | pool_config | |
| [R] | reaper | |
| [R] | role | |
| [R] | shard | |
| [R] | size |
类公共方法
install_executor_hooks(executor = ActiveSupport::Executor) 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 231 def install_executor_hooks(executor = ActiveSupport::Executor) executor.register_hook(ExecutorHooks) end
new(pool_config) 链接
创建一个新的 ConnectionPool 对象。 pool_config 是一个 PoolConfig 对象,它描述了数据库连接信息(例如,适配器、主机名、用户名、密码等),以及此 ConnectionPool 的最大大小。
默认的 ConnectionPool 最大大小为 5。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 251 def initialize(pool_config) super() @pool_config = pool_config @db_config = pool_config.db_config @role = pool_config.role @shard = pool_config.shard @checkout_timeout = db_config.checkout_timeout @idle_timeout = db_config.idle_timeout @max_connections = db_config.max_connections @min_connections = db_config.min_connections @max_age = db_config.max_age @keepalive = db_config.keepalive # This variable tracks the cache of threads mapped to reserved connections, with the # sole purpose of speeding up the +connection+ method. It is not the authoritative # registry of which thread owns which connection. Connection ownership is tracked by # the +connection.owner+ attr on each +connection+ instance. # The invariant works like this: if there is mapping of <tt>thread => conn</tt>, # then that +thread+ does indeed own that +conn+. However, an absence of such # mapping does not mean that the +thread+ doesn't own the said connection. In # that case +conn.owner+ attr should be consulted. # Access and modification of <tt>@leases</tt> does not require # synchronization. @leases = LeaseRegistry.new @connections = [] @automatic_reconnect = true # Connection pool allows for concurrent (outside the main +synchronize+ section) # establishment of new connections. This variable tracks the number of threads # currently in the process of independently establishing connections to the DB. @now_connecting = 0 # Sometimes otherwise-idle connections are temporarily held by the Reaper for # maintenance. This variable tracks the number of connections currently in that # state -- if a thread requests a connection and there are none available, it # will await any in-maintenance connections in preference to creating a new one. @maintaining = 0 @threads_blocking_new_connections = 0 @available = ConnectionLeasingQueue.new self @pinned_connection = nil @pinned_connections_depth = 0 @async_executor = build_async_executor @schema_cache = nil @activated = false @original_context = ActiveSupport::IsolatedExecutionState.context @reaper_lock = Monitor.new @reaper = Reaper.new(self, db_config.reaping_frequency) @reaper.run end
实例公共方法
activate() 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 342 def activate @activated = true end
activated?() 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 346 def activated? @activated end
active_connection?() 链接
如果当前线程有一个正在使用的开放连接,则返回 true。
此方法仅适用于通过 lease_connection 或 with_connection 方法获得的连接。通过 checkout 获得的连接不会被 active_connection? 检测到。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 419 def active_connection? connection_lease.connection end
checkin(conn) 链接
将数据库连接归还到池中,表示你不再需要此连接。
conn:一个 AbstractAdapter 对象,该对象之前是通过在此池上调用 checkout 获取的。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 658 def checkin(conn) return if @pinned_connection.equal?(conn) conn.lock.synchronize do synchronize do connection_lease.clear(conn) conn.expire @available.add conn end end end
checkout(checkout_timeout = @checkout_timeout) 链接
从池中检出一个数据库连接,表示你希望使用它。完成后你应该调用 checkin。
这可以通过返回并租用现有连接,或创建新连接并租用它来完成。
如果所有连接都已被租用且池已满(意味着当前租用连接的数量大于或等于设定的容量限制),将引发 ActiveRecord::ConnectionTimeoutError 异常。
返回: 一个 AbstractAdapter 对象。
引发
-
ActiveRecord::ConnectionTimeoutError无法从池中获取连接。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 630 def checkout(checkout_timeout = @checkout_timeout) return checkout_and_verify(acquire_connection(checkout_timeout)) unless @pinned_connection @pinned_connection.lock.synchronize do synchronize do # The pinned connection may have been cleaned up before we synchronized, so check if it is still present if @pinned_connection @pinned_connection.verify! # Any leased connection must be in @connections otherwise # some methods like #connected? won't behave correctly unless @connections.include?(@pinned_connection) @connections << @pinned_connection end @pinned_connection else checkout_and_verify(acquire_connection(checkout_timeout)) end end end end
clear_reloadable_connections(raise_on_acquisition_timeout = true) 链接
从池中清除可重载的连接,并重新连接需要重载的连接。
引发
-
ActiveRecord::ExclusiveConnectionTimeoutError如果在超时间隔内无法获得池中所有连接的所有权(默认持续时间为spec.db_config.checkout_timeout * 2秒)。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 588 def clear_reloadable_connections(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! if conn.requires_reloading? end @connections.delete_if(&:requires_reloading?) @available.clear end end end
clear_reloadable_connections!() 链接
从池中清除可重载的连接,并重新连接需要重载的连接。
池首先尝试获得所有连接的所有权。如果在超时间隔内无法做到这一点(默认持续时间为 spec.db_config.checkout_timeout * 2 秒),则池将强制清除缓存并重新加载连接,而不考虑其他拥有连接的线程。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 612 def clear_reloadable_connections! clear_reloadable_connections(false) end
connected?() 链接
如果连接已打开,则返回 true。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 490 def connected? synchronize { @connections.any?(&:connected?) } end
connections() 链接
返回一个包含池中当前连接的数组。对数组的访问不需要对池进行同步,因为数组是新创建的,不会被池保留。
但是;此方法绕过了 ConnectionPool 的线程安全连接访问模式。返回的连接可能属于另一个线程、未拥有,或者碰巧属于调用线程。
在连接上调用方法而不拥有它,受底层方法线程安全性的保证。连接适配器类上的许多方法本质上都是多线程不安全的。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 505 def connections synchronize { @connections.dup } end
disconnect(raise_on_acquisition_timeout = true) 链接
断开池中的所有连接,并清空池。
引发
-
ActiveRecord::ExclusiveConnectionTimeoutError如果在超时间隔内无法获得池中所有连接的所有权(默认持续时间为spec.db_config.checkout_timeout * 2秒)。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 515 def disconnect(raise_on_acquisition_timeout = true) @reaper_lock.synchronize do return if self.discarded? with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do return if self.discarded? @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! end @connections = [] @leases.clear @available.clear # Stop maintaining the minimum size until reactivated @activated = false end end end end
disconnect!() 链接
断开池中的所有连接,并清空池。
池首先尝试获得所有连接的所有权。如果在超时间隔内无法做到这一点(默认持续时间为 spec.db_config.checkout_timeout * 2 秒),则池将被强制断开,而不考虑其他拥有连接的线程。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 546 def disconnect! disconnect(false) end
flush(minimum_idle = @idle_timeout) 链接
断开所有已闲置至少 minimum_idle 秒的连接。当前已被检出或在 minimum_idle 秒内才被检入的连接不受影响。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 727 def flush(minimum_idle = @idle_timeout) return if minimum_idle.nil? removed_connections = synchronize do return if self.discarded? idle_connections = @connections.select do |conn| !conn.in_use? && conn.seconds_idle >= minimum_idle end.sort_by { |conn| -conn.seconds_idle } # sort longest idle first # Don't go below our configured pool minimum unless we're flushing # everything idles_to_retain = if minimum_idle > 0 @min_connections - (@connections.size - idle_connections.size) else 0 end if idles_to_retain > 0 idle_connections.pop idles_to_retain end idle_connections.each do |conn| conn.lease @available.delete conn @connections.delete conn end end removed_connections.each do |conn| conn.disconnect! end end
flush!() 链接
断开所有当前空闲的连接。当前已被检出的连接不受影响。池将停止维护其最小连接数,直到它被重新激活(例如,通过后续的检出)。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 766 def flush! reap flush(-1) # Stop maintaining the minimum size until reactivated @activated = false end
keep_alive(threshold = @keepalive) 链接
检查是否有超过配置的保持连接时间的闲置连接。这会间接验证连接是否仍然有效,但主要目的是告知服务器(以及任何中间网络节点),我们仍在活跃使用连接。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 825 def keep_alive(threshold = @keepalive) return if threshold.nil? sequential_maintenance -> c { (c.seconds_since_last_activity || 0) > c.pool_jitter(threshold) } do |conn| # conn.active? will cause some amount of network activity, which is all # we need to provide a keepalive signal. # # If it returns false, the connection is already broken; disconnect, # so it can be found and repaired. conn.disconnect! unless conn.active? end end
lease_connection() 链接
检索与当前线程关联的连接,如果需要,则调用 checkout 来获取一个。
lease_connection 可以被调用任意多次;连接被缓存,缓存的键是线程。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 355 def lease_connection lease = connection_lease lease.connection ||= checkout lease.sticky = true lease.connection end
pool_transaction_isolation_level() 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 891 def pool_transaction_isolation_level isolation_level_key = "activerecord_pool_transaction_isolation_level_#{db_config.name}" ActiveSupport::IsolatedExecutionState[isolation_level_key] end
pool_transaction_isolation_level=(isolation_level) 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 896 def pool_transaction_isolation_level=(isolation_level) isolation_level_key = "activerecord_pool_transaction_isolation_level_#{db_config.name}" ActiveSupport::IsolatedExecutionState[isolation_level_key] = isolation_level end
preconnect() 链接
预先建立池中的所有连接。这可以避免池用户在首次使用连接(检出后)时等待连接建立。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 810 def preconnect sequential_maintenance -> c { (!c.connected? || !c.verified?) && c.allow_preconnect } do |conn| conn.connect! rescue # Wholesale rescue: there's nothing we can do but move on. The # connection will go back to the pool, and the next consumer will # presumably try to connect again -- which will either work, or # fail and they'll be able to report the exception. end end
prepopulate() 链接
确保池至少包含配置的最小数量的连接。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 776 def prepopulate need_new_connections = nil synchronize do return if self.discarded? # We don't want to start prepopulating until we know the pool is wanted, # so we can avoid maintaining full pools in one-off scripts etc. return unless @activated need_new_connections = @connections.size < @min_connections end if need_new_connections while new_conn = try_to_checkout_new_connection { @connections.size < @min_connections } new_conn.allow_preconnect = true checkin(new_conn) end end end
reap() 链接
恢复池中丢失的连接。当程序员忘记在线程结束时检入连接或线程意外死亡时,可能会发生连接丢失。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 704 def reap stale_connections = synchronize do return if self.discarded? @connections.select do |conn| conn.in_use? && !conn.owner.alive? end.each do |conn| conn.steal! end end stale_connections.each do |conn| if conn.active? conn.reset! checkin conn else remove conn end end end
recycle!() 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 841 def recycle! synchronize do return if self.discarded? @connections.each do |conn| conn.force_retirement end end retire_old_connections end
release_connection(existing_lease = nil) 链接
指示线程已完成对当前连接的使用。 release_connection 会释放连接-线程关联,并将连接返回到池中。
此方法仅适用于通过 lease_connection 或 with_connection 方法获得的连接,通过 checkout 获得的连接不会被自动释放。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 431 def release_connection(existing_lease = nil) return if self.discarded? if conn = connection_lease.release checkin conn return true end false end
remove(conn) 链接
从连接池中移除一个连接。该连接将保持打开和活跃状态,但将不再由此池管理。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 672 def remove(conn) needs_new_connection = false synchronize do remove_connection_from_thread_cache conn @connections.delete conn @available.delete conn # @available.any_waiting? => true means that prior to removing this # conn, the pool was at its max size (@connections.size == @max_connections). # This would mean that any threads stuck waiting in the queue wouldn't # know they could checkout_new_connection, so let's do it for them. # Because condition-wait loop is encapsulated in the Queue class # (that in turn is oblivious to ConnectionPool implementation), threads # that are "stuck" there are helpless. They have no way of creating # new connections and are completely reliant on us feeding available # connections into the Queue. needs_new_connection = @available.num_waiting > @maintaining end # This is intentionally done outside of the synchronized section as we # would like not to hold the main mutex while checking out new connections. # Thus there is some chance that needs_new_connection information is now # stale, we can live with that (bulk_make_new_connections will make # sure not to exceed the pool's @max_connections limit). bulk_make_new_connections(1) if needs_new_connection end
retire_old_connections(max_age = @max_age) 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 797 def retire_old_connections(max_age = @max_age) max_age ||= Float::INFINITY sequential_maintenance -> c { c.connection_age&.>= c.pool_jitter(max_age) } do |conn| # Disconnect, then return the adapter to the pool. Preconnect will # handle the rest. conn.disconnect! end end
schema_cache() 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 317 def schema_cache @schema_cache ||= BoundSchemaReflection.new(schema_reflection, self) end
schema_reflection=(schema_reflection) 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 321 def schema_reflection=(schema_reflection) pool_config.schema_reflection = schema_reflection @schema_cache = nil end
stat() 链接
返回连接池的使用统计信息。
ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 864 def stat synchronize do { size: size, connections: @connections.size, busy: @connections.count { |c| c.in_use? && c.owner.alive? }, dead: @connections.count { |c| c.in_use? && !c.owner.alive? }, idle: @connections.count { |c| !c.in_use? }, waiting: num_waiting_in_queue, checkout_timeout: checkout_timeout } end end
with_connection(prevent_permanent_checkout: false) 链接
将一个连接从连接池传递给块。如果当前线程还没有检出连接,则会从池中检出一个连接,传递给块,并在块完成后返回到池中。如果当前线程已经检出连接(例如通过 lease_connection 或 with_connection),则会传递该现有连接,并且该连接不会在块结束时自动返回到池中;预计该现有连接将被检出它的代码正确地返回到池中。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 450 def with_connection(prevent_permanent_checkout: false) lease = connection_lease sticky_was = lease.sticky lease.sticky = false if prevent_permanent_checkout if lease.connection begin yield lease.connection ensure lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was end else begin yield lease.connection = checkout ensure lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was release_connection(lease) unless lease.sticky end end end