Method: ActionCable::Channel::Streams#stream_from
- Defined in:
- actioncable/lib/action_cable/channel/streams.rb
#stream_from(broadcasting, callback = nil, coder: nil, &block) ⇒ Object
Starts streaming from the named `broadcasting` pubsub queue. Optionally, pass a `callback` (or a block) to be used instead of the default behavior of transmitting updates straight to the subscriber. Pass `coder: ActiveSupport::JSON` to decode messages as JSON before they are passed to the callback. The default, `coder: nil`, does no decoding and passes raw messages through.
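The interaction between `callback` and `coder` can be sketched in plain Ruby. This is a simplified model, not Action Cable's implementation: `build_handler`, `ToyJSONCoder`, and the `transmit` lambda are hypothetical stand-ins, and the only assumption made about a coder is that it responds to `decode`.

```ruby
require "json"

# Hypothetical stand-in for a coder: any object that responds to `decode`.
module ToyJSONCoder
  def self.decode(raw)
    JSON.parse(raw)
  end
end

# Simplified model of how a stream handler is chosen (not Action Cable's code).
# `transmit` is a lambda standing in for sending data to the subscriber.
def build_handler(callback, coder:, transmit:)
  if callback
    # With a callback: decode only if a coder was given, then call the callback.
    coder ? ->(message) { callback.call(coder.decode(message)) } : callback
  else
    # Without a callback: decode (JSON by default) and transmit to the subscriber.
    decoder = coder || ToyJSONCoder
    ->(message) { transmit.call(decoder.decode(message)) }
  end
end

sent = []              # what the subscriber would receive
received_raw = []      # what a callback sees with coder: nil
received_decoded = []  # what a callback sees with a JSON coder
transmit = ->(data) { sent << data }

default_handler = build_handler(nil, coder: nil, transmit: transmit)
raw_handler     = build_handler(->(m) { received_raw << m }, coder: nil, transmit: transmit)
json_handler    = build_handler(->(m) { received_decoded << m }, coder: ToyJSONCoder, transmit: transmit)

payload = '{"text":"hi"}'
[default_handler, raw_handler, json_handler].each { |handler| handler.call(payload) }
```

In a real channel, `stream_from` is typically called from the `subscribed` callback, for example `stream_from "chat_#{params[:room]}"`.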
# File 'actioncable/lib/action_cable/channel/streams.rb', line 90
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)

  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!

  # Build a stream handler by wrapping the user-provided callback with a decoder
  # or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams[broadcasting] = handler

  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end
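The order of operations in the method above (coerce the name to a string, defer the confirmation, register the handler, subscribe, then confirm on success) can be illustrated with a toy model. `ToyPubSub` and `ToyChannel` are hypothetical classes written for this sketch. Unlike the real method, which posts the subscription to the server's event loop, the toy subscribes synchronously.

```ruby
# Toy model of the control flow; every class here is a hypothetical stand-in.
class ToyPubSub
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers the handler, then reports success through the callback.
  def subscribe(broadcasting, handler, success_callback)
    @subscribers[broadcasting] << handler
    success_callback.call
  end

  def broadcast(broadcasting, message)
    @subscribers[broadcasting].each { |handler| handler.call(message) }
  end
end

class ToyChannel
  attr_reader :events, :streams

  def initialize(pubsub)
    @pubsub = pubsub
    @events = []
    @streams = {}
  end

  def stream_from(broadcasting, &block)
    broadcasting = String(broadcasting)  # symbols and other objects become strings
    @events << :confirmation_deferred    # hold the confirmation back for now
    @streams[broadcasting] = block       # remember the handler for this stream
    # Record the confirmation only once the subscription has succeeded.
    @pubsub.subscribe(broadcasting, block, -> { @events << :confirmation_sent })
  end
end

pubsub = ToyPubSub.new
channel = ToyChannel.new(pubsub)
messages = []

channel.stream_from(:chat_1) { |message| messages << message }
pubsub.broadcast("chat_1", "hello")
```

Deferring the confirmation until the subscribe callback runs means a client is not told it is subscribed before the handler is actually registered.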