Class: SignalFlowWebsocketTransport
- Inherits:
-
Object
- Object
- SignalFlowWebsocketTransport
- Defined in:
- lib/signalfx/signalflow/websocket.rb
Overview
A WebSocket transport for SignalFlow. This should not be used directly by end-users.
Constant Summary collapse
- DETACHED =
"DETACHED"
- COMPUTATION_START_TIMEOUT_SECONDS =
A lower bound on the amount of time to wait for a computation to start
30
Instance Method Summary collapse
-
#attach(handle, filters: nil, resolution: nil) ⇒ Object
This doesn’t actually work on the backend yet.
- #close ⇒ Object
- #detach(channel, reason = nil) ⇒ Object
- #execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object
-
#initialize(api_token, stream_endpoint, logger: Logger.new(STDOUT, progname: "signalfx")) ⇒ SignalFlowWebsocketTransport
constructor
A new instance of SignalFlowWebsocketTransport.
- #on_close(msg) ⇒ Object
- #on_message(m) ⇒ Object
- #on_open ⇒ Object
- #preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object
- #start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object
-
#start_job {|channel.name| ... } ⇒ Object
Starts a job (execute, preflight, or start) and waits until the message carrying the computation handle arrives (JOB_START, or computation-started for start jobs) so that we can create a properly initialized computation object.
- #stop(handle, reason) ⇒ Object
Constructor Details
#initialize(api_token, stream_endpoint, logger: Logger.new(STDOUT, progname: "signalfx")) ⇒ SignalFlowWebsocketTransport
Returns a new instance of SignalFlowWebsocketTransport.
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/signalfx/signalflow/websocket.rb', line 20
#
# Creates a transport for the given token and stream endpoint.
#
# @param api_token the token kept in @api_token
# @param stream_endpoint the endpoint kept in @stream_endpoint
# @param logger the logger kept in @logger; defaults to a Logger writing to
#   STDOUT with progname "signalfx"
def initialize(api_token, stream_endpoint, logger: Logger.new(STDOUT, progname: "signalfx"))
  # Caller-supplied configuration.
  @logger = logger
  @stream_endpoint = stream_endpoint
  @api_token = api_token

  # Internal state: no close reason recorded yet, compression requested.
  @close_reason = nil
  @lock = Mutex.new
  @compress = true

  # Runs last so that every instance variable above is already assigned.
  reinit
end
Instance Method Details
#attach(handle, filters: nil, resolution: nil) ⇒ Object
This doesn’t actually work on the backend yet
141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/signalfx/signalflow/websocket.rb', line 141
#
# Sends an "attach" request for an existing computation on a freshly created
# channel. This doesn't actually work on the backend yet.
#
# @param handle the computation handle to attach to
# @param filters optional filters; omitted from the request when nil
# @param resolution optional resolution; omitted from the request when nil
# @return the newly created channel
def attach(handle, filters: nil, resolution: nil)
  channel = make_new_channel

  request = {
    :type => "attach",
    :channel => channel.name,
    :handle => handle,
    :filters => filters,
    :resolution => resolution,
    :compress => @compress,
  }

  # Use the non-destructive #reject: Hash#reject! returns nil when no entry
  # is removed, which would serialize the whole request as "null" whenever
  # every field is set.
  send_msg(request.reject { |_key, value| value.nil? }.to_json)

  channel
end
#close ⇒ Object
171 172 173 174 175 |
# File 'lib/signalfx/signalflow/websocket.rb', line 171
#
# Closes the underlying WebSocket (@ws) if one is set; otherwise does nothing.
def close
  return unless @ws

  @ws.close
end
#detach(channel, reason = nil) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/signalfx/signalflow/websocket.rb', line 156
#
# Sends a "detach" request for a channel and marks the channel as detached.
#
# @param channel the channel key used in the request and in @chan_callbacks
# @param reason optional reason included in the request
def detach(channel, reason=nil)
  detach_request = {
    :type => "detach",
    :channel => channel,
    :reason => reason,
  }
  send_msg(detach_request.to_json)

  # The server sends no acknowledgement for a detach, and messages for the
  # channel may still arrive after the request has gone out. Store a sentinel
  # value in place of the callback block so the receiving logic can tell this
  # expected situation apart from a genuinely anomalous one (for example, a
  # logic error elsewhere in the code).
  @chan_callbacks[channel] = DETACHED
end
#execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/signalfx/signalflow/websocket.rb', line 86
#
# Sends an "execute" request for a program via start_job.
#
# @param program the program text to execute
# @param start optional start value; omitted from the request when nil
# @param stop optional stop value; omitted from the request when nil
# @param resolution optional resolution; omitted from the request when nil
# @param max_delay optional max delay; omitted from the request when nil
# @param persistent optional flag; omitted from the request when nil
# @param immediate flag always sent with the request (defaults to false)
# @return whatever start_job returns
def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false)
  start_job do |channel_name|
    request = {
      :type => "execute",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :persistent => persistent,
      :immediate => immediate,
      :compress => @compress,
    }

    # Use the non-destructive #reject: Hash#reject! returns nil when no entry
    # is removed, which would serialize the whole request as "null" whenever
    # every option is supplied.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
#on_close(msg) ⇒ Object
202 203 204 205 206 207 208 209 |
# File 'lib/signalfx/signalflow/websocket.rb', line 202
#
# Records why the connection closed, notifies every registered channel with a
# CONNECTION_CLOSED event, then calls reinit.
#
# @param msg the close message; its #code and #data are recorded in
#   @close_reason
def on_close(msg)
  @close_reason = "(#{msg.code}, #{msg.data})"

  # Iterate over a snapshot of the channel names taken before any callback runs.
  channel_names = @chan_callbacks.keys
  channel_names.each do |name|
    closed_event = { :event => "CONNECTION_CLOSED" }
    invoke_callback_for_channel(closed_event, name)
  end

  reinit
end
#on_message(m) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/signalfx/signalflow/websocket.rb', line 211
#
# Handles one incoming WebSocket message: ignores pings, routes close
# messages to on_close, and hands everything else to the message handler
# together with a flag saying whether the message is text.
#
# NOTE(review): several identifiers in this method were lost in the extracted
# source (the method name, the dispatch target and the exception accessor).
# The method name is taken from the method listing; `e.message` and
# `message_received` are assumed — confirm against
# lib/signalfx/signalflow/websocket.rb.
#
# @param m the incoming message; must respond to #type and #data
def on_message(m)
  return if m.type == :ping

  if m.type == :close
    on_close(m)
    return
  end

  message_received(m.data, m.type == :text)
rescue Exception => e
  # Any failure while processing a message is logged and not re-raised.
  @logger.error("Error processing SignalFlow message: #{e.backtrace.first}: #{e.message} (#{e.class})")
end
#on_open ⇒ Object
225 226 227 228 229 230 |
# File 'lib/signalfx/signalflow/websocket.rb', line 225
#
# Sends the "authenticate" message carrying @api_token over the WebSocket.
def on_open
  auth_request = {
    :type => "authenticate",
    :token => @api_token,
  }
  @ws.send(auth_request.to_json)
end
#preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/signalfx/signalflow/websocket.rb', line 103
#
# Sends a "preflight" request for a program via start_job.
#
# @param program the program text
# @param start start value; omitted from the request when nil
# @param stop stop value; omitted from the request when nil
# @param resolution optional resolution; omitted from the request when nil
# @param max_delay optional max delay; omitted from the request when nil
# @return whatever start_job returns
def preflight(program, start, stop, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "preflight",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :compress => @compress,
    }

    # Use the non-destructive #reject: Hash#reject! returns nil when no entry
    # is removed, which would serialize the whole request as "null" whenever
    # every field is set.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
#start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/signalfx/signalflow/websocket.rb', line 118
#
# Sends a "start" request for a program via start_job.
#
# @param program the program text
# @param start optional start value; omitted from the request when nil
# @param stop optional stop value; omitted from the request when nil
# @param resolution optional resolution; omitted from the request when nil
# @param max_delay optional max delay; omitted from the request when nil
# @return whatever start_job returns
def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "start",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
    }

    # Use the non-destructive #reject: Hash#reject! returns nil when no entry
    # is removed, which would serialize the whole request as "null" whenever
    # every option is supplied.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
#start_job {|channel.name| ... } ⇒ Object
Starts a job (execute, preflight, or start) and waits until the message carrying the computation handle arrives (JOB_START, or computation-started for start jobs) so that we can create a properly initialized computation object. Yields to the given block, which should send the WS message to start the job.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/signalfx/signalflow/websocket.rb', line 52
#
# Opens a new channel, yields its name so the caller can send the request
# that starts the job, then reads messages from the channel until one
# carries the computation handle.
#
# @yield [channel.name] the block should send the WS message that starts the job
# @return the Computation built from the handle in the received message
# @raise [RuntimeError] when no message arrives within
#   COMPUTATION_START_TIMEOUT_SECONDS
# @raise [ComputationFailure] when a message of type "error" is received
def start_job
  channel = make_new_channel
  yield channel.name

  while true
    begin
      msg = channel.pop(COMPUTATION_START_TIMEOUT_SECONDS)
    rescue ChannelTimeout
      raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds"
    end

    raise ComputationFailure.new(msg[:message]) if msg[:type] == "error"

    # STREAM_START arrives before JOB_START but carries nothing useful, so it
    # (like any other unrecognised message) simply falls through to the next
    # iteration.
    if msg[:event] == "JOB_START"
      job = Computation.new(msg[:handle], method(:attach), method(:stop))
      job.channel = channel
      return job
    end

    if msg[:type] == "computation-started"
      job = Computation.new(msg[:computationId], method(:attach), method(:stop))
      # Start jobs use the channel only for error messages, so the channel
      # can be detached as soon as the job has started.
      channel.detach
      return job
    end
  end
end
#stop(handle, reason) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/signalfx/signalflow/websocket.rb', line 132
#
# Sends a "stop" request for a computation.
#
# @param handle the computation handle; omitted from the request when nil
# @param reason the reason for stopping; omitted from the request when nil
def stop(handle, reason)
  request = {
    :type => "stop",
    :handle => handle,
    :reason => reason,
  }

  # Use the non-destructive #reject: Hash#reject! returns nil when no entry is
  # removed, which would serialize the request as "null" whenever both handle
  # and reason are given.
  send_msg(request.reject { |_key, value| value.nil? }.to_json)
end