Class: LosantMqtt::DeviceConnection
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- LosantMqtt::DeviceConnection
- Includes:
- Events::Emitter
- Defined in:
- lib/losant_mqtt/device_connection.rb
Instance Attribute Summary
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Class Method Summary
- .connect(options = {}) ⇒ Object
Instance Method Summary
- #connect_ack(packet) ⇒ Object
- #connected? ⇒ Boolean
- #connection_completed ⇒ Object
- #disconnect(send_msg: true) ⇒ Object
-
#initialize(options = {}) ⇒ DeviceConnection
constructor
A new instance of DeviceConnection.
- #next_packet_id ⇒ Object
- #post_init ⇒ Object
- #process_packet(packet) ⇒ Object
- #publish(topic, payload) ⇒ Object
- #receive_data(data) ⇒ Object
- #send_connect_packet ⇒ Object
- #send_packet(packet) ⇒ Object
- #ssl_handshake_completed ⇒ Object
- #ssl_verify_peer(cert_string) ⇒ Object
- #subscribe(topic, &block) ⇒ Object
- #unbind(msg) ⇒ Object
- #unsubscribe(topic) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ DeviceConnection
Returns a new instance of DeviceConnection.
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/losant_mqtt/device_connection.rb', line 36
#
# Stores the connection options (caller-supplied values override the
# defaults listed below) and starts with an empty subscription table.
#
# @param options [Hash] overrides merged on top of the default options
def initialize(options = {})
  defaults = {
    client_id: MQTT::Client.generate_client_id,
    keep_alive: 15,
    clean_session: true,
    username: nil,
    password: nil,
    version: "3.1.1"
  }
  @options = defaults.merge(options)

  # topic => callable, filled in by #subscribe
  @subscriptions = {}
end
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
29 30 31 |
# File 'lib/losant_mqtt/device_connection.rb', line 29
#
# Read-only accessor for the connection's current state.
#
# @return [Object] the value of @state
def state
  @state
end
Class Method Details
.connect(options = {}) ⇒ Object
31 32 33 34 |
# File 'lib/losant_mqtt/device_connection.rb', line 31
#
# Opens an EventMachine connection handled by this class.
#
# @param options [Hash] connection options; :host defaults to "localhost"
#   and :port to MQTT::DEFAULT_PORT. The merged hash is also passed on to
#   EventMachine.connect as the handler argument.
# @return [Object] whatever EventMachine.connect returns
def self.connect(options = {})
  options = { host: "localhost", port: MQTT::DEFAULT_PORT }.merge(options)
  EventMachine.connect(options[:host], options[:port], self, options)
end
Instance Method Details
#connect_ack(packet) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/losant_mqtt/device_connection.rb', line 118
#
# Handles the CONNACK packet. A non-zero return code records a
# ProtocolException in @ex and closes the connection. Otherwise the state
# becomes :connected, a keep-alive timer is started when
# @options[:keep_alive] is positive, and the new state is emitted.
#
# @param packet [#return_code, #return_msg] the received CONNACK packet
def connect_ack(packet)
  unless packet.return_code == 0x00
    # Interpolation (not String#+) so a nil return_msg cannot raise here.
    @ex = MQTT::ProtocolException.new("Authentication Error - #{packet.return_msg}")
    return close_connection
  end

  @state = :connected

  if @options[:keep_alive] > 0
    @timer = EventMachine::PeriodicTimer.new(@options[:keep_alive]) do
      # Nothing received for longer than the keep-alive interval: give up.
      if Time.now.to_i - @last_received > @options[:keep_alive]
        @ex = MQTT::NotConnectedException.new("Keep alive failure, disconnecting")
        close_connection
      else
        send_packet(MQTT::Packet::Pingreq.new)
      end
    end
  end

  emit(@state)
end
#connected? ⇒ Boolean
48 49 50 |
# File 'lib/losant_mqtt/device_connection.rb', line 48
#
# @return [Boolean] true only while the state is :connected
def connected?
  state == :connected
end
#connection_completed ⇒ Object
190 191 192 193 194 195 196 197 198 199 |
# File 'lib/losant_mqtt/device_connection.rb', line 190
#
# When @options[:secure] is set, prepares a certificate store loaded from
# CA_FILE_PATH and starts TLS with peer verification; the CONNECT packet is
# then sent from #ssl_handshake_completed. Otherwise the CONNECT packet is
# sent immediately.
def connection_completed
  return send_connect_packet unless @options[:secure]

  @last_seen_cert = nil
  @certificate_store = OpenSSL::X509::Store.new
  @certificate_store.add_file(CA_FILE_PATH)
  start_tls(verify_peer: true)
end
#disconnect(send_msg: true) ⇒ Object
71 72 73 74 75 76 77 |
# File 'lib/losant_mqtt/device_connection.rb', line 71
#
# Starts an orderly shutdown. Does nothing when already :disconnecting or
# :disconnected. When currently connected and send_msg is true, a DISCONNECT
# packet is sent; in every other case the socket is closed directly.
#
# @param send_msg [Boolean] whether to send a DISCONNECT packet first
#
# NOTE(review): after sending DISCONNECT this method does not itself close
# the socket — presumably it relies on the peer closing it; confirm.
def disconnect(send_msg: true)
  return if [:disconnecting, :disconnected].include?(@state)

  # Decide before the state changes below, since connected? reads the state.
  packet = MQTT::Packet::Disconnect.new if connected? && send_msg

  @state = :disconnecting
  emit(@state)

  if packet
    send_packet(packet)
  else
    close_connection
  end
end
#next_packet_id ⇒ Object
140 141 142 |
# File 'lib/losant_mqtt/device_connection.rb', line 140
#
# Advances and returns the packet identifier counter.
#
# MQTT packet identifiers are 16-bit and non-zero, so the counter wraps from
# 65535 back to 1 instead of growing without bound.
#
# @return [Integer] the next identifier, in 1..65535
def next_packet_id
  @packet_id = (@packet_id % 0xFFFF) + 1
end
#post_init ⇒ Object
180 181 182 183 184 185 186 187 188 |
# File 'lib/losant_mqtt/device_connection.rb', line 180
#
# Resets the per-connection bookkeeping and emits the :connecting state.
def post_init
  @state = :connecting
  @last_received = 0
  @packet_id = 0
  @packet = nil
  # Receive buffer: String.new yields a mutable, binary-encoded empty string,
  # which suits raw socket bytes and keeps working if string literals are
  # frozen.
  @data = String.new
  @ex = nil
  emit(@state)
end
#process_packet(packet) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/losant_mqtt/device_connection.rb', line 92
#
# Dispatches one fully parsed packet according to the current state and
# records the time it arrived (used by the keep-alive check).
#
# * CONNACK while :connect_sent is handed to #connect_ack.
# * PUBLISH while :connected invokes the block registered for exactly that
#   topic, if there is one.
# * PINGRESP / PUBACK / SUBACK / UNSUBACK while :connected need no action.
# * Anything else records a ProtocolException in @ex and closes the
#   connection.
#
# @param packet [Object] the parsed packet
def process_packet(packet)
  @last_received = Time.now.to_i
  type = packet.class

  if state == :connect_sent && type == MQTT::Packet::Connack
    connect_ack(packet)
  elsif state == :connected && type == MQTT::Packet::Publish
    handler = @subscriptions[packet.topic]
    handler.call(packet.payload) if handler
  elsif state == :connected &&
        [MQTT::Packet::Pingresp, MQTT::Packet::Puback,
         MQTT::Packet::Suback, MQTT::Packet::Unsuback].include?(type)
    # Acknowledgements only; nothing to do.
  else
    # CONNECT, SUBSCRIBE, PINGREQ, UNSUBSCRIBE and DISCONNECT are only sent
    # by the client; PUBREC/PUBREL/PUBCOMP belong to QoS 2, which is not
    # supported.
    @ex = MQTT::ProtocolException.new("Wasn't expecting packet of type #{packet.class} when in state #{state}")
    close_connection
  end
end
#publish(topic, payload) ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/losant_mqtt/device_connection.rb', line 52
#
# Sends a PUBLISH packet with QoS 0 and retain off.
#
# @param topic [Object] topic to publish to
# @param payload [Object] message body
def publish(topic, payload)
  packet = MQTT::Packet::Publish.new(
    id: next_packet_id,
    qos: 0,
    retain: false,
    topic: topic,
    payload: payload
  )
  send_packet(packet)
end
#receive_data(data) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/losant_mqtt/device_connection.rb', line 163
#
# Appends incoming bytes to the receive buffer and processes every complete
# packet the buffer now holds. A partially received packet is kept in
# @packet / @data until more bytes arrive.
#
# @param data [String] bytes just read from the socket
def receive_data(data)
  @data << data

  # Iterate rather than recurse so a chunk holding many packets does not
  # deepen the call stack by one frame per packet.
  loop do
    # Are we at the start of a new packet?
    # NOTE(review): parse_header is assumed to strip the header bytes from
    # @data, since the body is sliced from offset 0 below — confirm.
    if @packet.nil? && @data.length >= 2
      @packet = MQTT::Packet.parse_header(@data)
    end

    # Do we have the full packet body now?
    break unless @packet && @data.length >= @packet.body_length

    @packet.parse_body(@data.slice!(0...@packet.body_length))

    # Clear the in-progress packet before dispatching, so an exception from
    # the handler cannot leave an already-consumed packet behind.
    complete = @packet
    @packet = nil
    process_packet(complete)
  end
end
#send_connect_packet ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/losant_mqtt/device_connection.rb', line 79
#
# Builds a CONNECT packet from the stored options, sends it, then moves to
# the :connect_sent state and emits it.
def send_connect_packet
  fields = [:client_id, :clean_session, :keep_alive, :username, :password, :version]
  # Copy each field from @options; a missing key is passed through as nil.
  attrs = fields.each_with_object({}) { |key, acc| acc[key] = @options[key] }

  send_packet(MQTT::Packet::Connect.new(**attrs))

  @state = :connect_sent
  emit(@state)
end
#send_packet(packet) ⇒ Object
144 145 146 |
# File 'lib/losant_mqtt/device_connection.rb', line 144
#
# Serializes the packet with #to_s and hands the result to send_data.
#
# @param packet [#to_s] packet to transmit
def send_packet(packet)
  send_data(packet.to_s)
end
#ssl_handshake_completed ⇒ Object
220 221 222 223 224 225 226 227 |
# File 'lib/losant_mqtt/device_connection.rb', line 220
#
# Checks that the last certificate seen during verification matches
# @options[:host]. On a mismatch an OpenSSL error is recorded in @ex and the
# connection is closed; otherwise the CONNECT packet is sent.
def ssl_handshake_completed
  # Fail closed when no certificate was recorded at all, instead of letting
  # the identity check blow up on nil.
  identity_ok = @last_seen_cert &&
                OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, @options[:host])

  unless identity_ok
    @ex = OpenSSL::OpenSSLError.new("The hostname #{@options[:host]} does not match the server certificate")
    return close_connection
  end

  send_connect_packet
end
#ssl_verify_peer(cert_string) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/losant_mqtt/device_connection.rb', line 201
#
# Verifies one peer certificate against @certificate_store and remembers it
# in @last_seen_cert. A verified certificate is added to the store.
#
# @param cert_string [String] the encoded certificate
# @return [Boolean] true when the certificate is accepted; false otherwise,
#   with the reason recorded in @ex
def ssl_verify_peer(cert_string)
  @last_seen_cert = OpenSSL::X509::Certificate.new(cert_string)

  unless @certificate_store.verify(@last_seen_cert)
    @ex = OpenSSL::OpenSSLError.new("Unable to verify the certificate for #{@options[:host]}")
    return false
  end

  begin
    @certificate_store.add_cert(@last_seen_cert)
  rescue OpenSSL::X509::StoreError => e
    # Re-adding a certificate the store already holds is harmless. Match by
    # substring so a prefixed message is still recognised.
    unless e.message.include?("cert already in hash table")
      @ex = e
      return false
    end
  end

  true
end
#subscribe(topic, &block) ⇒ Object
61 62 63 64 |
# File 'lib/losant_mqtt/device_connection.rb', line 61
#
# Registers the block as the handler for the topic (replacing any previous
# one) and sends a SUBSCRIBE packet for it.
#
# @param topic [Object] topic to subscribe to
# @yield [payload] invoked by #process_packet with each matching payload
def subscribe(topic, &block)
  @subscriptions[topic] = block

  packet = MQTT::Packet::Subscribe.new(id: next_packet_id, topics: [topic])
  send_packet(packet)
end
#unbind(msg) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/losant_mqtt/device_connection.rb', line 148
#
# Connection teardown: cancels the keep-alive timer, moves to :disconnected
# and emits that state together with the error that caused the teardown
# (nil after a requested disconnect).
#
# @param msg [Object] not used by this method
def unbind(msg)
  if @timer
    @timer.cancel
    @timer = nil
  end

  # Unless a disconnect was requested, make sure there is an error to
  # report: one recorded earlier, the exception in flight ($!), or a generic
  # "connection lost".
  unless @state == :disconnecting
    @ex ||= $! || MQTT::NotConnectedException.new("Connection to server lost")
  end

  @state = :disconnected
  emit(@state, @ex)
  @ex = nil
end
#unsubscribe(topic) ⇒ Object
66 67 68 69 |
# File 'lib/losant_mqtt/device_connection.rb', line 66
#
# Drops the handler registered for the topic and sends an UNSUBSCRIBE packet
# for it.
#
# @param topic [Object] topic to unsubscribe from
def unsubscribe(topic)
  @subscriptions.delete(topic)

  packet = MQTT::Packet::Unsubscribe.new(id: next_packet_id, topics: [topic])
  send_packet(packet)
end