Class: Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/signalfx/signalflow/channel.rb

Overview

Channel represents a medium through which SignalFlow messages pass. The main method for it is #each_message, which is how you get messages from the channel. There can only be one user of a channel and they are NOT thread-safe.

Channels are for one-time use only. Once a channel is detached from (either manually or due to the end of a computation) previous messages will be iterable but nothing new will show up.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, detach_cb) ⇒ Channel

Returns a new instance of Channel.



20
21
22
23
24
25
26
27
# File 'lib/signalfx/signalflow/channel.rb', line 20

def initialize(name, detach_cb)
  @lock = Mutex.new
  @detach_lock = Mutex.new
  @detached = false
  @name = name
  @detach_from_transport = detach_cb
  @messages = QueueWithTimeout.new
end

Instance Attribute Details

#detachedObject

Returns the value of attribute detached.



18
19
20
# File 'lib/signalfx/signalflow/channel.rb', line 18

def detached
  @detached
end

#nameObject

Returns the value of attribute name.



17
18
19
# File 'lib/signalfx/signalflow/channel.rb', line 17

def name
  @name
end

Instance Method Details

#detach(send_detach_to_server = true) ⇒ Object



62
63
64
65
66
67
68
# File 'lib/signalfx/signalflow/channel.rb', line 62

def detach(send_detach_to_server=true)
  if !@detached
    @detached = true
    @detach_from_transport.call if send_detach_to_server
    @detach_from_transport = nil
  end
end

#inject_message(msg) ⇒ Object



70
71
72
73
74
75
76
77
78
# File 'lib/signalfx/signalflow/channel.rb', line 70

def inject_message(msg)
  # Since messages are injected by a separate websocket thread, they could
  # come in after the user has detached manually from the channel.  Just
  # silently ignore them in that case.
  return if @detached
  raise 'Cannot inject nil message' if msg.nil?

  @messages << msg
end

#pop(timeout_seconds = nil) ⇒ Hash

Waits for and returns the next message in the channel.

of ‘nil` indicates that the channel has detected it is done and will not be receiving any more useful messages.

Parameters:

  • timeout_seconds (Float) (defaults to: nil)

    Number of seconds to wait for a message.

Returns:

  • (Hash)

    The next message received by this channel. A return value

Raises:



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/signalfx/signalflow/channel.rb', line 38

def pop(timeout_seconds=nil)
  raise "Channel #{@name} is detached" if @detached

  msg = nil
  begin
    msg = @messages.pop_with_timeout(timeout_seconds)
  rescue ThreadError
    raise ChannelTimeout.new(
      "Did not receive a message on channel #{@name} within #{timeout_seconds} seconds")
  end

  if msg[:event] == "END_OF_CHANNEL" || msg[:event] == "CONNECTION_CLOSED" || msg[:event] == "CHANNEL_ABORT"
    # Mark this channel as detached and then return nil as an indicator that
    # this channel is done
    detach(false)

    nil
  else
    msg
  end

end