Class: Signalwire::Blade::Connection
- Inherits:
-
Object
- Object
- Signalwire::Blade::Connection
- Includes:
- EventHandler, Common, Logger
- Defined in:
- lib/signalwire/blade/connection.rb
Instance Attribute Summary collapse
-
#connected ⇒ Object
readonly
Returns the value of attribute connected.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#node_id ⇒ Object
readonly
Returns the value of attribute node_id.
-
#session_id ⇒ Object
readonly
Returns the value of attribute session_id.
Instance Method Summary collapse
- #clear_connections ⇒ Object
- #connect! ⇒ Object
- #connect_request ⇒ Object
- #connected? ⇒ Boolean
- #disconnect! ⇒ Object
- #enable_epoll ⇒ Object
- #enqueue_inbound(message) ⇒ Object
- #enqueue_outbound(message) ⇒ Object
- #execute(params, &block) ⇒ Object
- #flush_queues ⇒ Object
- #handle_close ⇒ Object
- #handle_execute_response(event, &block) ⇒ Object
- #handle_signals ⇒ Object
-
#initialize(**options) ⇒ Connection
constructor
A new instance of Connection.
- #keep_alive ⇒ Object
- #log_traffic(direction, message) ⇒ Object
- #main_loop! ⇒ Object
- #ping(&block) ⇒ Object
- #receive(message) ⇒ Object
- #reconnect! ⇒ Object
- #register_for_shutdown(obj) ⇒ Object
- #schedule_flush_queues ⇒ Object
- #setup_started_event ⇒ Object
- #shutdown_from_signal ⇒ Object
- #shutdown_registered ⇒ Object
- #subscribe(params, &block) ⇒ Object
- #transmit(message) ⇒ Object
- #write(message) ⇒ Object
- #write_command(command, &block) ⇒ Object
Methods included from EventHandler
Methods included from Logger
Constructor Details
#initialize(**options) ⇒ Connection
Returns a new instance of Connection.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/signalwire/blade/connection.rb', line 17
#
# Prepares connection state. No socket is created here; that happens in
# #main_loop!.
#
# @param options [Hash] keyword options
# @option options [String] :url ('wss://relay.signalwire.com') endpoint
#   handed to the WebSocket client
# @option options [Boolean] :log_traffic (true) gate read by #log_traffic
# @option options [Object, nil] :authentication (nil) attached to the
#   connect request when present
def initialize(**options)
  @options = options
  @session_id = nil
  @node_id = nil
  @connected = false
  # Read by #reconnect!; set to true by #shutdown_from_signal.
  @shutdown = false

  @url = options.fetch(:url, 'wss://relay.signalwire.com')
  @log_traffic = options.fetch(:log_traffic, true)
  @authentication = options.fetch(:authentication, nil)

  @inbound_queue = EM::Queue.new
  @outbound_queue = EM::Queue.new

  # Keep-alive bookkeeping, driven by #keep_alive.
  @pong = Concurrent::AtomicBoolean.new
  @keep_alive_timer = nil
  @ping_is_sent = Concurrent::AtomicBoolean.new

  @shutdown_list = []
end
Instance Attribute Details
#connected ⇒ Object (readonly)
Returns the value of attribute connected.
15 16 17 |
# File 'lib/signalwire/blade/connection.rb', line 15
#
# Reader for the raw @connected flag.
#
# @return [Object] whatever is currently stored in @connected
def connected
  @connected
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
15 16 17 |
# File 'lib/signalwire/blade/connection.rb', line 15
#
# Reader for @connection.
#
# NOTE(review): none of the methods shown for this class assign
# @connection (the socket is kept in @ws) -- confirm whether this reader
# can ever return something other than nil.
#
# @return [Object] whatever is currently stored in @connection
def connection
  @connection
end
#node_id ⇒ Object (readonly)
Returns the value of attribute node_id.
15 16 17 |
# File 'lib/signalwire/blade/connection.rb', line 15
#
# Reader for @node_id.
#
# @return [Object] whatever is currently stored in @node_id
def node_id
  @node_id
end
#session_id ⇒ Object (readonly)
Returns the value of attribute session_id.
15 16 17 |
# File 'lib/signalwire/blade/connection.rb', line 15
#
# Reader for @session_id.
#
# @return [Object] whatever is currently stored in @session_id
def session_id
  @session_id
end
Instance Method Details
#clear_connections ⇒ Object
154 155 156 157 158 |
# File 'lib/signalwire/blade/connection.rb', line 154
#
# Forgets the current socket, marks the connection as down and cancels a
# pending keep-alive timer if one exists.
def clear_connections
  pending_timer = @keep_alive_timer
  @connected = false
  @ws = nil
  pending_timer.cancel if pending_timer
end
#connect! ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/signalwire/blade/connection.rb', line 38
#
# Entry point: registers the :started handler, configures epoll, installs
# the signal traps and then enters the main loop.
def connect!
  setup_started_event # must be registered before the socket can open
  enable_epoll
  handle_signals
  main_loop!
end
#connect_request ⇒ Object
182 183 184 185 186 |
# File 'lib/signalwire/blade/connection.rb', line 182
#
# Builds the Connect request, adding the authentication entry to its
# params only when authentication was configured.
#
# @return [Connect] the request that was built
def connect_request
  Connect.new.tap do |request|
    request[:params][:authentication] = @authentication if @authentication
  end
end
#connected? ⇒ Boolean
188 189 190 |
# File 'lib/signalwire/blade/connection.rb', line 188
#
# Strict check: only a literal +true+ in @connected counts as connected.
#
# @return [Boolean]
def connected?
  @connected == true
end
#disconnect! ⇒ Object
160 161 162 163 |
# File 'lib/signalwire/blade/connection.rb', line 160
#
# Drops local connection state, then stops the EventMachine reactor.
def disconnect!
  clear_connections
  EM.stop
end
#enable_epoll ⇒ Object
97 98 99 100 101 |
# File 'lib/signalwire/blade/connection.rb', line 97
#
# Asks EventMachine to use epoll and logs whether it is active.
def enable_epoll
  # This is only enabled on Linux
  EM.epoll
  logger.debug("Running with epoll #{EM.epoll?}")
end
#enqueue_inbound(message) ⇒ Object
174 175 176 |
# File 'lib/signalwire/blade/connection.rb', line 174
#
# Appends a received message to the inbound queue; #flush_queues hands
# queued entries to #receive.
#
# @param message [Object] the item to queue
def enqueue_inbound(message)
  @inbound_queue.push(message)
end
#enqueue_outbound(message) ⇒ Object
178 179 180 |
# File 'lib/signalwire/blade/connection.rb', line 178
#
# Appends a message to the outbound queue; #flush_queues hands queued
# entries to #write while connected.
#
# @param message [Object] the item to queue
def enqueue_outbound(message)
  @outbound_queue.push(message)
end
#execute(params, &block) ⇒ Object
135 136 137 |
# File 'lib/signalwire/blade/connection.rb', line 135
#
# Wraps +params+ in an Execute command and sends it through
# #write_command.
#
# @param params [Object] passed to Execute.new
# @yield [event] forwarded to #write_command when a block is given
def execute(params, &block)
  # Forwarding a nil block is the same as passing no block at all.
  write_command(Execute.new(params), &block)
end
#flush_queues ⇒ Object
165 166 167 168 169 170 171 172 |
# File 'lib/signalwire/blade/connection.rb', line 165
#
# Drains the inbound queue into #receive, drains the outbound queue into
# #write (only while connected), then schedules the next flush.
def flush_queues
  until @inbound_queue.empty?
    @inbound_queue.pop { |incoming| receive(incoming) }
  end

  if connected?
    until @outbound_queue.empty?
      @outbound_queue.pop { |outgoing| write(outgoing) }
    end
  end

  schedule_flush_queues
end
#handle_close ⇒ Object
149 150 151 152 |
# File 'lib/signalwire/blade/connection.rb', line 149
#
# Socket :close handler: logs a warning and starts a reconnect.
def handle_close
  logger.warn("WS Socket closed!")
  reconnect!
end
#handle_execute_response(event, &block) ⇒ Object
130 131 132 133 |
# File 'lib/signalwire/blade/connection.rb', line 130
#
# Logs an error line when the response reports an error, then hands the
# event to the caller's block in either case.
#
# @param event [#error?, #error_code, #message] the response event
# @yield [event] the caller's response handler
def handle_execute_response(event, &block)
  # NOTE(review): the method name after `event.` was missing in this listing;
  # `message` is reconstructed from the token dropped elsewhere -- confirm.
  logger.error("Blade error occurred, code #{event.error_code}: #{event.message}") if event.error?
  block.call(event)
end
#handle_signals ⇒ Object
223 224 225 226 227 228 229 230 231 |
# File 'lib/signalwire/blade/connection.rb', line 223
#
# Installs INT and TERM traps that both run #shutdown_from_signal.
def handle_signals
  Signal.trap('INT') { shutdown_from_signal }
  Signal.trap('TERM') { shutdown_from_signal }
end
#keep_alive ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/signalwire/blade/connection.rb', line 192
#
# One keep-alive tick. Ticks alternate between two phases:
# * send phase  -- sends a ping whose response block marks @pong true;
# * check phase -- if no pong was recorded, logs an error and reconnects
#   (when still connected).
# Every tick re-arms itself with a timer of
# Signalwire::Relay::PING_TIMEOUT.
#
# @return [EventMachine::Timer] the timer scheduling the next tick
def keep_alive
  if @ping_is_sent.false?
    # Clear the previous round's result first; otherwise a single
    # successful pong would mask every later ping failure.
    @pong.make_false
    ping { @pong.make_true }
    @ping_is_sent.make_true
  else
    if @pong.false?
      logger.error "KEEPALIVE: Ping failed"
      reconnect! if connected?
    end
    @ping_is_sent.make_false
  end

  @keep_alive_timer = EventMachine::Timer.new(Signalwire::Relay::PING_TIMEOUT) do
    keep_alive
  end
end
#log_traffic(direction, message) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/signalwire/blade/connection.rb', line 211
#
# Writes a pretty-printed debug line for one message. Does nothing when
# traffic logging was disabled via the :log_traffic option.
#
# @param direction [Symbol] :send or :recv
# @param message [String, Object] for :send the value is parsed as JSON
#   before pretty-printing; for :recv it is pretty-printed as given
def log_traffic(direction, message)
  # Guard the log call itself, so disabling traffic logging does not
  # still emit an empty "SEND: " / "RECV: " line per message.
  return unless @log_traffic

  pretty =
    case direction
    when :send then JSON.pretty_generate(JSON.parse(message))
    when :recv then JSON.pretty_generate(message)
    end

  logger.debug "#{direction.to_s.upcase}: #{pretty}"
end
#main_loop! ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/signalwire/blade/connection.rb', line 54
#
# Runs the EventMachine block that creates the WebSocket client for @url,
# wires its events and starts the periodic queue flush.
#
# Event wiring:
# * :open    -- broadcasts :started
# * :message -- queued through #enqueue_inbound
# * :close   -- #handle_close
# * :error   -- logged
def main_loop!
  EM.run do
    logger.info "CREATING SOCKET"
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open) { |event| broadcast :started, event }
    @ws.on(:message) { |event| enqueue_inbound event }
    @ws.on(:close) { handle_close }

    @ws.on :error do |error|
      logger.error "Error occurred: #{error.message}"
    end

    schedule_flush_queues
  end
end
#ping(&block) ⇒ Object
139 140 141 142 143 |
# File 'lib/signalwire/blade/connection.rb', line 139
#
# Sends a Ping command through #write_command.
#
# @yield [event] forwarded to #write_command when a block is given
# @return [Ping] the command that was sent
def ping(&block)
  # Forwarding a nil block is the same as passing no block at all.
  Ping.new.tap { |command| write_command(command, &block) }
end
#receive(message) ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/signalwire/blade/connection.rb', line 112
#
# Handles one queued inbound message: builds a Message from its +data+,
# logs the payload and broadcasts a :message event via EM.defer.
#
# @param message [#data] the queued inbound item
def receive(message)
  event = Message.from_json(message.data)
  log_traffic :recv, event.payload
  EM.defer do
    broadcast :message, event
  end
end
#reconnect! ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/signalwire/blade/connection.rb', line 46
#
# Clears connection state and, unless a shutdown is in progress, waits
# Signalwire::Blade::RECONNECT_PERIOD and re-enters the main loop.
#
# NOTE(review): the wait uses Kernel#sleep -- confirm that blocking the
# calling thread for that long is intended here.
def reconnect!
  clear_connections

  unless @shutdown
    sleep(Signalwire::Blade::RECONNECT_PERIOD)
    logger.info("Attempting reconnection")
    main_loop!
  end
end
#register_for_shutdown(obj) ⇒ Object
241 242 243 |
# File 'lib/signalwire/blade/connection.rb', line 241
#
# Remembers +obj+ so that #shutdown_registered calls +stop+ on it later.
#
# @param obj [#stop] the object to stop on shutdown
# @return [Array] the shutdown list after the addition
def register_for_shutdown(obj)
  @shutdown_list.push(obj)
end
#schedule_flush_queues ⇒ Object
71 72 73 |
# File 'lib/signalwire/blade/connection.rb', line 71
#
# Arms a one-shot EventMachine timer (0.005) that runs #flush_queues.
def schedule_flush_queues
  EM.add_timer(0.005) do
    flush_queues
  end
end
#setup_started_event ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/signalwire/blade/connection.rb', line 75
#
# Registers the :started handler. When it fires, the connection is marked
# as connected, the pong flag is cleared and a connect request is sent.
# The response handler records the session and node ids (first values
# win), broadcasts :connected and starts the keep-alive cycle.
#
# Any StandardError raised while handling :started is logged rather than
# propagated.
def setup_started_event
  on :started do |_event|
    begin
      @connected = true
      request = connect_request
      @pong.make_false

      write_command(request) do |event|
        # Keep ids from the first successful connect. The node id guard
        # previously tested a misspelled variable and so never held.
        @session_id ||= event.dig(:result, :sessionid)
        @node_id ||= event.dig(:result, :nodeid)
        logger.info "Blade Session connected with id: #{@session_id}"
        broadcast :connected, event
        keep_alive
      end
    rescue StandardError => e
      logger.error e.inspect
      logger.error e.backtrace
    end
  end
end
#shutdown_from_signal ⇒ Object
234 235 236 237 238 239 |
# File 'lib/signalwire/blade/connection.rb', line 234
#
# Signal-driven shutdown: flags the shutdown (which stops #reconnect! from
# reconnecting), stops registered objects, disconnects and exits.
def shutdown_from_signal
  @shutdown = true # set first, so nothing below triggers a reconnect
  shutdown_registered
  disconnect!
  exit
end
#shutdown_registered ⇒ Object
245 246 247 248 249 |
# File 'lib/signalwire/blade/connection.rb', line 245
#
# Calls +stop+ on every object added through #register_for_shutdown, in
# registration order.
#
# @return [Array] the shutdown list
def shutdown_registered
  @shutdown_list.each(&:stop)
end
#subscribe(params, &block) ⇒ Object
145 146 147 |
# File 'lib/signalwire/blade/connection.rb', line 145
#
# Wraps +params+ in a Subscribe command and sends it through
# #write_command.
#
# @param params [Object] passed to Subscribe.new
# @yield [event] forwarded to #write_command when a block is given
def subscribe(params, &block)
  # Forwarding a nil block is the same as passing no block at all.
  write_command(Subscribe.new(params), &block)
end
#transmit(message) ⇒ Object
103 104 105 |
# File 'lib/signalwire/blade/connection.rb', line 103
#
# Queues a message for sending; nothing is written to the socket here.
#
# @param message [Object] the item handed to #enqueue_outbound
def transmit(message)
  enqueue_outbound message
end
#write(message) ⇒ Object
107 108 109 110 |
# File 'lib/signalwire/blade/connection.rb', line 107
#
# Logs an outgoing message and sends it on the current socket.
#
# @param message [String] the payload; #log_traffic parses it as JSON for
#   the :send direction
def write(message)
  log_traffic :send, message
  @ws.send(message)
end
#write_command(command, &block) ⇒ Object
120 121 122 123 124 125 126 127 128 |
# File 'lib/signalwire/blade/connection.rb', line 120
#
# Queues a command for sending. When a block is given, a one-time :message
# handler keyed on the command's id routes the matching event through
# #handle_execute_response to that block.
#
# @param command [#id, #build_request] the command to send
# @yield [event] the response event for this command
def write_command(command, &block)
  unless block.nil?
    once(:message, id: command.id) { |event| handle_execute_response(event, &block) }
  end

  transmit(command.build_request.to_json)
end