Class: Signalwire::Blade::Connection

Inherits:
Object
  • Object
show all
Includes:
EventHandler, Common, Logger
Defined in:
lib/signalwire/blade/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from EventHandler

#broadcast

Methods included from Logger

#level=, #logger, logger

Constructor Details

#initialize(**options) ⇒ Connection

Returns a new instance of Connection.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/signalwire/blade/connection.rb', line 16

def initialize(**options)
  @options = options
  @session_id = nil
  @node_id = nil
  @connected = false
  @url = @options.fetch(:url, 'wss://relay.signalwire.com')
  @log_traffic = options.fetch(:log_traffic, true)
  @authentication = options.fetch(:authentication, nil)

  @inbound_queue = EM::Queue.new
  @outbound_queue = EM::Queue.new

  @shutdown_list = []
end

Instance Attribute Details

#connectedObject (readonly)

Returns the value of attribute connected.



14
15
16
# File 'lib/signalwire/blade/connection.rb', line 14

def connected
  @connected
end

#connectionObject (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/signalwire/blade/connection.rb', line 14

def connection
  @connection
end

#node_idObject (readonly)

Returns the value of attribute node_id.



14
15
16
# File 'lib/signalwire/blade/connection.rb', line 14

def node_id
  @node_id
end

#session_idObject (readonly)

Returns the value of attribute session_id.



14
15
16
# File 'lib/signalwire/blade/connection.rb', line 14

def session_id
  @session_id
end

Instance Method Details

#connect!Object



31
32
33
34
35
36
37
# File 'lib/signalwire/blade/connection.rb', line 31

def connect!
  setup_started_event
  enable_epoll
  handle_signals

  main_loop!
end

#connect_requestObject



162
163
164
165
166
# File 'lib/signalwire/blade/connection.rb', line 162

def connect_request
  req = Connect.new
  req[:params][:authentication] = @authentication if @authentication
  req
end

#connected?Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/signalwire/blade/connection.rb', line 168

def connected?
  @connected == true
end

#disconnect!Object



138
139
140
141
142
143
# File 'lib/signalwire/blade/connection.rb', line 138

def disconnect!
  # logger.info 'Stopping Blade event loop'
  @ws = nil
  @connected = false
  EM.stop
end

#enable_epollObject



88
89
90
91
92
# File 'lib/signalwire/blade/connection.rb', line 88

def enable_epoll
  # This is only enabled on Linux
  EM.epoll
  logger.debug "Running with epoll #{EM.epoll?}"
end

#enqueue_inbound(message) ⇒ Object



154
155
156
# File 'lib/signalwire/blade/connection.rb', line 154

def enqueue_inbound(message)
  @inbound_queue.push message
end

#enqueue_outbound(message) ⇒ Object



158
159
160
# File 'lib/signalwire/blade/connection.rb', line 158

def enqueue_outbound(message)
  @outbound_queue.push message
end

#execute(params, &block) ⇒ Object



126
127
128
# File 'lib/signalwire/blade/connection.rb', line 126

def execute(params, &block)
  block_given? ? write_command(Execute.new(params), &block) : write_command(Execute.new(params))
end

#flush_queuesObject



145
146
147
148
149
150
151
152
# File 'lib/signalwire/blade/connection.rb', line 145

def flush_queues
  @inbound_queue.pop { |inbound| receive(inbound) } until @inbound_queue.empty?
  if connected?
    @outbound_queue.pop { |outbound| write(outbound) } until @outbound_queue.empty?
  end

  schedule_flush_queues
end

#handle_closeObject



134
135
136
# File 'lib/signalwire/blade/connection.rb', line 134

def handle_close
  reconnect!
end

#handle_execute_response(event, &block) ⇒ Object



121
122
123
124
# File 'lib/signalwire/blade/connection.rb', line 121

def handle_execute_response(event, &block)
  logger.error("Blade error occurred, code #{event.error_code}: #{event.error_message}") if event.error?
  block.call(event)
end

#handle_signalsObject



201
202
203
204
205
206
207
208
209
# File 'lib/signalwire/blade/connection.rb', line 201

def handle_signals
  Signal.trap('INT') do
    shutdown_from_signal
  end
  
  Signal.trap('TERM') do
    shutdown_from_signal
  end
end

#log_traffic(direction, message) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/signalwire/blade/connection.rb', line 189

def log_traffic(direction, message)
  if @log_traffic
    pretty = case direction
             when :send
               JSON.pretty_generate(JSON.parse(message))
             when :recv
               JSON.pretty_generate(message)
             end
  end
  logger.debug "#{direction.to_s.upcase}: #{pretty}"
end

#main_loop!Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/signalwire/blade/connection.rb', line 47

def main_loop!
  EM.run do
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open) { |event| broadcast :started, event }
    @ws.on(:message) { |event| enqueue_inbound event }
    @ws.on(:close) { handle_close }

    @ws.on :error do |error|
      logger.error "Error occurred: #{error.message}"
    end

    schedule_flush_queues
  end
end

#receive(message) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/signalwire/blade/connection.rb', line 103

def receive(message)
  event = Message.from_json(message.data)
  log_traffic :recv, event.payload
  EM.defer do
    broadcast :message, event
  end
end

#reconnect!Object



39
40
41
42
43
44
45
# File 'lib/signalwire/blade/connection.rb', line 39

def reconnect!
  @connected = false
  return if @shutdown
  sleep Signalwire::Blade::RECONNECT_PERIOD
  logger.info "Attempting reconnection"
  main_loop!
end

#register_for_shutdown(obj) ⇒ Object



219
220
221
# File 'lib/signalwire/blade/connection.rb', line 219

def register_for_shutdown(obj)
  @shutdown_list << obj
end

#schedule_flush_queuesObject



63
64
65
# File 'lib/signalwire/blade/connection.rb', line 63

def schedule_flush_queues
  EM.add_timer(0.005) { flush_queues }
end

#setup_started_eventObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/signalwire/blade/connection.rb', line 67

def setup_started_event
  on :started do |_event|
    begin
      @connected = true
      myreq = connect_request
      start_periodic_timer

      write_command(myreq) do |event|
        @session_id = event.dig(:result, :sessionid) unless @session_id
        @node_id = event.dig(:result, :nodeid) unless @node_d
        logger.info "Blade Session connected with id: #{@session_id}"
        broadcast :connected, event
      end

    rescue StandardError => e
      logger.error e.inspect
      logger.error e.backtrace
    end
  end
end

#shutdown_from_signalObject



212
213
214
215
216
217
# File 'lib/signalwire/blade/connection.rb', line 212

def shutdown_from_signal
  @shutdown = true
  shutdown_registered
  disconnect!
  exit
end

#shutdown_registeredObject



223
224
225
226
227
# File 'lib/signalwire/blade/connection.rb', line 223

def shutdown_registered
  @shutdown_list.each do |obj|
    obj.stop
  end
end

#start_periodic_timerObject



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/signalwire/blade/connection.rb', line 172

def start_periodic_timer
  pinger = EventMachine::PeriodicTimer.new(Signalwire::Relay::PING_TIMEOUT) do
    timeouter = EventMachine::Timer.new(2) do
      # reconnect logic goes here
      logger.error "We got disconnected!"
      pinger.cancel
      reconnect! if connected?
    end
    
    if @connected
      @ws.ping 'detecting presence' do
        timeouter.cancel
      end
    end
  end
end

#subscribe(params, &block) ⇒ Object



130
131
132
# File 'lib/signalwire/blade/connection.rb', line 130

def subscribe(params, &block)
  block_given? ? write_command(Subscribe.new(params), &block) : write_command(Subscribe.new(params))
end

#transmit(message) ⇒ Object



94
95
96
# File 'lib/signalwire/blade/connection.rb', line 94

def transmit(message)
  enqueue_outbound message
end

#write(message) ⇒ Object



98
99
100
101
# File 'lib/signalwire/blade/connection.rb', line 98

def write(message)
  log_traffic :send, message
  @ws.send(message)
end

#write_command(command, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
# File 'lib/signalwire/blade/connection.rb', line 111

def write_command(command, &block)
  if block_given?
    once :message, id: command.id do |event|
      handle_execute_response(event, &block)
    end
  end

  transmit(command.build_request.to_json)
end