Method: ActionCable::Channel::Streams#stream_from

Defined in:
actioncable/lib/action_cable/channel/streams.rb

#stream_from(broadcasting, callback = nil, coder: nil, &block) ⇒ Object

Start streaming from the named `broadcasting` pubsub queue. Optionally, you can pass a `callback` that'll be used instead of the default of just transmitting the updates straight to the subscriber. Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing them to the callback. Defaults to `coder: nil`, which does no decoding and passes raw messages.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'actioncable/lib/action_cable/channel/streams.rb', line 90

# Begins streaming from the pubsub queue named by +broadcasting+ (coerced to a
# String via Kernel#String).
#
# The stream handler is obtained from +worker_pool_stream_handler+, which is
# given the positional +callback+ when one is supplied and the block otherwise,
# together with +coder+. The handler is recorded in +streams+ under the
# broadcasting name before the pubsub subscription is posted to the server's
# event loop.
#
# Returns whatever the event loop's +post+ call returns.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  stream_name = String(broadcasting)

  # Hold the subscription confirmation back for now; it is sent from the
  # callback handed to pubsub.subscribe below (presumably invoked once the
  # subscribe has succeeded).
  defer_subscription_confirmation!

  # An explicit callback argument wins over a block.
  user_handler = callback || block
  handler = worker_pool_stream_handler(stream_name, user_handler, coder: coder)
  streams[stream_name] = handler

  on_subscribed = lambda do
    ensure_confirmation_sent
    logger.info "#{self.class.name} is streaming from #{stream_name}"
  end

  # The subscribe call itself runs on the server's event loop, not inline.
  connection.server.event_loop.post do
    pubsub.subscribe(stream_name, handler, on_subscribed)
  end
end