Class: SignalFlowWebsocketTransport
- Inherits:
-
Object
- Object
- SignalFlowWebsocketTransport
- Defined in:
- lib/signalfx/signalflow/websocket.rb
Overview
A WebSocket transport for SignalFlow. This should not be used directly by end-users.
Constant Summary collapse
- DETACHED =
"DETACHED"
- COMPUTATION_START_TIMEOUT_SECONDS =
A lower bound on the amount of time to wait for a computation to start
30
Instance Method Summary collapse
-
#attach(handle, filters: nil, resolution: nil) ⇒ Object
This doesn’t actually work on the backend yet.
- #close ⇒ Object
- #detach(channel, reason = nil) ⇒ Object
- #execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object
-
#initialize(api_token, stream_endpoint) ⇒ SignalFlowWebsocketTransport
constructor
A new instance of SignalFlowWebsocketTransport.
- #on_close(msg) ⇒ Object
- #on_message(m) ⇒ Object
- #on_open ⇒ Object
- #preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object
- #start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object
-
#start_job {|channel.name| ... } ⇒ Object
Starts a job (either execute or preflight) and waits until the JOB_START message is received with the computation handle arrives so that we can create a properly initialized computation object.
- #stop(handle, reason) ⇒ Object
Constructor Details
#initialize(api_token, stream_endpoint) ⇒ SignalFlowWebsocketTransport
Returns a new instance of SignalFlowWebsocketTransport.
20 21 22 23 24 25 26 27 28 |
# File 'lib/signalfx/signalflow/websocket.rb', line 20 def initialize(api_token, stream_endpoint) @api_token = api_token @stream_endpoint = stream_endpoint @compress = true @lock = Mutex.new @close_reason = nil reinit end |
Instance Method Details
#attach(handle, filters: nil, resolution: nil) ⇒ Object
This doesn’t actually work on the backend yet
140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/signalfx/signalflow/websocket.rb', line 140 def attach(handle, filters: nil, resolution: nil) channel = make_new_channel send_msg({ :type => "attach", :channel => channel.name, :handle => handle, :filters => filters, :resolution => resolution, :compress => @compress, }.reject!{|k,v| v.nil?}.to_json) channel end |
#close ⇒ Object
170 171 172 173 174 |
# File 'lib/signalfx/signalflow/websocket.rb', line 170 def close if @ws @ws.close end end |
#detach(channel, reason = nil) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/signalfx/signalflow/websocket.rb', line 155 def detach(channel, reason=nil) send_msg({ :type => "detach", :channel => channel, :reason => reason, }.to_json) # There is no response message from the server signifying detach complete # and there could be messages coming in even after the detach request is # sent. Therefore, use a sentinal value in place of the callback block so # that the message receiver logic can distinguish this case from some # anomolous case (say, due to bad logic in the code). @chan_callbacks[channel] = DETACHED end |
#execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/signalfx/signalflow/websocket.rb', line 85 def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) start_job do |channel_name| send_msg({ :type => "execute", :channel => channel_name, :program => program, :start => start, :stop => stop, :resolution => resolution, :max_delay => max_delay, :persistent => persistent, :immediate => immediate, :compress => @compress, }.reject!{|k,v| v.nil?}.to_json) end end |
#on_close(msg) ⇒ Object
201 202 203 204 205 206 207 208 |
# File 'lib/signalfx/signalflow/websocket.rb', line 201 def on_close(msg) @close_reason = "(#{msg.code}, #{msg.data})" @chan_callbacks.keys.each do |channel_name| invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, channel_name) end reinit end |
#on_message(m) ⇒ Object
210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/signalfx/signalflow/websocket.rb', line 210 def (m) begin return if m.type == :ping if m.type == :close on_close(m) return end (m.data, m.type == :text) rescue Exception => e puts "Error processing SignalFlow message: #{e.backtrace.first}: #{e.} (#{e.class})" end end |
#on_open ⇒ Object
224 225 226 227 228 229 |
# File 'lib/signalfx/signalflow/websocket.rb', line 224 def on_open @ws.send({ :type => "authenticate", :token => @api_token, }.to_json) end |
#preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/signalfx/signalflow/websocket.rb', line 102 def preflight(program, start, stop, resolution: nil, max_delay: nil) start_job do |channel_name| send_msg({ :type => "preflight", :channel => channel_name, :program => program, :start => start, :stop => stop, :resolution => resolution, :max_delay => max_delay, :compress => @compress, }.reject!{|k,v| v.nil?}.to_json) end end |
#start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/signalfx/signalflow/websocket.rb', line 117 def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) start_job do |channel_name| send_msg({ :type => "start", :channel => channel_name, :program => program, :start => start, :stop => stop, :resolution => resolution, :max_delay => max_delay, }.reject!{|k,v| v.nil?}.to_json) end end |
#start_job {|channel.name| ... } ⇒ Object
Starts a job (either execute or preflight) and waits until the JOB_START message is received with the computation handle arrives so that we can create a properly initialized computation object. Yields to the given block which should send the WS message to start the job.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/signalfx/signalflow/websocket.rb', line 51 def start_job computation = nil channel = make_new_channel yield channel.name while true begin msg = channel.pop(COMPUTATION_START_TIMEOUT_SECONDS) rescue ChannelTimeout raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds" end if msg[:type] == "error" raise ComputationFailure.new(msg[:message]) end # STREAM_START comes before this but contains no useful information if msg[:event] == "JOB_START" computation = Computation.new(msg[:handle], method(:attach), method(:stop)) computation.channel = channel elsif msg[:type] == "computation-started" computation = Computation.new(msg[:computationId], method(:attach), method(:stop)) # Start jobs only use the channel to get error messages and can # detach from the channel once the job has started. channel.detach else next end return computation end end |
#stop(handle, reason) ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/signalfx/signalflow/websocket.rb', line 131 def stop(handle, reason) send_msg({ :type => "stop", :handle => handle, :reason => reason, }.reject!{|k,v| v.nil?}.to_json) end |