跳至内容 跳至搜索

Active Support 事件报告器

ActiveSupport::EventReporter 提供了一个接口,用于向订阅者报告结构化事件。

要报告一个事件,可以使用 notify 方法

Rails.event.notify("user_created", { id: 123 })
# Emits event:
#  {
#    name: "user_created",
#    payload: { id: 123 },
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

notify API 可以接收事件名称和负载哈希,或者一个事件对象。名称会被强制转换为字符串。

事件对象

如果将事件对象传递给 notify API,它将按原样传递给订阅者,并且对象的类名将用作事件名称。

class UserCreatedEvent
  def initialize(id:, name:)
    @id = id
    @name = name
  end

  def serialize
    {
      id: @id,
      name: @name
    }
  end
end

Rails.event.notify(UserCreatedEvent.new(id: 123, name: "John Doe"))
# Emits event:
#  {
#    name: "UserCreatedEvent",
#    payload: #<UserCreatedEvent:0x111>,
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

事件是任何代表模式化事件的 Ruby 对象。虽然负载哈希允许任意的、隐式结构的​​数据,但事件对象旨在强制执行特定的模式。

订阅者负责序列化事件对象。

订阅者

订阅者必须实现 emit 方法,该方法将使用事件哈希进行调用。

事件哈希具有以下键

name: String (The name of the event)
payload: Hash, Object (The payload of the event, or the event object itself)
tags: Hash (The tags of the event)
context: Hash (The context of the event)
timestamp: Float (The timestamp of the event, in nanoseconds)
source_location: Hash (The source location of the event, containing the filepath, lineno, and label)

订阅者负责在将事件发送到目标目的地(例如流媒体平台、日志设备或警报服务)之前,将事件编码为所需的格式。

class JSONEventSubscriber
  def emit(event)
    json_data = JSON.generate(event)
    LogExporter.export(json_data)
  end
end

class LogSubscriber
  def emit(event)
    payload = event[:payload].map { |key, value| "#{key}=#{value}" }.join(" ")
    source_location = event[:source_location]
    log = "[#{event[:name]}] #{payload} at #{source_location[:filepath]}:#{source_location[:lineno]}"
    Rails.logger.info(log)
  end
end

请注意,事件对象将按原样传递给订阅者,并且可能需要在编码之前进行序列化

class UserCreatedEvent
  def initialize(id:, name:)
    @id = id
    @name = name
  end

  def serialize
    {
      id: @id,
      name: @name
    }
  end
end

class LogSubscriber
  def emit(event)
    payload = event[:payload]
    json_data = JSON.generate(payload.serialize)
    LogExporter.export(json_data)
  end
end

过滤订阅

可以通过可选的过滤器过程配置订阅者,以便只接收一部分事件

# Only receive events with names starting with "user."
Rails.event.subscribe(user_subscriber) { |event| event[:name].start_with?("user.") }

# Only receive events with specific payload types
Rails.event.subscribe(audit_subscriber) { |event| event[:payload].is_a?(AuditEvent) }

调试事件

您可以使用 debug 方法报告一个事件,该事件仅在事件报告器处于调试模式时才会报告

Rails.event.debug("my_debug_event", { foo: "bar" })

标签

要为事件添加额外的上下文,与事件负载分开,可以通过 tagged 方法添加标签

Rails.event.tagged("graphql") do
  Rails.event.notify("user_created", { id: 123 })
end

# Emits event:
#  {
#    name: "user_created",
#    payload: { id: 123 },
#    tags: { graphql: true },
#    context: {},
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

上下文存储

您可能希望将元数据附加到报告器发出的每个事件。虽然标签为一系列事件提供了特定域的上下文,但上下文作用域限定为作业/请求,并应用于与执行上下文相关的元数据。可以通过 set_context 方法设置上下文

Rails.event.set_context(request_id: "abcd123", user_agent: "TestAgent")
Rails.event.notify("user_created", { id: 123 })

# Emits event:
#  {
#    name: "user_created",
#    payload: { id: 123 },
#    tags: {},
#    context: { request_id: "abcd123", user_agent: TestAgent" },
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

每次请求之前和之后都会自动重置上下文。

可以通过 config.active_support.event_reporter_context_store 配置自定义上下文存储。

# config/application.rb
config.active_support.event_reporter_context_store = CustomContextStore

class CustomContextStore
  class << self
    def context
      # Return the context.
    end

    def set_context(context_hash)
      # Append context_hash to the existing context store.
    end

    def clear
      # Delete the stored context.
    end
  end
end

事件报告器对所有负载数据、标签和上下文存储条目使用符号键进行标准化。 String 键会自动转换为符号以保持一致性。

Rails.event.notify("user.created", { "id" => 123 })
# Emits event:
#  {
#    name: "user.created",
#    payload: { id: 123 },
#  }

安全

报告事件时,基于哈希的负载会自动过滤以根据 Rails.application.filter_parameters 删除敏感数据。

如果提供的是 事件对象,订阅者将需要自己过滤敏感数据,例如使用 ActiveSupport::ParameterFilter

方法
C
D
N
S
T
U
W

Attributes

[W] raise_on_error

设置在订阅者在事件发出过程中引发错误时,或在向 notify 传递意外参数时,是否引发错误。

[R] subscribers

类公共方法

new(*subscribers, raise_on_error: false)

# File activesupport/lib/active_support/event_reporter.rb, line 286
def initialize(*subscribers, raise_on_error: false)
  @subscribers = []
  subscribers.each { |subscriber| subscribe(subscriber) }
  @debug_mode = false
  @raise_on_error = raise_on_error
end

实例公共方法

clear_context()

清除所有上下文数据。

# File activesupport/lib/active_support/event_reporter.rb, line 525
def clear_context
  context_store.clear
end

context()

返回当前的上下文数据。

# File activesupport/lib/active_support/event_reporter.rb, line 530
def context
  context_store.context
end

debug(name_or_object, payload = nil, caller_depth: 1, **kwargs)

仅在调试模式下报告事件。例如

Rails.event.debug("sql.query", { sql: "SELECT * FROM users" })

参数

  • :payload - 使用字符串/符号事件名称时的事件负载。

  • :caller_depth - 用于源位置的堆栈深度(默认为 1)。

  • :kwargs - 使用字符串/符号事件名称时的附加负载数据。

# File activesupport/lib/active_support/event_reporter.rb, line 435
def debug(name_or_object, payload = nil, caller_depth: 1, **kwargs)
  if debug_mode?
    if block_given?
      notify(name_or_object, payload, caller_depth: caller_depth + 1, **kwargs.merge(yield))
    else
      notify(name_or_object, payload, caller_depth: caller_depth + 1, **kwargs)
    end
  end
end

debug_mode?()

检查调试模式当前是否已启用。调试模式通过 with_debug 在报告器上启用,并且在本地环境中也启用。

# File activesupport/lib/active_support/event_reporter.rb, line 420
def debug_mode?
  @debug_mode || Fiber[:event_reporter_debug_mode]
end

notify(name_or_object, payload = nil, caller_depth: 1, **kwargs)

向所有注册的订阅者报告事件。可以提供事件名称和负载

Rails.event.notify("user.created", { id: 123 })
# Emits event:
#  {
#    name: "user.created",
#    payload: { id: 123 },
#    tags: {},
#    context: {},
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

或者,可以提供一个事件对象

Rails.event.notify(UserCreatedEvent.new(id: 123))
# Emits event:
#  {
#    name: "UserCreatedEvent",
#    payload: #<UserCreatedEvent:0x111>,
#    tags: {},
#    context: {},
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

参数

  • :payload - 使用字符串/符号事件名称时的事件负载。

  • :caller_depth - 用于源位置的堆栈深度(默认为 1)。

  • :kwargs - 使用字符串/符号事件名称时的附加负载数据。

# File activesupport/lib/active_support/event_reporter.rb, line 363
def notify(name_or_object, payload = nil, caller_depth: 1, **kwargs)
  name = resolve_name(name_or_object)
  payload = resolve_payload(name_or_object, payload, **kwargs)

  event = {
    name: name,
    payload: payload,
    tags: TagStack.tags,
    context: context_store.context,
    timestamp: Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond),
  }

  caller_location = caller_locations(caller_depth, 1)&.first

  if caller_location
    source_location = {
      filepath: caller_location.path,
      lineno: caller_location.lineno,
      label: caller_location.label,
    }
    event[:source_location] = source_location
  end

  @subscribers.each do |subscriber_entry|
    subscriber = subscriber_entry[:subscriber]
    filter = subscriber_entry[:filter]

    next if filter && !filter.call(event)

    subscriber.emit(event)
  rescue => subscriber_error
    if raise_on_error?
      raise
    else
      ActiveSupport.error_reporter.report(subscriber_error, handled: true)
    end
  end

  nil
end

set_context(context)

设置上下文数据,这些数据将包含在报告器发出的所有事件中。上下文数据应作用域限定到作业或请求,并在每次请求和作业之前和之后自动重置。

Rails.event.set_context(user_agent: "TestAgent")
Rails.event.set_context(job_id: "abc123")
Rails.event.tagged("graphql") do
  Rails.event.notify("user_created", { id: 123 })
end

# Emits event:
#  {
#    name: "user_created",
#    payload: { id: 123 },
#    tags: { graphql: true },
#    context: { user_agent: "TestAgent", job_id: "abc123" },
#    timestamp: 1738964843208679035
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }
# File activesupport/lib/active_support/event_reporter.rb, line 520
def set_context(context)
  context_store.set_context(context)
end

subscribe(subscriber, &filter)

注册一个新的事件订阅者。订阅者必须响应

emit(event: Hash)

事件哈希将具有以下键

name: String (The name of the event)
payload: Hash, Object (The payload of the event, or the event object itself)
tags: Hash (The tags of the event)
context: Hash (The context of the event)
timestamp: Float (The timestamp of the event, in nanoseconds)
source_location: Hash (The source location of the event, containing the filepath, lineno, and label)

可以提供一个可选的过滤器过程,以便只接收一部分事件

Rails.event.subscribe(subscriber) { |event| event[:name].start_with?("user.") }
Rails.event.subscribe(subscriber) { |event| event[:payload].is_a?(UserEvent) }
# File activesupport/lib/active_support/event_reporter.rb, line 311
def subscribe(subscriber, &filter)
  unless subscriber.respond_to?(:emit)
    raise ArgumentError, "Event subscriber #{subscriber.class.name} must respond to #emit"
  end
  @subscribers << { subscriber: subscriber, filter: filter }
end

tagged(*args, **kwargs, &block)

为事件添加标签以提供额外上下文。标签以堆栈为导向,因此在块内发出的所有事件都继承相同的标签集。例如

Rails.event.tagged("graphql") do
  Rails.event.notify("user.created", { id: 123 })
end

# Emits event:
# {
#    name: "user.created",
#    payload: { id: 123 },
#    tags: { graphql: true },
#    context: {},
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

标签可以作为参数或关键字参数提供,并且可以嵌套

Rails.event.tagged("graphql") do
# Other code here...
  Rails.event.tagged(section: "admin") do
    Rails.event.notify("user.created", { id: 123 })
  end
end

# Emits event:
#  {
#    name: "user.created",
#    payload: { id: 123 },
#    tags: { section: "admin", graphql: true },
#    context: {},
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }

tagged API 也可以接收标签对象

graphql_tag = GraphqlTag.new(operation_name: "user_created", operation_type: "mutation")
Rails.event.tagged(graphql_tag) do
  Rails.event.notify("user.created", { id: 123 })
end

# Emits event:
#  {
#    name: "user.created",
#    payload: { id: 123 },
#    tags: { "GraphqlTag": #<GraphqlTag:0x111> },
#    context: {},
#    timestamp: 1738964843208679035,
#    source_location: { filepath: "path/to/file.rb", lineno: 123, label: "UserService#create" }
#  }
# File activesupport/lib/active_support/event_reporter.rb, line 497
def tagged(*args, **kwargs, &block)
  TagStack.with_tags(*args, **kwargs, &block)
end

unsubscribe(subscriber)

取消注册事件订阅者。接受订阅者或类。

subscriber = MyEventSubscriber.new
Rails.event.subscribe(subscriber)

Rails.event.unsubscribe(subscriber)
# or
Rails.event.unsubscribe(MyEventSubscriber)
# File activesupport/lib/active_support/event_reporter.rb, line 326
def unsubscribe(subscriber)
  @subscribers.delete_if { |s| subscriber === s[:subscriber] }
end

with_debug()

暂时启用调试模式,持续执行块。只有在启用调试模式时,才会报告对 debug 的调用。

Rails.event.with_debug do
  Rails.event.debug("sql.query", { sql: "SELECT * FROM users" })
end
# File activesupport/lib/active_support/event_reporter.rb, line 410
def with_debug
  prior = Fiber[:event_reporter_debug_mode]
  Fiber[:event_reporter_debug_mode] = true
  yield
ensure
  Fiber[:event_reporter_debug_mode] = prior
end