Action Cable Channel Streams¶ ↑
Streams 允许频道将广播路由给订阅者。广播,正如其他地方讨论过的,是一个发布/订阅队列,任何放入其中的数据都会自动发送给当时已连接的客户端。然而,它纯粹是一个在线队列。如果你在广播发送更新的那个时刻没有流式传输它,即使你在此之后连接,你也收不到那个更新。
最常见的情况是,流式传输的广播会直接发送给客户端的订阅者。频道只是作为双方(广播者和频道订阅者)之间的连接器。下面是一个允许订阅者获取给定页面上所有新评论的频道示例。
class CommentsChannel < ApplicationCable::Channel def follow(data) stream_from "comments_for_#{data['recording_id']}" end def unfollow stop_all_streams end end
基于上面的例子,这个频道的订阅者将立即获得放入 comments_for_45 广播中的任何数据。
该频道的一个广播示例如下所示:
ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }
如果你的流与模型相关,那么使用的广播可以从模型和频道生成。下面的例子将订阅一个像 comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE 这样的广播。
class CommentsChannel < ApplicationCable::Channel def subscribed post = Post.find(params[:id]) stream_for post end end
然后你可以通过以下方式向此频道广播:
CommentsChannel.broadcast_to(@post, @comment)
如果你不想未经过滤地将广播传递给订阅者,你也可以提供一个回调函数,让你修改发送的内容。下面的例子展示了如何利用这一点在过程中提供性能内省。
class ChatChannel < ApplicationCable::Channel def subscribed @room = Chat::Room[params[:room_number]] stream_for @room, coder: ActiveSupport::JSON do |message| if message['originated_at'].present? elapsed_time = (Time.now.to_f - message['originated_at']).round(2) ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing logger.info "Message took #{elapsed_time}s to arrive" end transmit message end end end
你可以通过调用 stop_all_streams 来停止从所有广播流式传输。
- S
实例公共方法
stop_all_streams() Link
从发布/订阅队列中取消订阅与此频道关联的所有流。
stop_stream_for(model) Link
取消订阅 model 的流。
stop_stream_from(broadcasting) Link
从指定的 broadcasting 取消订阅流。
stream_for(broadcastables, callback = nil, coder: nil, &block) Link
开始流式传输此频道中 broadcastables 的发布/订阅队列。可选地,您可以传递一个 callback,它将用于代替默认的直接将更新传输给订阅者。
传递 coder: ActiveSupport::JSON 以在传递给回调之前将消息解码为 JSON。默认为 coder: nil,它不进行解码,而是传递原始消息。
stream_from(broadcasting, callback = nil, coder: nil, &block) Link
开始从指定的 broadcasting 发布/订阅队列进行流式传输。可选地,您可以传递一个 callback,它将用于代替默认的直接将更新传输给订阅者。传递 coder: ActiveSupport::JSON 以在将消息传递给回调之前将其解码为 JSON。默认为 coder: nil,它不进行解码,而是传递原始消息。
# File actioncable/lib/action_cable/channel/streams.rb, line 90 def stream_from(broadcasting, callback = nil, coder: nil, &block) return if unsubscribed? broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! # Build a stream handler by wrapping the user-provided callback with a decoder # or defaulting to a JSON-decoding retransmitter. handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams[broadcasting] = handler connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end end
stream_or_reject_for(model) Link
如果 model 存在,则调用 stream_for 来开始流式传输,否则拒绝订阅。