跳至内容 跳至搜索

Active Job Continuation Step

表示一个可继续执行的任务中的一个步骤。

当一个步骤完成时,它会被记录在任务的 continuation 状态中。如果任务被中断,它将从最后一个完成的步骤之后恢复。

步骤还带有一个可选的 cursor,可用于跟踪步骤内的进度。如果任务在某个步骤中被中断,cursor 将会被保存并在任务恢复时传回。

由步骤中的代码负责正确使用 cursor 来从中断处继续执行。

方法
A
C
D
N
R
S
T

Attributes

[R] cursor

步骤的 cursor。

[R] name

步骤的名称。

类公共方法

new(name, cursor, job:, resumed:)

# File activejob/lib/active_job/continuation/step.rb, line 25
def initialize(name, cursor, job:, resumed:)
  @name = name.to_sym
  @initial_cursor = cursor
  @cursor = cursor
  @resumed = resumed
  @job = job
end

实例公共方法

advance!(from: nil)

将 cursor 从当前值或提供的值向前推进

cursor 将通过调用 cursor 的 `succ` 方法来推进。如果 cursor 没有实现 `succ`,则会抛出 UnadvanceableCursorError 错误。

# File activejob/lib/active_job/continuation/step.rb, line 49
def advance!(from: nil)
  from = cursor if from.nil?

  begin
    to = from.succ
  rescue NoMethodError
    raise UnadvanceableCursorError, "Cursor class '#{from.class}' does not implement 'succ'"
  end

  set! to
end

advanced?()

在本次任务执行期间,cursor 是否被推进过?

# File activejob/lib/active_job/continuation/step.rb, line 67
def advanced?
  initial_cursor != cursor
end

checkpoint!()

检查任务是否应该被中断,如果应该,则抛出 Interrupt 异常。任务将被重新排队等待重试。

# File activejob/lib/active_job/continuation/step.rb, line 35
def checkpoint!
  job.checkpoint!
end

description()

# File activejob/lib/active_job/continuation/step.rb, line 75
def description
  "at '#{name}', cursor '#{cursor.inspect}'"
end

resumed?()

此步骤是否从之前的任务执行中恢复?

# File activejob/lib/active_job/continuation/step.rb, line 62
def resumed?
  @resumed
end

set!(cursor)

设置 cursor 并根据需要中断任务。

# File activejob/lib/active_job/continuation/step.rb, line 40
def set!(cursor)
  @cursor = cursor
  checkpoint!
end

to_a()

# File activejob/lib/active_job/continuation/step.rb, line 71
def to_a
  [ name.to_s, cursor ]
end