Class: Channel
- Inherits:
-
Object
- Object
- Channel
- Defined in:
- lib/signalfx/signalflow/channel.rb
Overview
Channel represents a medium through which SignalFlow messages pass. The main method for it is #each_message, which is how you get messages from the channel. There can only be one user of a channel and they are NOT thread-safe.
Channels are for one-time use only. Once a channel is detached from (either manually or due to the end of a computation) previous messages will be iterable but nothing new will show up.
Instance Attribute Summary collapse
-
#detached ⇒ Object
Returns the value of attribute detached.
-
#name ⇒ Object
Returns the value of attribute name.
Instance Method Summary collapse
- #detach(send_detach_to_server = true) ⇒ Object
-
#initialize(name, detach_cb) ⇒ Channel
constructor
A new instance of Channel.
- #inject_message(msg) ⇒ Object
-
#pop(timeout_seconds = nil) ⇒ Hash
Waits for and returns the next message in the channel.
Constructor Details
#initialize(name, detach_cb) ⇒ Channel
Returns a new instance of Channel.
20 21 22 23 24 25 26 27 |
# File 'lib/signalfx/signalflow/channel.rb', line 20 def initialize(name, detach_cb) @lock = Mutex.new @detach_lock = Mutex.new @detached = false @name = name @detach_from_transport = detach_cb @messages = QueueWithTimeout.new end |
Instance Attribute Details
#detached ⇒ Object
Returns the value of attribute detached.
18 19 20 |
# File 'lib/signalfx/signalflow/channel.rb', line 18 def detached @detached end |
#name ⇒ Object
Returns the value of attribute name.
17 18 19 |
# File 'lib/signalfx/signalflow/channel.rb', line 17 def name @name end |
Instance Method Details
#detach(send_detach_to_server = true) ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/signalfx/signalflow/channel.rb', line 62 def detach(send_detach_to_server=true) if !@detached @detached = true @detach_from_transport.call if send_detach_to_server @detach_from_transport = nil end end |
#inject_message(msg) ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/signalfx/signalflow/channel.rb', line 70 def (msg) # Since messages are injected by a separate websocket thread, they could # come in after the user has detached manually from the channel. Just # silently ignore them in that case. return if @detached raise 'Cannot inject nil message' if msg.nil? @messages << msg end |
#pop(timeout_seconds = nil) ⇒ Hash
Waits for and returns the next message in the channel.
of ‘nil` indicates that the channel has detected it is done and will not be receiving any more useful messages.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/signalfx/signalflow/channel.rb', line 38 def pop(timeout_seconds=nil) raise "Channel #{@name} is detached" if @detached msg = nil begin msg = @messages.pop_with_timeout(timeout_seconds) rescue ThreadError raise ChannelTimeout.new( "Did not receive a message on channel #{@name} within #{timeout_seconds} seconds") end if msg[:event] == "END_OF_CHANNEL" || msg[:event] == "CONNECTION_CLOSED" || msg[:event] == "CHANNEL_ABORT" # Mark this channel as detached and then return nil as an indicator that # this channel is done detach(false) nil else msg end end |