Class: Signalwire::Blade::Connection
- Inherits: Object
- Inheritance chain: Object → Signalwire::Blade::Connection
- Includes:
- EventHandler, Common, Logger
- Defined in:
- lib/signalwire/blade/connection.rb
Instance Attribute Summary
-
#connected ⇒ Object
readonly
Returns the value of attribute connected.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#node_id ⇒ Object
readonly
Returns the value of attribute node_id.
-
#session_id ⇒ Object
readonly
Returns the value of attribute session_id.
Instance Method Summary
- #connect! ⇒ Object
- #connect_request ⇒ Object
- #connected? ⇒ Boolean
- #disconnect! ⇒ Object
- #enable_epoll ⇒ Object
- #enqueue_inbound(message) ⇒ Object
- #enqueue_outbound(message) ⇒ Object
- #execute(params, &block) ⇒ Object
- #flush_queues ⇒ Object
- #handle_close ⇒ Object
- #handle_execute_response(event, &block) ⇒ Object
- #handle_signals ⇒ Object
-
#initialize(**options) ⇒ Connection
constructor
A new instance of Connection.
- #log_traffic(direction, message) ⇒ Object
- #main_loop! ⇒ Object
- #receive(message) ⇒ Object
- #reconnect! ⇒ Object
- #register_for_shutdown(obj) ⇒ Object
- #schedule_flush_queues ⇒ Object
- #setup_started_event ⇒ Object
- #shutdown_from_signal ⇒ Object
- #shutdown_registered ⇒ Object
- #start_periodic_timer ⇒ Object
- #subscribe(params, &block) ⇒ Object
- #transmit(message) ⇒ Object
- #write(message) ⇒ Object
- #write_command(command, &block) ⇒ Object
Methods included from EventHandler
Methods included from Logger
Constructor Details
#initialize(**options) ⇒ Connection
Returns a new instance of Connection.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/signalwire/blade/connection.rb', line 16
#
# Builds the connection state. No socket is opened here; the WebSocket
# client is created later by #main_loop!.
#
# @param options [Hash] keyword options
# @option options [String] :url WebSocket endpoint
#   (default 'wss://relay.signalwire.com')
# @option options [Boolean] :log_traffic whether #log_traffic pretty-prints
#   payloads (default true)
# @option options [Object, nil] :authentication value copied into the
#   connect request by #connect_request when present (default nil)
def initialize(**options)
  @options = options
  @session_id = nil
  @node_id = nil
  @connected = false
  @url = @options.fetch(:url, 'wss://relay.signalwire.com')
  @log_traffic = @options.fetch(:log_traffic, true)
  @authentication = @options.fetch(:authentication, nil)

  # Buffers drained by #flush_queues.
  @inbound_queue = EM::Queue.new
  @outbound_queue = EM::Queue.new

  # Objects whose #stop is invoked by #shutdown_registered.
  @shutdown_list = []
end
Instance Attribute Details
#connected ⇒ Object (readonly)
Returns the value of attribute connected.
14 15 16 |
# File 'lib/signalwire/blade/connection.rb', line 14
#
# Reader for the connection flag.
#
# @return [Boolean] the current value of @connected (false after
#   construction, true once the :started handler has run)
def connected
  @connected
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
14 15 16 |
# File 'lib/signalwire/blade/connection.rb', line 14
#
# Reader for @connection.
#
# NOTE(review): none of the methods shown for this class assign
# @connection (the WebSocket client is kept in @ws), so this presumably
# returns nil unless it is set elsewhere -- confirm.
#
# @return [Object, nil] the current value of @connection
def connection
  @connection
end
#node_id ⇒ Object (readonly)
Returns the value of attribute node_id.
14 15 16 |
# File 'lib/signalwire/blade/connection.rb', line 14
#
# Reader for the node id taken from the connect response
# (`result.nodeid`) in #setup_started_event.
#
# @return [Object, nil] the current value of @node_id; nil until connected
def node_id
  @node_id
end
#session_id ⇒ Object (readonly)
Returns the value of attribute session_id.
14 15 16 |
# File 'lib/signalwire/blade/connection.rb', line 14
#
# Reader for the session id taken from the connect response
# (`result.sessionid`) in #setup_started_event.
#
# @return [Object, nil] the current value of @session_id; nil until connected
def session_id
  @session_id
end
Instance Method Details
#connect! ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/signalwire/blade/connection.rb', line 31
#
# Entry point: performs the one-time setup steps and then enters the
# event loop.
#
# @return [Object] whatever #main_loop! returns
def connect!
  setup_started_event # register the :started handler before any socket opens
  enable_epoll
  handle_signals      # INT/TERM traps
  main_loop!
end
#connect_request ⇒ Object
162 163 164 165 166 |
# File 'lib/signalwire/blade/connection.rb', line 162
#
# Builds the connect request sent right after the socket opens.
#
# @return [Connect] a new Connect whose params carry :authentication only
#   when @authentication is truthy
def connect_request
  Connect.new.tap do |request|
    request[:params][:authentication] = @authentication if @authentication
  end
end
#connected? ⇒ Boolean
168 169 170 |
# File 'lib/signalwire/blade/connection.rb', line 168
#
# Strict predicate over the connection flag.
#
# @return [Boolean] true only when @connected is exactly `true`; any other
#   value (false, nil, other objects) yields false
def connected?
  true.equal?(@connected)
end
#disconnect! ⇒ Object
138 139 140 141 142 143 |
# File 'lib/signalwire/blade/connection.rb', line 138
#
# Drops the socket reference, marks the connection as down and stops the
# EventMachine loop.
#
# @return [Object] whatever EM.stop returns
def disconnect!
  # logger.info 'Stopping Blade event loop'
  @connected = false
  @ws = nil
  EM.stop
end
#enable_epoll ⇒ Object
88 89 90 91 92 |
# File 'lib/signalwire/blade/connection.rb', line 88
#
# Asks EventMachine to use epoll and logs (at debug level) what
# EM.epoll? reports afterwards.
def enable_epoll
  # This is only enabled on Linux
  EM.epoll
  logger.debug("Running with epoll #{EM.epoll?}")
end
#enqueue_inbound(message) ⇒ Object
154 155 156 |
# File 'lib/signalwire/blade/connection.rb', line 154
#
# Buffers a message received from the socket; #flush_queues later hands
# it to #receive.
#
# @param message [Object] the raw WebSocket message event
# @return [Object] whatever @inbound_queue.push returns
def enqueue_inbound(message)
  @inbound_queue.push(message)
end
#enqueue_outbound(message) ⇒ Object
158 159 160 |
# File 'lib/signalwire/blade/connection.rb', line 158
#
# Buffers a message to be sent; #flush_queues later hands it to #write
# once the connection is up.
#
# @param message [Object] the payload to send
# @return [Object] whatever @outbound_queue.push returns
def enqueue_outbound(message)
  @outbound_queue.push(message)
end
#execute(params, &block) ⇒ Object
126 127 128 |
# File 'lib/signalwire/blade/connection.rb', line 126
#
# Wraps +params+ in an Execute command and sends it via #write_command.
#
# @param params [Object] arguments passed to Execute.new
# @yield [event] optional; forwarded to #write_command
# @return [Object] whatever #write_command returns
def execute(params, &block)
  # Forwarding a nil &block passes no block, so one call covers both cases.
  write_command(Execute.new(params), &block)
end
#flush_queues ⇒ Object
145 146 147 148 149 150 151 152 |
# File 'lib/signalwire/blade/connection.rb', line 145
#
# Drains the inbound queue into #receive, drains the outbound queue into
# #write (only while connected), then schedules the next flush.
#
# @return [Object] whatever #schedule_flush_queues returns
def flush_queues
  until @inbound_queue.empty?
    @inbound_queue.pop { |incoming| receive(incoming) }
  end

  if connected?
    # Outbound messages stay queued while the socket is down.
    until @outbound_queue.empty?
      @outbound_queue.pop { |outgoing| write(outgoing) }
    end
  end

  schedule_flush_queues
end
#handle_close ⇒ Object
134 135 136 |
# File 'lib/signalwire/blade/connection.rb', line 134
#
# Socket :close callback (wired up in #main_loop!): delegates straight to
# #reconnect!.
#
# @return [Object] whatever #reconnect! returns
def handle_close
  reconnect!
end
#handle_execute_response(event, &block) ⇒ Object
121 122 123 124 |
# File 'lib/signalwire/blade/connection.rb', line 121
#
# Logs an error line when the response event reports an error, then
# passes the event to the caller's block.
#
# NOTE(review): the accessor after `event.` was lost from this listing;
# `error_message` is assumed here -- confirm against the Message class.
#
# @param event [Object] response event; must answer #error?, and
#   #error_code / #error_message when it is an error
# @yield [event] the caller's response handler
# @return [Object, nil] the block's result, or nil when no block is given
def handle_execute_response(event, &block)
  if event.error?
    logger.error("Blade error occurred, code #{event.error_code}: #{event.error_message}")
  end
  # Safe navigation: a missing block is a no-op rather than a NoMethodError.
  block&.call(event)
end
#handle_signals ⇒ Object
201 202 203 204 205 206 207 208 209 |
# File 'lib/signalwire/blade/connection.rb', line 201
#
# Installs INT and TERM traps that both run #shutdown_from_signal.
#
# @return [Object] the previous TERM handler, as returned by Signal.trap
def handle_signals
  Signal.trap('INT') { shutdown_from_signal }
  Signal.trap('TERM') { shutdown_from_signal }
end
#log_traffic(direction, message) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/signalwire/blade/connection.rb', line 189
#
# Writes a pretty-printed copy of a message to the debug log. Nothing is
# logged when traffic logging is disabled (previously an empty
# "SEND: " / "RECV: " line was still written).
#
# @param direction [Symbol] :send (message is a JSON string, as passed by
#   #write) or :recv (message is an already-parsed payload, as passed by
#   #receive); any other value logs an empty body
# @param message [String, Object] the payload to render
# @return [Object, nil] the logger's return value, or nil when disabled
def log_traffic(direction, message)
  return unless @log_traffic

  pretty = case direction
           when :send then JSON.pretty_generate(JSON.parse(message))
           when :recv then JSON.pretty_generate(message)
           end
  logger.debug "#{direction.to_s.upcase}: #{pretty}"
end
#main_loop! ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/signalwire/blade/connection.rb', line 47
#
# Runs the EventMachine loop: opens a Faye WebSocket client to @url,
# wires its events to this object and starts the queue-flush timer.
#
# Event wiring:
# * :open    -> broadcasts :started (handled by #setup_started_event)
# * :message -> #enqueue_inbound
# * :close   -> #handle_close
# * :error   -> logged at error level
def main_loop!
  EM.run do
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open) { |event| broadcast :started, event }
    @ws.on(:message) { |event| enqueue_inbound event }
    @ws.on(:close) { handle_close }

    @ws.on :error do |error|
      logger.error "Error occurred: #{error.message}"
    end

    schedule_flush_queues
  end
end
#receive(message) ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/signalwire/blade/connection.rb', line 103
#
# Parses one inbound WebSocket message, logs its payload and broadcasts
# it as a :message event via EM.defer.
#
# @param message [Object] raw message event; its #data is handed to
#   Message.from_json
# @return [Object] whatever EM.defer returns
def receive(message)
  event = Message.from_json(message.data)
  log_traffic :recv, event.payload
  EM.defer do
    broadcast :message, event
  end
end
#reconnect! ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/signalwire/blade/connection.rb', line 39
#
# Marks the connection as down and, unless a shutdown is in progress,
# waits RECONNECT_PERIOD and re-enters #main_loop!.
#
# NOTE(review): `sleep` blocks the calling thread; both visible callers
# (#handle_close and the ping timeout in #start_periodic_timer) run inside
# EventMachine callbacks, so this presumably stalls the reactor for the
# whole period -- confirm whether that is intended.
#
# @return [Object, nil] nil when @shutdown is set, otherwise whatever
#   #main_loop! returns
def reconnect!
  @connected = false

  unless @shutdown
    sleep(Signalwire::Blade::RECONNECT_PERIOD)
    logger.info("Attempting reconnection")
    main_loop!
  end
end
#register_for_shutdown(obj) ⇒ Object
219 220 221 |
# File 'lib/signalwire/blade/connection.rb', line 219
#
# Remembers an object so that #shutdown_registered will call #stop on it.
#
# @param obj [#stop] object to stop at shutdown
# @return [Array] the shutdown list, including +obj+
def register_for_shutdown(obj)
  @shutdown_list.push(obj)
end
#schedule_flush_queues ⇒ Object
63 64 65 |
# File 'lib/signalwire/blade/connection.rb', line 63
#
# Arms a one-shot EventMachine timer (delay 0.005) that runs
# #flush_queues; #flush_queues re-arms it, which keeps the queues polled.
#
# @return [Object] whatever EM.add_timer returns
def schedule_flush_queues
  EM.add_timer(0.005) do
    flush_queues
  end
end
#setup_started_event ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/signalwire/blade/connection.rb', line 67
#
# Registers the :started handler (broadcast when the socket opens). The
# handler marks the connection as up, starts the ping timer and sends the
# connect request; when the response arrives it records the session and
# node ids (first values win, so they survive reconnects) and broadcasts
# :connected.
#
# Any StandardError raised inside the handler is logged, not re-raised.
def setup_started_event
  on :started do |_event|
    begin
      @connected = true
      myreq = connect_request
      start_periodic_timer

      write_command(myreq) do |event|
        @session_id ||= event.dig(:result, :sessionid)
        # Was guarded by the misspelled @node_d, which is always nil, so the
        # node id was overwritten on every (re)connect.
        @node_id ||= event.dig(:result, :nodeid)
        logger.info "Blade Session connected with id: #{@session_id}"
        broadcast :connected, event
      end
    rescue StandardError => e
      logger.error e.inspect
      logger.error e.backtrace
    end
  end
end
#shutdown_from_signal ⇒ Object
212 213 214 215 216 217 |
# File 'lib/signalwire/blade/connection.rb', line 212
#
# Signal-handler body (installed by #handle_signals): flags the shutdown
# so #reconnect! becomes a no-op, stops registered objects, tears down the
# connection and exits the process.
def shutdown_from_signal
  @shutdown = true # checked by #reconnect!
  shutdown_registered
  disconnect!
  exit
end
#shutdown_registered ⇒ Object
223 224 225 226 227 |
# File 'lib/signalwire/blade/connection.rb', line 223
#
# Calls #stop on every object added through #register_for_shutdown, in
# registration order.
#
# @return [Array] the shutdown list
def shutdown_registered
  @shutdown_list.each(&:stop)
end
#start_periodic_timer ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/signalwire/blade/connection.rb', line 172
#
# Starts a periodic timer (interval Signalwire::Relay::PING_TIMEOUT). On
# every tick it arms a 2-unit timeout timer and, if connected, pings the
# socket; the ping's callback cancels that timeout. If the timeout fires
# first, it logs the disconnect, cancels the periodic timer and
# reconnects when #connected? is still true.
#
# NOTE(review): the timeout timer is armed even when @connected is false,
# in which case nothing visible here cancels it -- confirm that is intended.
#
# @return [EventMachine::PeriodicTimer] the periodic ping timer
def start_periodic_timer
  ping_timer = EventMachine::PeriodicTimer.new(Signalwire::Relay::PING_TIMEOUT) do
    pong_deadline = EventMachine::Timer.new(2) do
      # reconnect logic goes here
      logger.error "We got disconnected!"
      ping_timer.cancel
      reconnect! if connected?
    end

    @ws.ping('detecting presence') { pong_deadline.cancel } if @connected
  end
end
#subscribe(params, &block) ⇒ Object
130 131 132 |
# File 'lib/signalwire/blade/connection.rb', line 130
#
# Wraps +params+ in a Subscribe command and sends it via #write_command.
#
# @param params [Object] arguments passed to Subscribe.new
# @yield [event] optional; forwarded to #write_command
# @return [Object] whatever #write_command returns
def subscribe(params, &block)
  # Forwarding a nil &block passes no block, so one call covers both cases.
  write_command(Subscribe.new(params), &block)
end
#transmit(message) ⇒ Object
94 95 96 |
# File 'lib/signalwire/blade/connection.rb', line 94
#
# Queues a message for sending; it is written to the socket on a later
# #flush_queues pass rather than immediately.
#
# @param message [Object] the payload to send (#write_command passes a
#   JSON string)
# @return [Object] whatever #enqueue_outbound returns
def transmit(message)
  enqueue_outbound(message)
end
#write(message) ⇒ Object
98 99 100 101 |
# File 'lib/signalwire/blade/connection.rb', line 98
#
# Logs an outbound message and sends it over the WebSocket.
#
# @param message [String] the payload; #log_traffic parses it as JSON when
#   traffic logging is enabled
# @return [Object] whatever @ws.send returns
def write(message)
  log_traffic :send, message
  @ws.send(message)
end
#write_command(command, &block) ⇒ Object
111 112 113 114 115 116 117 118 119 |
# File 'lib/signalwire/blade/connection.rb', line 111
#
# Sends a command. When a block is given, a one-shot :message listener
# filtered on the command's id is registered first, and the matching
# response is routed through #handle_execute_response to the block.
#
# @param command [Object] must answer #id and #build_request
# @yield [event] optional handler for the command's response
# @return [Object] whatever #transmit returns
def write_command(command, &block)
  # Register the listener before transmitting so the reply cannot be missed.
  once(:message, id: command.id) { |event| handle_execute_response(event, &block) } if block_given?

  payload = command.build_request.to_json
  transmit(payload)
end