Active Job Continuation¶ ↑
Continuations 提供了一种中断和恢复作业的机制。这使得长时间运行的作业能够在应用程序重启后继续执行。
作业应包含 ActiveJob::Continuable 模块以启用 continuations。Continuable 作业在中断后会自动重试。
使用 step 方法来定义作业中的步骤。步骤可以使用可选的光标来跟踪步骤中的进度。
步骤在遇到时会立即执行。如果作业被中断,之前已完成的步骤将被跳过。如果一个步骤正在进行中,它将使用最后记录的光标恢复。
不属于任何步骤的代码将在每次作业运行时执行。
您可以将块或方法名传递给 step 方法。块将以 step 对象作为参数进行调用。方法可以不接受参数,也可以接受一个 step 对象作为单个参数。
class ProcessImportJob < ApplicationJob include ActiveJob::Continuable def perform(import_id) # This always runs, even if the job is resumed. @import = Import.find(import_id) step :validate do @import.validate! end step(:process_records) do |step| @import.records.find_each(start: step.cursor) do |record| record.process step.advance! from: record.id end end step :reprocess_records step :finalize end def reprocess_records(step) @import.records.find_each(start: step.cursor) do |record| record.reprocess step.advance! from: record.id end end def finalize @import.finalize! end end
Cursors¶ ↑
Cursors 用于跟踪步骤内的进度。cursor 可以是任何可以序列化为 ActiveJob::Base.serialize 参数的对象。它默认为 nil。
当一个步骤恢复时,最后一个 cursor 值会被恢复。步骤中的代码负责使用 cursor 从正确的位置继续。
set! 将 cursor 设置为特定值。
step :iterate_items do |step| items[step.cursor..].each do |item| process(item) step.set! (step.cursor || 0) + 1 end end
可以在定义步骤时设置 cursor 的起始值
step :iterate_items, start: 0 do |step| items[step.cursor..].each do |item| process(item) step.set! step.cursor + 1 end end
可以使用 advance! 来推进 cursor。这会调用当前 cursor 值的 succ 方法。如果 cursor 没有实现 succ,它将引发 ActiveJob::Continuation::UnadvanceableCursorError。
step :iterate_items, start: 0 do |step| items[step.cursor..].each do |item| process(item) step.advance! end end
您可以选择将 from 参数传递给 advance!。这在使用 ID 可能不连续的记录集合进行迭代时很有用。
step :process_records do |step| import.records.find_each(start: step.cursor) do |record| record.process step.advance! from: record.id end end
您可以使用数组来迭代嵌套记录
step :process_nested_records, start: [ 0, 0 ] do |step| Account.find_each(start: step.cursor[0]) do |account| account.records.find_each(start: step.cursor[1]) do |record| record.process step.set! [ account.id, record.id + 1 ] end step.set! [ account.id + 1, 0 ] end end
设置或推进 cursor 会创建一个检查点。您也可以通过调用 step 上的 checkpoint! 方法来手动创建检查点。如果您想允许中断,但又不需要更新 cursor,这很有用。
step :destroy_records do |step| import.records.find_each do |record| record.destroy! step.checkpoint! end end
Checkpoints¶ ↑
检查点是作业可以被中断的地方。在检查点处,作业将调用 queue_adapter.stopping?。如果返回 true,作业将引发 ActiveJob::Continuation::Interrupt 异常。
在每次作业执行的每个步骤开始之前(第一个步骤除外)都会有一个自动检查点。在一个步骤内部,在调用 set!、advance! 或 checkpoint! 时会创建一个检查点。
当队列适配器被标记为正在停止时,作业不会自动中断——它们将继续运行,直到下一个检查点,或者当进程停止时。
这是为了允许在安全点中断作业,但这也意味着作业应该比关机超时时间更频繁地进行检查点,以确保平稳重启。
中断时,作业将自动重试,其进度序列化在作业数据中,位于 continuation 键下。
序列化的进度包含
-
已完成步骤的列表
-
当前步骤及其 cursor 值(如果正在进行中)
Isolated Steps¶ ↑
除非作业被中断,否则步骤将在单个作业执行中顺序运行。
通过传递 +isolated: true+ 选项,您可以指定一个步骤始终在自己的执行中运行。
这对于长时间运行的步骤很有用,因为这些步骤可能无法在作业的宽限期内进行检查点——它确保在步骤开始之前将进度序列化回作业数据。
step :quick_step1 step :slow_step, isolated: true step :quick_step2 step :quick_step3
Errors¶ ↑
如果作业引发错误但未通过 Active Job 重试,它将被传递回底层队列后端,并且此执行中的任何进度都将丢失。
为了缓解这种情况,如果作业在取得进度后引发错误,它将自动重试。取得进度被定义为已完成一个步骤或在当前步骤中推进了 cursor。
Configuration¶ ↑
Continuable 作业有几个配置选项
-
:max_resumptions- 作业可以恢复的最大次数。默认为nil,表示无限次恢复。 -
:resume_options- 恢复作业时要传递给retry_job的选项。默认为{ wait: 5.seconds }。有关可用选项,请参阅ActiveJob::Exceptions#retry_job。 -
:resume_errors_after_advancing- 在推进 continuation 后是否恢复错误。默认为true。
- 模块 ActiveJob::Continuation::TestHelper
- 类 ActiveJob::Continuation::CheckpointError
- 类 ActiveJob::Continuation::Error
- 类 ActiveJob::Continuation::Interrupt
- 类 ActiveJob::Continuation::InvalidStepError
- 类 ActiveJob::Continuation::ResumeLimitError
- 类 ActiveJob::Continuation::Step
- 类 ActiveJob::Continuation::UnadvanceableCursorError
- A
- I
- S