Class: SignalFlowWebsocketTransport
- Inherits:
-
Object
- Object
- SignalFlowWebsocketTransport
- Defined in:
- lib/signalfx/signalflow/websocket.rb
Overview
A WebSocket transport for SignalFlow. This should not be used directly by end-users.
Constant Summary collapse
- DETACHED =
"DETACHED"
- COMPUTATION_START_TIMEOUT_SECONDS =
A lower bound on the amount of time to wait for a computation to start
30
Instance Method Summary collapse
-
#attach(handle, filters: nil, resolution: nil) ⇒ Object
This doesn’t actually work on the backend yet.
- #close ⇒ Object
- #detach(channel, reason = nil) ⇒ Object
- #execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object
-
#initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false) ⇒ SignalFlowWebsocketTransport
constructor
A new instance of SignalFlowWebsocketTransport.
- #on_close(msg) ⇒ Object
- #on_error(e) ⇒ Object
- #on_message(m) ⇒ Object
- #on_open ⇒ Object
- #preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object
- #start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object
-
#start_job {|channel.name| ... } ⇒ Object
Starts a job (either execute or preflight) and waits until the JOB_START message carrying the computation handle arrives, so that we can create a properly initialized computation object.
- #stop(handle, reason) ⇒ Object
Constructor Details
#initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false) ⇒ SignalFlowWebsocketTransport
Returns a new instance of SignalFlowWebsocketTransport.
26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/signalfx/signalflow/websocket.rb', line 26

# Creates a transport and stores its configuration; the final call to
# +reinit+ (defined elsewhere in this file) presumably sets up the
# per-connection state — confirm against that method.
#
# @param api_token token sent as :token in the "authenticate" message (see #on_open)
# @param stream_endpoint stored as-is in @stream_endpoint
# @param proxy_url optional proxy URL, stored as-is
# @param logger [Logger] destination for error and debug output
# @param debug [Boolean] when true, websocket lifecycle events are logged at info level
def initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false)
  # Connection settings supplied by the caller.
  @api_token = api_token
  @stream_endpoint = stream_endpoint
  @proxy_url = proxy_url

  # Diagnostics.
  @logger = logger
  @debug = debug

  # Sent as the :compress field of execute/preflight/attach requests.
  @compress = true

  # Bookkeeping that starts out empty.
  @lock = Mutex.new
  @close_reason = nil
  @last_error = nil

  # Must run last so that it sees the fully assigned configuration.
  reinit
end
Instance Method Details
#attach(handle, filters: nil, resolution: nil) ⇒ Object
This doesn’t actually work on the backend yet
150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/signalfx/signalflow/websocket.rb', line 150

# Sends an "attach" request for an existing computation on a newly created
# channel. (This doesn't actually work on the backend yet.)
#
# @param handle handle of the computation to attach to
# @param filters optional; left out of the request when nil
# @param resolution optional; left out of the request when nil
# @return the channel returned by make_new_channel
def attach(handle, filters: nil, resolution: nil)
  channel = make_new_channel

  request = {
    :type => "attach",
    :channel => channel.name,
    :handle => handle,
    :filters => filters,
    :resolution => resolution,
    :compress => @compress,
  }

  # Use the non-mutating #reject: Hash#reject! returns nil when no entry is
  # removed, which would serialize the whole request as "null" whenever every
  # optional field is supplied.
  send_msg(request.reject { |_key, value| value.nil? }.to_json)

  channel
end
#close ⇒ Object
180 181 182 183 184 |
# File 'lib/signalfx/signalflow/websocket.rb', line 180

# Closes the underlying websocket if one has been set; otherwise does nothing.
def close
  @ws.close if @ws
end
#detach(channel, reason = nil) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/signalfx/signalflow/websocket.rb', line 165

# Asks the server to detach the given channel and marks it as detached locally.
#
# @param channel name of the channel to detach
# @param reason optional reason forwarded to the server
def detach(channel, reason=nil)
  detach_request = {
    :type => "detach",
    :channel => channel,
    :reason => reason,
  }
  send_msg(detach_request.to_json)

  # The server never acknowledges a detach, and messages for the channel may
  # keep arriving after the request went out. Storing a sentinel value where
  # the callback block would normally be lets the receiving side tell this
  # expected situation apart from a genuinely anomalous one (for example, a
  # logic error elsewhere in the code).
  @chan_callbacks[channel] = DETACHED
end
#execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/signalfx/signalflow/websocket.rb', line 95

# Starts an "execute" job for the given program via #start_job.
#
# Optional arguments that are nil are left out of the request.
#
# @param program the SignalFlow program text
# @param start optional start of the time range
# @param stop optional end of the time range
# @param resolution optional resolution
# @param max_delay optional maximum delay
# @param persistent optional persistence flag
# @param immediate [Boolean] always sent (defaults to false)
# @return whatever #start_job returns
def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false)
  start_job do |channel_name|
    request = {
      :type => "execute",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :persistent => persistent,
      :immediate => immediate,
      :compress => @compress,
    }

    # Use the non-mutating #reject: Hash#reject! returns nil when no entry is
    # removed, which would serialize the request as "null" when every option
    # is supplied.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
#on_close(msg) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/signalfx/signalflow/websocket.rb', line 214

# Websocket close handler: records why the connection closed, notifies every
# registered channel, then calls +reinit+.
#
# @param msg close message exposing #code and #reason
def on_close(msg)
  @logger.info("Websocket on_close: #{msg}") if @debug

  @close_reason = "(#{msg.code}, #{msg.reason})"

  # Iterate over a snapshot of the keys and hand each channel its own event
  # hash.
  @chan_callbacks.keys.each { |name|
    invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, name)
  }

  reinit
end
#on_error(e) ⇒ Object
227 228 229 230 |
# File 'lib/signalfx/signalflow/websocket.rb', line 227

# Websocket error handler: logs the error and remembers it in @last_error.
#
# @param e the error reported by the websocket
def on_error(e)
  description = e.inspect
  @logger.error("ERROR #{description}")

  @last_error = e
end
#on_message(m) ⇒ Object
233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/signalfx/signalflow/websocket.rb', line 233

# Websocket message handler: hands the payload to the message processor and
# logs (rather than propagates) ordinary errors raised while processing it.
#
# NOTE(review): several identifiers were missing from this listing. The method
# name comes from the documented signature `#on_message(m)`. The name of the
# processing helper (`message_received`) is presumed — confirm against the
# real source file before relying on it.
#
# @param m incoming websocket message exposing #data
def on_message(m)
  @logger.info("Websocket on_message: #{m}") if @debug

  # String payloads are treated as text frames; anything else as non-text.
  is_text = m.data.kind_of?(String)

  begin
    message_received(m.data, is_text)
  rescue StandardError => e
    # Only StandardError is swallowed so that SystemExit, Interrupt and
    # similar process-level exceptions are not silently eaten here.
    # Array() guards against exceptions that carry no backtrace.
    @logger.error("Error processing SignalFlow message: #{Array(e.backtrace).first}: #{e.message} (#{e.class})")
  end
end
#on_open ⇒ Object
247 248 249 250 251 252 253 254 255 256 |
# File 'lib/signalfx/signalflow/websocket.rb', line 247

# Websocket open handler: authenticates the new connection by sending the
# API token.
def on_open
  @logger.info("Websocket on_open") if @debug

  auth_request = {
    :type => "authenticate",
    :token => @api_token,
  }
  @ws.send(auth_request.to_json)
end
#preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/signalfx/signalflow/websocket.rb', line 112

# Starts a "preflight" job for the given program via #start_job.
#
# Arguments that are nil are left out of the request.
#
# @param program the SignalFlow program text
# @param start start of the time range
# @param stop end of the time range
# @param resolution optional resolution
# @param max_delay optional maximum delay
# @return whatever #start_job returns
def preflight(program, start, stop, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "preflight",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :compress => @compress,
    }

    # Use the non-mutating #reject: Hash#reject! returns nil when no entry is
    # removed, which would serialize the request as "null" when both
    # resolution and max_delay are supplied.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
#start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/signalfx/signalflow/websocket.rb', line 127

# Starts a "start" job for the given program via #start_job.
#
# Optional arguments that are nil are left out of the request.
#
# @param program the SignalFlow program text
# @param start optional start of the time range
# @param stop optional end of the time range
# @param resolution optional resolution
# @param max_delay optional maximum delay
# @return whatever #start_job returns
def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "start",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
    }

    # Use the non-mutating #reject: Hash#reject! returns nil when no entry is
    # removed, which would serialize the request as "null" when every option
    # is supplied.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
#start_job {|channel.name| ... } ⇒ Object
Starts a job (either execute or preflight) and waits until the JOB_START message carrying the computation handle arrives, so that we can create a properly initialized computation object. Yields to the given block, which should send the WS message to start the job.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/signalfx/signalflow/websocket.rb', line 61

# Opens a new channel, lets the caller send the job-start request, then reads
# from the channel until a message identifying the computation arrives.
#
# @yield [channel_name] the block must send the websocket message that starts the job
# @return [Computation] built from the handle in the JOB_START event, or from
#   the computationId of a "computation-started" message
# @raise [RuntimeError] if no message arrives within COMPUTATION_START_TIMEOUT_SECONDS
# @raise [ComputationFailure] if the server answers with an "error" message
def start_job
  channel = make_new_channel

  yield channel.name

  computation = nil
  until computation
    msg = begin
      channel.pop(COMPUTATION_START_TIMEOUT_SECONDS)
    rescue ChannelTimeout
      raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds"
    end

    raise ComputationFailure.new(msg[:message]) if msg[:type] == "error"

    # Anything else (e.g. the STREAM_START that precedes JOB_START) carries
    # nothing useful and is skipped by looping again.
    if msg[:event] == "JOB_START"
      computation = Computation.new(msg[:handle], method(:attach), method(:stop))
      computation.channel = channel
    elsif msg[:type] == "computation-started"
      computation = Computation.new(msg[:computationId], method(:attach), method(:stop))
      # "start" jobs need the channel only to surface errors, so it can be
      # dropped as soon as the job is running.
      channel.detach
    end
  end

  computation
end
#stop(handle, reason) ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/signalfx/signalflow/websocket.rb', line 141

# Sends a "stop" request for the computation with the given handle.
#
# @param handle handle of the computation to stop
# @param reason reason for stopping; left out of the request when nil
def stop(handle, reason)
  request = {
    :type => "stop",
    :handle => handle,
    :reason => reason,
  }

  # Use the non-mutating #reject: Hash#reject! returns nil when no entry is
  # removed, so the usual call with both a handle and a reason would have
  # sent "null" instead of the stop request.
  send_msg(request.reject { |_key, value| value.nil? }.to_json)
end