跳至内容 跳至搜索

Active Job Continuation

Continuations 提供了一种中断和恢复作业的机制。这使得长时间运行的作业能够在应用程序重启后继续执行。

作业应包含 ActiveJob::Continuable 模块以启用 continuations。Continuable 作业在中断后会自动重试。

使用 step 方法来定义作业中的步骤。步骤可以使用可选的光标来跟踪步骤中的进度。

步骤在遇到时会立即执行。如果作业被中断,之前已完成的步骤将被跳过。如果一个步骤正在进行中,它将使用最后记录的光标恢复。

不属于任何步骤的代码将在每次作业运行时执行。

您可以将块或方法名传递给 step 方法。块将以 step 对象作为参数进行调用。方法可以不接受参数,也可以接受一个 step 对象作为单个参数。

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed.
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step(:process_records) do |step|
      @import.records.find_each(start: step.cursor) do |record|
        record.process
        step.advance! from: record.id
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor) do |record|
      record.reprocess
      step.advance! from: record.id
    end
  end

  def finalize
    @import.finalize!
  end
end

Cursors

Cursors 用于跟踪步骤内的进度。cursor 可以是任何可以序列化为 ActiveJob::Base.serialize 参数的对象。它默认为 nil

当一个步骤恢复时,最后一个 cursor 值会被恢复。步骤中的代码负责使用 cursor 从正确的位置继续。

set! 将 cursor 设置为特定值。

step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! (step.cursor || 0) + 1
  end
end

可以在定义步骤时设置 cursor 的起始值

step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end

可以使用 advance! 来推进 cursor。这会调用当前 cursor 值的 succ 方法。如果 cursor 没有实现 succ,它将引发 ActiveJob::Continuation::UnadvanceableCursorError

step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end

您可以选择将 from 参数传递给 advance!。这在使用 ID 可能不连续的记录集合进行迭代时很有用。

step :process_records do |step|
  import.records.find_each(start: step.cursor) do |record|
    record.process
    step.advance! from: record.id
  end
end

您可以使用数组来迭代嵌套记录

step :process_nested_records, start: [ 0, 0 ] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [ account.id, record.id + 1 ]
    end
    step.set! [ account.id + 1, 0 ]
  end
end

设置或推进 cursor 会创建一个检查点。您也可以通过调用 step 上的 checkpoint! 方法来手动创建检查点。如果您想允许中断,但又不需要更新 cursor,这很有用。

step :destroy_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end

Checkpoints

检查点是作业可以被中断的地方。在检查点处,作业将调用 queue_adapter.stopping?。如果返回 true,作业将引发 ActiveJob::Continuation::Interrupt 异常。

在每次作业执行的每个步骤开始之前(第一个步骤除外)都会有一个自动检查点。在一个步骤内部,在调用 set!advance!checkpoint! 时会创建一个检查点。

当队列适配器被标记为正在停止时,作业不会自动中断——它们将继续运行,直到下一个检查点,或者当进程停止时。

这是为了允许在安全点中断作业,但这也意味着作业应该比关机超时时间更频繁地进行检查点,以确保平稳重启。

中断时,作业将自动重试,其进度序列化在作业数据中,位于 continuation 键下。

序列化的进度包含

  • 已完成步骤的列表

  • 当前步骤及其 cursor 值(如果正在进行中)

Isolated Steps

除非作业被中断,否则步骤将在单个作业执行中顺序运行。

通过传递 +isolated: true+ 选项,您可以指定一个步骤始终在自己的执行中运行。

这对于长时间运行的步骤很有用,因为这些步骤可能无法在作业的宽限期内进行检查点——它确保在步骤开始之前将进度序列化回作业数据。

step :quick_step1
step :slow_step, isolated: true
step :quick_step2
step :quick_step3

Errors

如果作业引发错误但未通过 Active Job 重试,它将被传递回底层队列后端,并且此执行中的任何进度都将丢失。

为了缓解这种情况,如果作业在取得进度后引发错误,它将自动重试。取得进度被定义为已完成一个步骤或在当前步骤中推进了 cursor。

Configuration

Continuable 作业有几个配置选项

  • :max_resumptions - 作业可以恢复的最大次数。默认为 nil,表示无限次恢复。

  • :resume_options - 恢复作业时要传递给 retry_job 的选项。默认为 { wait: 5.seconds }。有关可用选项,请参阅 ActiveJob::Exceptions#retry_job

  • :resume_errors_after_advancing - 在推进 continuation 后是否恢复错误。默认为 true

命名空间
方法
A
I
S

实例公共方法

advanced?()

# File activejob/lib/active_job/continuation.rb, line 256
def advanced?
  @advanced
end

instrumentation()

# File activejob/lib/active_job/continuation.rb, line 260
def instrumentation
  { description: description,
    completed_steps: completed,
    current_step: current }
end

started?()

# File activejob/lib/active_job/continuation.rb, line 252
def started?
  completed.any? || current.present?
end