Class: SignalFlowWebsocketTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/signalfx/signalflow/websocket.rb

Overview

A WebSocket transport for SignalFlow. This should not be used directly by end-users.

Constant Summary collapse

DETACHED =
"DETACHED"
COMPUTATION_START_TIMEOUT_SECONDS =

A lower bound on the amount of time to wait for a computation to start

30

Instance Method Summary collapse

Constructor Details

#initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false) ⇒ SignalFlowWebsocketTransport

Returns a new instance of SignalFlowWebsocketTransport.



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/signalfx/signalflow/websocket.rb', line 26

def initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false)
  @api_token = api_token
  @stream_endpoint = stream_endpoint
  @logger = logger
  @compress = true
  @proxy_url = proxy_url
  @debug = debug

  @lock = Mutex.new
  @close_reason = nil
  @last_error = nil
  reinit
end

Instance Method Details

#attach(handle, filters: nil, resolution: nil) ⇒ Object

This doesn’t actually work on the backend yet



150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/signalfx/signalflow/websocket.rb', line 150

def attach(handle, filters: nil, resolution: nil)
  channel = make_new_channel

  send_msg({
    :type => "attach",
    :channel => channel.name,
    :handle => handle,
    :filters => filters,
    :resolution => resolution,
    :compress => @compress,
  }.reject!{|k,v| v.nil?}.to_json)

  channel
end

#closeObject



180
181
182
183
184
# File 'lib/signalfx/signalflow/websocket.rb', line 180

def close
  if @ws
    @ws.close
  end
end

#detach(channel, reason = nil) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/signalfx/signalflow/websocket.rb', line 165

def detach(channel, reason=nil)
  send_msg({
    :type => "detach",
    :channel => channel,
    :reason => reason,
  }.to_json)

  # There is no response message from the server signifying detach complete
  # and there could be messages coming in even after the detach request is
  # sent.  Therefore, use a sentinal value in place of the callback block so
  # that the message receiver logic can distinguish this case from some
  # anomolous case (say, due to bad logic in the code).
  @chan_callbacks[channel] = DETACHED
end

#execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/signalfx/signalflow/websocket.rb', line 95

def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false)
  start_job do |channel_name|
    send_msg({
      :type => "execute",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :persistent => persistent,
      :immediate => immediate,
      :compress => @compress,
    }.reject!{|k,v| v.nil?}.to_json)
  end
end

#on_close(msg) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/signalfx/signalflow/websocket.rb', line 214

def on_close(msg)
  if @debug
    @logger.info("Websocket on_close: #{msg}")
  end

  @close_reason = "(#{msg.code}, #{msg.reason})"
  @chan_callbacks.keys.each do |channel_name|
    invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, channel_name)
  end

  reinit
end

#on_error(e) ⇒ Object



227
228
229
230
# File 'lib/signalfx/signalflow/websocket.rb', line 227

def on_error(e)
  @logger.error("ERROR #{e.inspect}")
  @last_error = e
end

#on_message(m) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/signalfx/signalflow/websocket.rb', line 233

def on_message(m)
  if @debug
    @logger.info("Websocket on_message: #{m}")
  end

  is_text = m.data.kind_of?(String)

  begin
    message_received(m.data, is_text)
  rescue Exception => e
    @logger.error("Error processing SignalFlow message: #{e.backtrace.first}: #{e.message} (#{e.class})")
  end
end

#on_openObject



247
248
249
250
251
252
253
254
255
256
# File 'lib/signalfx/signalflow/websocket.rb', line 247

def on_open
  if @debug
    @logger.info("Websocket on_open")
  end

  @ws.send({
    :type => "authenticate",
    :token => @api_token,
  }.to_json)
end

#preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/signalfx/signalflow/websocket.rb', line 112

def preflight(program, start, stop, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    send_msg({
      :type => "preflight",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :compress => @compress,
    }.reject!{|k,v| v.nil?}.to_json)
  end
end

#start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/signalfx/signalflow/websocket.rb', line 127

def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    send_msg({
      :type => "start",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
    }.reject!{|k,v| v.nil?}.to_json)
  end
end

#start_job {|channel.name| ... } ⇒ Object

Starts a job (either execute or preflight) and waits until the JOB_START message is received with the computation handle arrives so that we can create a properly initialized computation object. Yields to the given block which should send the WS message to start the job.

Yields:

  • (channel.name)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/signalfx/signalflow/websocket.rb', line 61

def start_job
  computation = nil

  channel = make_new_channel

  yield channel.name

  while true
    begin
      msg = channel.pop(COMPUTATION_START_TIMEOUT_SECONDS)
    rescue ChannelTimeout
      raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds"
    end
    if msg[:type] == "error"
      raise ComputationFailure.new(msg[:message])
    end

    # STREAM_START comes before this but contains no useful information
    if msg[:event] == "JOB_START"
      computation = Computation.new(msg[:handle], method(:attach), method(:stop))
      computation.channel = channel
    elsif msg[:type] == "computation-started"
      computation = Computation.new(msg[:computationId], method(:attach), method(:stop))
      # Start jobs only use the channel to get error messages and can
      # detach from the channel once the job has started.
      channel.detach
    else
      next
    end

    return computation
  end
end

#stop(handle, reason) ⇒ Object



141
142
143
144
145
146
147
# File 'lib/signalfx/signalflow/websocket.rb', line 141

def stop(handle, reason)
  send_msg({
    :type => "stop",
    :handle => handle,
    :reason => reason,
  }.reject!{|k,v| v.nil?}.to_json)
end