Class: Signalwire::Blade::Connection

Inherits:
Object
  • Object
show all
Includes:
EventHandler, Common, Logger
Defined in:
lib/signalwire/blade/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from EventHandler

#broadcast

Methods included from Logger

#level=, #logger, logger

Constructor Details

#initialize(**options) ⇒ Connection

Returns a new instance of Connection.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/signalwire/blade/connection.rb', line 17

def initialize(**options)
  @options = options
  @session_id = nil
  @node_id = nil
  @connected = false
  @url = @options.fetch(:url, 'wss://relay.signalwire.com')
  @log_traffic = options.fetch(:log_traffic, true)
  @authentication = options.fetch(:authentication, nil)

  

  @inbound_queue = EM::Queue.new
  @outbound_queue = EM::Queue.new

  @pong = Concurrent::AtomicBoolean.new
  @keep_alive_timer = nil
  @ping_is_sent = Concurrent::AtomicBoolean.new

  @shutdown_list = []
end

Instance Attribute Details

#connectedObject (readonly)

Returns the value of attribute connected.



15
16
17
# File 'lib/signalwire/blade/connection.rb', line 15

def connected
  @connected
end

#connectionObject (readonly)

Returns the value of attribute connection.



15
16
17
# File 'lib/signalwire/blade/connection.rb', line 15

def connection
  @connection
end

#node_idObject (readonly)

Returns the value of attribute node_id.



15
16
17
# File 'lib/signalwire/blade/connection.rb', line 15

def node_id
  @node_id
end

#session_idObject (readonly)

Returns the value of attribute session_id.



15
16
17
# File 'lib/signalwire/blade/connection.rb', line 15

def session_id
  @session_id
end

Instance Method Details

#clear_connectionsObject



154
155
156
157
158
# File 'lib/signalwire/blade/connection.rb', line 154

def clear_connections
  @ws = nil
  @connected = false
  @keep_alive_timer.cancel if @keep_alive_timer
end

#connect!Object



38
39
40
41
42
43
44
# File 'lib/signalwire/blade/connection.rb', line 38

def connect!
  setup_started_event
  enable_epoll
  handle_signals

  main_loop!
end

#connect_requestObject



182
183
184
185
186
# File 'lib/signalwire/blade/connection.rb', line 182

def connect_request
  req = Connect.new
  req[:params][:authentication] = @authentication if @authentication
  req
end

#connected?Boolean

Returns:

  • (Boolean)


188
189
190
# File 'lib/signalwire/blade/connection.rb', line 188

def connected?
  @connected == true
end

#disconnect!Object



160
161
162
163
# File 'lib/signalwire/blade/connection.rb', line 160

def disconnect!
  clear_connections
  EM.stop
end

#enable_epollObject



97
98
99
100
101
# File 'lib/signalwire/blade/connection.rb', line 97

def enable_epoll
  # This is only enabled on Linux
  EM.epoll
  logger.debug "Running with epoll #{EM.epoll?}"
end

#enqueue_inbound(message) ⇒ Object



174
175
176
# File 'lib/signalwire/blade/connection.rb', line 174

def enqueue_inbound(message)
  @inbound_queue.push message
end

#enqueue_outbound(message) ⇒ Object



178
179
180
# File 'lib/signalwire/blade/connection.rb', line 178

def enqueue_outbound(message)
  @outbound_queue.push message
end

#execute(params, &block) ⇒ Object



135
136
137
# File 'lib/signalwire/blade/connection.rb', line 135

def execute(params, &block)
  block_given? ? write_command(Execute.new(params), &block) : write_command(Execute.new(params))
end

#flush_queuesObject



165
166
167
168
169
170
171
172
# File 'lib/signalwire/blade/connection.rb', line 165

def flush_queues
  @inbound_queue.pop { |inbound| receive(inbound) } until @inbound_queue.empty?
  if connected?
    @outbound_queue.pop { |outbound| write(outbound) } until @outbound_queue.empty?
  end

  schedule_flush_queues
end

#handle_closeObject



149
150
151
152
# File 'lib/signalwire/blade/connection.rb', line 149

def handle_close
  logger.warn "WS Socket closed!"
  reconnect!
end

#handle_execute_response(event, &block) ⇒ Object



130
131
132
133
# File 'lib/signalwire/blade/connection.rb', line 130

def handle_execute_response(event, &block)
  logger.error("Blade error occurred, code #{event.error_code}: #{event.error_message}") if event.error?
  block.call(event)
end

#handle_signalsObject



223
224
225
226
227
228
229
230
231
# File 'lib/signalwire/blade/connection.rb', line 223

def handle_signals
  Signal.trap('INT') do
    shutdown_from_signal
  end
  
  Signal.trap('TERM') do
    shutdown_from_signal
  end
end

#keep_aliveObject



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/signalwire/blade/connection.rb', line 192

def keep_alive
  if @ping_is_sent.false?
    ping do
      @pong.make_true
    end
    @ping_is_sent.make_true
  else
    if @pong.false?
      logger.error "KEEPALIVE: Ping failed"
      reconnect! if connected?
    end
    @ping_is_sent.make_false
  end

  @keep_alive_timer = EventMachine::Timer.new(Signalwire::Relay::PING_TIMEOUT) do
    keep_alive
  end
end

#log_traffic(direction, message) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
# File 'lib/signalwire/blade/connection.rb', line 211

def log_traffic(direction, message)
  if @log_traffic
    pretty = case direction
             when :send
               JSON.pretty_generate(JSON.parse(message))
             when :recv
               JSON.pretty_generate(message)
             end
  end
  logger.debug "#{direction.to_s.upcase}: #{pretty}"
end

#main_loop!Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/signalwire/blade/connection.rb', line 54

def main_loop!
  EM.run do
    logger.info "CREATING SOCKET"
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open) { |event| broadcast :started, event }
    @ws.on(:message) { |event| enqueue_inbound event }
    @ws.on(:close) { handle_close }

    @ws.on :error do |error|
      logger.error "Error occurred: #{error.message}"
    end

    schedule_flush_queues
  end
end

#ping(&block) ⇒ Object



139
140
141
142
143
# File 'lib/signalwire/blade/connection.rb', line 139

def ping(&block)
  ping_cmd = Ping.new
  block_given? ? write_command(ping_cmd, &block) : write_command(ping_cmd)
  ping_cmd
end

#receive(message) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/signalwire/blade/connection.rb', line 112

def receive(message)
  event = Message.from_json(message.data)
  log_traffic :recv, event.payload
  EM.defer do
    broadcast :message, event
  end
end

#reconnect!Object



46
47
48
49
50
51
52
# File 'lib/signalwire/blade/connection.rb', line 46

def reconnect!
  clear_connections
  return if @shutdown
  sleep Signalwire::Blade::RECONNECT_PERIOD
  logger.info "Attempting reconnection"
  main_loop!
end

#register_for_shutdown(obj) ⇒ Object



241
242
243
# File 'lib/signalwire/blade/connection.rb', line 241

def register_for_shutdown(obj)
  @shutdown_list << obj
end

#schedule_flush_queuesObject



71
72
73
# File 'lib/signalwire/blade/connection.rb', line 71

def schedule_flush_queues
  EM.add_timer(0.005) { flush_queues }
end

#setup_started_eventObject



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/signalwire/blade/connection.rb', line 75

def setup_started_event
  on :started do |_event|
    begin
      @connected = true
      myreq = connect_request
      @pong.make_false

      write_command(myreq) do |event|
        @session_id = event.dig(:result, :sessionid) unless @session_id
        @node_id = event.dig(:result, :nodeid) unless @node_d
        logger.info "Blade Session connected with id: #{@session_id}"
        broadcast :connected, event
        keep_alive
      end

    rescue StandardError => e
      logger.error e.inspect
      logger.error e.backtrace
    end
  end
end

#shutdown_from_signalObject



234
235
236
237
238
239
# File 'lib/signalwire/blade/connection.rb', line 234

def shutdown_from_signal
  @shutdown = true
  shutdown_registered
  disconnect!
  exit
end

#shutdown_registeredObject



245
246
247
248
249
# File 'lib/signalwire/blade/connection.rb', line 245

def shutdown_registered
  @shutdown_list.each do |obj|
    obj.stop
  end
end

#subscribe(params, &block) ⇒ Object



145
146
147
# File 'lib/signalwire/blade/connection.rb', line 145

def subscribe(params, &block)
  block_given? ? write_command(Subscribe.new(params), &block) : write_command(Subscribe.new(params))
end

#transmit(message) ⇒ Object



103
104
105
# File 'lib/signalwire/blade/connection.rb', line 103

def transmit(message)
  enqueue_outbound message
end

#write(message) ⇒ Object



107
108
109
110
# File 'lib/signalwire/blade/connection.rb', line 107

def write(message)
  log_traffic :send, message
  @ws.send(message)
end

#write_command(command, &block) ⇒ Object



120
121
122
123
124
125
126
127
128
# File 'lib/signalwire/blade/connection.rb', line 120

def write_command(command, &block)
  if block_given?
    once :message, id: command.id do |event|
      handle_execute_response(event, &block)
    end
  end

  transmit(command.build_request.to_json)
end