Class: SignalFlowWebsocketTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/signalfx/signalflow/websocket.rb

Overview

A WebSocket transport for SignalFlow. This should not be used directly by end-users.

Constant Summary collapse

DETACHED =
"DETACHED"
COMPUTATION_START_TIMEOUT_SECONDS =

A lower bound on the amount of time to wait for a computation to start

30

Instance Method Summary collapse

Constructor Details

#initialize(api_token, stream_endpoint, logger: Logger.new(STDOUT, progname: "signalfx")) ⇒ SignalFlowWebsocketTransport

Returns a new instance of SignalFlowWebsocketTransport.



20
21
22
23
24
25
26
27
28
29
# File 'lib/signalfx/signalflow/websocket.rb', line 20

# Builds a transport for the given token and stream endpoint.
#
# @param api_token the token later sent in the "authenticate" message
# @param stream_endpoint the endpoint stored for the connection logic
# @param logger the logger used to report errors; defaults to a STDOUT
#   logger with progname "signalfx"
def initialize(api_token, stream_endpoint, logger: Logger.new(STDOUT, progname: "signalfx"))
  @api_token, @stream_endpoint, @logger = api_token, stream_endpoint, logger

  # Compression is requested on every job message that carries :compress.
  @compress = true
  @close_reason = nil
  @lock = Mutex.new

  # Must run last: the remaining per-connection state is set up by reinit.
  reinit
end

Instance Method Details

#attach(handle, filters: nil, resolution: nil) ⇒ Object

This doesn’t actually work on the backend yet



141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/signalfx/signalflow/websocket.rb', line 141

# Requests attachment to an existing computation on a newly created channel.
#
# NOTE: according to the original documentation of this method, this doesn't
# actually work on the backend yet.
#
# @param handle the computation handle to attach to
# @param filters optional filters; left out of the request when nil
# @param resolution optional resolution; left out of the request when nil
# @return the channel obtained from make_new_channel
def attach(handle, filters: nil, resolution: nil)
  channel = make_new_channel

  request = {
    :type => "attach",
    :channel => channel.name,
    :handle => handle,
    :filters => filters,
    :resolution => resolution,
    :compress => @compress,
  }

  # Hash#reject! returns nil when nothing was removed, which would serialize
  # the whole request as "null" whenever every field is set.  The
  # non-mutating reject always returns a Hash.
  send_msg(request.reject { |_key, value| value.nil? }.to_json)

  channel
end

#close ⇒ Object



171
172
173
174
175
# File 'lib/signalfx/signalflow/websocket.rb', line 171

# Closes the underlying WebSocket, if one is currently set.
#
# @return the result of @ws.close, or nil when there is no socket
def close
  @ws.close if @ws
end

#detach(channel, reason = nil) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/signalfx/signalflow/websocket.rb', line 156

# Sends a detach request for a channel and marks the channel as detached.
#
# @param channel the channel identifier used as the key in @chan_callbacks
# @param reason optional reason; sent as-is (including nil)
# @return the DETACHED sentinel
def detach(channel, reason=nil)
  request = { :type => "detach", :channel => channel, :reason => reason }
  send_msg(request.to_json)

  # The server never confirms a detach, and messages for the channel may
  # keep arriving after the request has been sent.  Storing a sentinel value
  # where the callback block used to be lets the message receiver tell this
  # expected situation apart from an anomalous one (for example a logic
  # error elsewhere in the code).
  @chan_callbacks[channel] = DETACHED
end

#execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/signalfx/signalflow/websocket.rb', line 86

# Starts an "execute" job through start_job and returns whatever start_job
# returns.
#
# Options left as nil are omitted from the request.
#
# @param program the program text to execute
# @param start optional start value
# @param stop optional stop value
# @param resolution optional resolution
# @param max_delay optional max delay
# @param persistent optional persistent flag
# @param immediate immediate flag; false is still sent because only nil
#   values are dropped
def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false)
  start_job do |channel_name|
    request = {
      :type => "execute",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :persistent => persistent,
      :immediate => immediate,
      :compress => @compress,
    }

    # Hash#reject! returns nil when nothing was removed, so a call with every
    # option set used to send "null".  The non-mutating reject is always a Hash.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end

#on_close(msg) ⇒ Object



202
203
204
205
206
207
208
209
# File 'lib/signalfx/signalflow/websocket.rb', line 202

# Handles a connection close: records the close reason, notifies every
# registered channel, and then resets the transport state via reinit.
#
# @param msg the close message; its code and data are recorded in
#   @close_reason
def on_close(msg)
  @close_reason = "(#{msg.code}, #{msg.data})"

  # Iterate over a snapshot of the keys rather than the live hash.
  channel_names = @chan_callbacks.keys
  channel_names.each do |name|
    invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, name)
  end

  reinit
end

#on_message(m) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/signalfx/signalflow/websocket.rb', line 211

# Dispatches one incoming WebSocket message.
#
# Pings are ignored, close messages are handed to on_close, and everything
# else goes to message_received together with a flag telling whether the
# message type is :text.
#
# Ordinary errors raised while processing are logged rather than propagated.
# Only StandardError is rescued: rescuing Exception would also swallow
# Interrupt, SystemExit, NoMemoryError and similar, which must not be
# silently dropped.
#
# @param m the message; must respond to type and data
def on_message(m)
  return if m.type == :ping

  if m.type == :close
    on_close(m)
    return
  end

  message_received(m.data, m.type == :text)
rescue StandardError => e
  @logger.error("Error processing SignalFlow message: #{e.backtrace.first}: #{e.message} (#{e.class})")
end

#on_open ⇒ Object



225
226
227
228
229
230
# File 'lib/signalfx/signalflow/websocket.rb', line 225

# Sends the "authenticate" message, carrying the API token, over the
# WebSocket.
def on_open
  auth_request = { :type => "authenticate", :token => @api_token }
  @ws.send(auth_request.to_json)
end

#preflight(program, start, stop, resolution: nil, max_delay: nil) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/signalfx/signalflow/websocket.rb', line 103

# Starts a "preflight" job through start_job and returns whatever start_job
# returns.
#
# Arguments passed as nil are omitted from the request.
#
# @param program the program text
# @param start start value
# @param stop stop value
# @param resolution optional resolution
# @param max_delay optional max delay
def preflight(program, start, stop, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "preflight",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :compress => @compress,
    }

    # Hash#reject! returns nil when nothing was removed, so a call with every
    # argument set used to send "null".  The non-mutating reject is always a
    # Hash.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end

#start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/signalfx/signalflow/websocket.rb', line 118

# Starts a "start" job through start_job and returns whatever start_job
# returns.
#
# Options left as nil are omitted from the request.
#
# @param program the program text
# @param start optional start value
# @param stop optional stop value
# @param resolution optional resolution
# @param max_delay optional max delay
def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "start",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
    }

    # Hash#reject! returns nil when nothing was removed, so a call with every
    # option set used to send "null".  The non-mutating reject is always a Hash.
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end

#start_job {|channel.name| ... } ⇒ Object

Starts a job (either execute or preflight) and waits until the JOB_START message carrying the computation handle arrives, so that we can create a properly initialized computation object. Yields to the given block, which should send the WS message that starts the job.

Yields:

  • (channel.name)


52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/signalfx/signalflow/websocket.rb', line 52

# Opens a new channel, yields its name so the caller can send the
# job-starting message, then reads from the channel until the job is known
# to have started.
#
# @yield [channel.name] the name of the freshly created channel
# @return a Computation built from the handle in a "JOB_START" event or from
#   the computationId in a "computation-started" message
# @raise [RuntimeError] if a pop on the channel times out
# @raise [ComputationFailure] if a message of type "error" is received
def start_job
  channel = make_new_channel
  yield channel.name

  started = nil
  until started
    msg = begin
      channel.pop(COMPUTATION_START_TIMEOUT_SECONDS)
    rescue ChannelTimeout
      raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds"
    end

    raise ComputationFailure.new(msg[:message]) if msg[:type] == "error"

    # Anything else (STREAM_START, for instance, which precedes JOB_START but
    # carries nothing useful) is skipped and the loop pops the next message.
    if msg[:event] == "JOB_START"
      started = Computation.new(msg[:handle], method(:attach), method(:stop))
      started.channel = channel
    elsif msg[:type] == "computation-started"
      started = Computation.new(msg[:computationId], method(:attach), method(:stop))
      # A start job needs the channel only for error messages, so it can be
      # detached as soon as the job has started.
      channel.detach
    end
  end

  started
end

#stop(handle, reason) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/signalfx/signalflow/websocket.rb', line 132

# Sends a "stop" message for the given computation handle.
#
# @param handle the computation handle; omitted from the request when nil
# @param reason the reason for stopping; omitted from the request when nil
# @return whatever send_msg returns
def stop(handle, reason)
  request = {
    :type => "stop",
    :handle => handle,
    :reason => reason,
  }

  # Hash#reject! returns nil when nothing was removed, so any call with a
  # non-nil handle and reason used to send "null" instead of the request.
  # The non-mutating reject always returns a Hash.
  send_msg(request.reject { |_key, value| value.nil? }.to_json)
end