Class: LosantMqtt::DeviceConnection

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
Events::Emitter
Defined in:
lib/losant_mqtt/device_connection.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ DeviceConnection

Returns a new instance of DeviceConnection.



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/losant_mqtt/device_connection.rb', line 36

# Prepares the connection handler's MQTT session settings.
#
# The defaults below are overlaid with the caller's options, so any key
# present in +options+ wins and keys without a default are kept unchanged.
#
# @param options [Hash] overrides for :client_id, :keep_alive,
#   :clean_session, :username, :password and :version, plus any extra keys
def initialize(options = {})
  defaults = {
    client_id:     MQTT::Client.generate_client_id,
    keep_alive:    15,
    clean_session: true,
    username:      nil,
    password:      nil,
    version:       "3.1.1"
  }

  @options       = defaults.merge(options)
  # Maps a subscribed topic to the block that handles its messages.
  @subscriptions = {}
end

Instance Attribute Details

#state ⇒ Object (readonly)

Returns the value of attribute state.



29
30
31
# File 'lib/losant_mqtt/device_connection.rb', line 29

# Current lifecycle state of the connection.
#
# Within this class the value is assigned :connecting, :connect_sent,
# :connected, :disconnecting or :disconnected.
#
# @return [Symbol, nil] the stored state (nil if none has been assigned yet)
def state
  @state
end

Class Method Details

.connect(options = {}) ⇒ Object



31
32
33
34
# File 'lib/losant_mqtt/device_connection.rb', line 31

# Opens an EventMachine connection that is handled by this class.
#
# @param options [Hash] connection settings; :host and :port default to
#   "localhost" and MQTT::DEFAULT_PORT. The merged hash is also handed to
#   EventMachine.connect as the trailing argument.
# @return [Object] whatever EventMachine.connect returns
def self.connect(options = {})
  settings   = { host: "localhost", port: MQTT::DEFAULT_PORT }.merge(options)
  host, port = settings.values_at(:host, :port)

  EventMachine.connect(host, port, self, settings)
end

Instance Method Details

#connect_ack(packet) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/losant_mqtt/device_connection.rb', line 118

# Handles the broker's acknowledgement of our CONNECT packet.
#
# A non-zero return code is recorded as a protocol exception and the
# connection is closed. Otherwise the state becomes :connected, a periodic
# keep-alive timer is started when :keep_alive is positive, and the new
# state is emitted.
#
# @param packet [#return_code, #return_msg] the acknowledgement received
# @return [Object] the result of close_connection (rejected) or emit (accepted)
def connect_ack(packet)
  unless packet.return_code == 0x00
    @ex = MQTT::ProtocolException.new("Authentication Error - " + packet.return_msg)
    return close_connection
  end

  @state   = :connected
  interval = @options[:keep_alive]

  if interval > 0
    @timer = EventMachine::PeriodicTimer.new(interval) do
      # Whole seconds since anything was last received from the broker.
      silence = Time.now.to_i - @last_received

      if silence <= @options[:keep_alive]
        send_packet(MQTT::Packet::Pingreq.new)
      else
        @ex = MQTT::NotConnectedException.new("Keep alive failure, disconnecting")
        close_connection
      end
    end
  end

  emit(@state)
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/losant_mqtt/device_connection.rb', line 48

# Reports whether the connection is currently in the :connected state.
#
# @return [Boolean] true only when #state equals :connected
def connected?
  state == :connected
end

#connection_completed ⇒ Object



190
191
192
193
194
195
196
197
198
199
# File 'lib/losant_mqtt/device_connection.rb', line 190

# Continues the handshake once the connection has been established.
#
# Without the :secure option the CONNECT packet is sent straight away.
# With it, a certificate store is loaded from CA_FILE_PATH and TLS is
# started with peer verification requested; the CONNECT packet is then
# sent from #ssl_handshake_completed.
#
# @return [Object] the result of send_connect_packet or start_tls
def connection_completed
  return send_connect_packet unless @options[:secure]

  @last_seen_cert    = nil
  @certificate_store = OpenSSL::X509::Store.new
  @certificate_store.add_file(CA_FILE_PATH)
  start_tls(verify_peer: true)
end

#disconnect(send_msg: true) ⇒ Object



71
72
73
74
75
76
77
# File 'lib/losant_mqtt/device_connection.rb', line 71

# Begins shutting the connection down.
#
# Does nothing when a disconnect is already in progress or finished. When
# currently connected and +send_msg+ is truthy, a DISCONNECT packet is sent;
# in every other case the connection is closed directly. The :disconnecting
# state is emitted before either action.
#
# @param send_msg [Boolean] whether to send a DISCONNECT packet when connected
# @return [Object, nil] nil when already disconnecting/disconnected, otherwise
#   the result of send_packet or close_connection
def disconnect(send_msg: true)
  return if %i[disconnecting disconnected].include?(@state)

  # Decide before the state changes below, since connected? reads the state.
  farewell = (MQTT::Packet::Disconnect.new if connected? && send_msg)

  @state = :disconnecting
  emit(@state)

  if farewell
    send_packet(farewell)
  else
    close_connection
  end
end

#next_packet_id ⇒ Object



140
141
142
# File 'lib/losant_mqtt/device_connection.rb', line 140

# Advances and returns the identifier for the next outgoing packet.
#
# MQTT packet identifiers are 16-bit and must be non-zero, so the counter
# wraps from 0xFFFF back to 1 instead of growing without bound.
#
# @return [Integer] an identifier between 1 and 65_535
def next_packet_id
  @packet_id = @packet_id >= 0xFFFF ? 1 : @packet_id + 1
end

#post_init ⇒ Object



180
181
182
183
184
185
186
187
188
# File 'lib/losant_mqtt/device_connection.rb', line 180

# Resets the per-connection bookkeeping and announces the :connecting state.
#
# @return [Object] the result of emit
def post_init
  @last_received = @packet_id = 0   # last-receive timestamp and packet id counter
  @packet        = @ex = nil        # packet being assembled / pending error
  @data          = ""               # receive buffer
  @state         = :connecting

  emit(@state)
end

#process_packet(packet) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/losant_mqtt/device_connection.rb', line 92

# Reacts to one fully parsed packet received from the broker.
#
# Every packet refreshes the last-received timestamp. A Connack is handed to
# #connect_ack while in the :connect_sent state. While :connected, a Publish
# is delivered to the block registered for its exact topic (if any), and
# Pingresp, Puback, Suback and Unsuback are accepted without further action.
# Any other packet/state combination is recorded as a protocol exception and
# the connection is closed.
#
# @param packet [Object] the parsed packet; dispatch is on its exact class
# @return [Object, nil] the result of connect_ack, of the subscription block,
#   or of close_connection; nil for packets that need no action
def process_packet(packet)
  @last_received = Time.now.to_i
  kind = packet.class

  return connect_ack(packet) if state == :connect_sent && kind == MQTT::Packet::Connack

  if state == :connected
    if kind == MQTT::Packet::Publish
      handler = @subscriptions[packet.topic]
      return handler ? handler.call(packet.payload) : nil
    end

    # Ping replies and publish/subscribe/unsubscribe acknowledgements.
    acknowledgements = [
      MQTT::Packet::Pingresp,
      MQTT::Packet::Puback,
      MQTT::Packet::Suback,
      MQTT::Packet::Unsuback
    ]
    return if acknowledgements.include?(kind)
  end

  # Everything else is either sent only by clients (CONNECT, SUBSCRIBE,
  # PINGREQ, UNSUBSCRIBE, DISCONNECT), belongs to unsupported QoS 2
  # (PUBREC/PUBREL/PUBCOMP), or arrived in the wrong state.
  @ex = MQTT::ProtocolException.new("Wasn't expecting packet of type #{packet.class} when in state #{state}")
  close_connection
end

#publish(topic, payload) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/losant_mqtt/device_connection.rb', line 52

# Sends a PUBLISH packet for +payload+ on +topic+.
#
# The packet is always built with qos 0 and retain false, and takes the next
# packet id.
#
# @param topic [String] topic to publish to
# @param payload [String] message body
# @return [Object] the result of send_packet
def publish(topic, payload)
  message = MQTT::Packet::Publish.new(
    id: next_packet_id, qos: 0, retain: false, topic: topic, payload: payload)

  send_packet(message)
end

#receive_data(data) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/losant_mqtt/device_connection.rb', line 163

# Buffers bytes read from the socket and hands every complete MQTT packet
# found in the buffer to #process_packet.
#
# A read may end in the middle of a packet, or carry several packets at
# once, so the buffer is drained in a loop (rather than by recursion) until
# no complete packet remains.
#
# @param data [String] raw bytes just received
# @return [void]
def receive_data(data)
  @data << data

  loop do
    unless @packet
      # An MQTT fixed header is one type byte followed by one to four
      # "remaining length" bytes, the last of which has its high bit clear.
      # Wait until the whole header is buffered before parsing it, so a
      # length field split across two reads is not treated as an error.
      length_bytes = @data.byteslice(1, 4).to_s.bytes
      header_ready = length_bytes.size == 4 ||
                     length_bytes.any? { |byte| (byte & 0x80).zero? }
      break unless header_ready

      # NOTE(review): as in the original code, this relies on parse_header
      # removing the header bytes from @data - confirm against the mqtt gem.
      @packet = MQTT::Packet.parse_header(@data)
    end

    # Do we have the full packet body now?
    break if @data.bytesize < @packet.body_length

    @packet.parse_body(@data.slice!(0...@packet.body_length))

    # Clear the in-progress packet before dispatching so that an error raised
    # while handling it cannot leave a stale packet behind.
    completed = @packet
    @packet   = nil
    process_packet(completed)
  end
end

#send_connect_packet ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/losant_mqtt/device_connection.rb', line 79

# Sends the MQTT CONNECT packet built from the session options, then moves to
# the :connect_sent state and emits it.
#
# @return [Object] the result of emit
def send_connect_packet
  fields   = %i[client_id clean_session keep_alive username password version]
  settings = fields.each_with_object({}) { |name, picked| picked[name] = @options[name] }

  send_packet(MQTT::Packet::Connect.new(**settings))

  @state = :connect_sent
  emit(@state)
end

#send_packet(packet) ⇒ Object



144
145
146
# File 'lib/losant_mqtt/device_connection.rb', line 144

# Writes a packet to the connection.
#
# @param packet [#to_s] packet whose string form is passed to send_data
# @return [Object] the result of send_data
def send_packet(packet)
  wire_form = packet.to_s
  send_data(wire_form)
end

#ssl_handshake_completed ⇒ Object



220
221
222
223
224
225
226
227
# File 'lib/losant_mqtt/device_connection.rb', line 220

# Checks, after the TLS handshake, that the most recently seen peer
# certificate is valid for the configured host, then sends the CONNECT packet.
#
# On a mismatch the error is recorded and the connection is closed instead.
#
# @return [Object] the result of send_connect_packet or close_connection
def ssl_handshake_completed
  host = @options[:host]

  if OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, host)
    return send_connect_packet
  end

  @ex = OpenSSL::OpenSSLError.new("The hostname #{host} does not match the server certificate")
  close_connection
end

#ssl_verify_peer(cert_string) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/losant_mqtt/device_connection.rb', line 201

# Verifies one certificate presented by the peer against the certificate
# store and remembers it as the most recently seen certificate.
#
# A verified certificate is added to the store; a store error saying the
# certificate is already present is tolerated. Any failure is recorded in
# @ex.
#
# @param cert_string [String] the certificate, in a form accepted by
#   OpenSSL::X509::Certificate.new
# @return [Boolean] true when the certificate is accepted, false otherwise
def ssl_verify_peer(cert_string)
  @last_seen_cert = OpenSSL::X509::Certificate.new(cert_string)

  if @certificate_store.verify(@last_seen_cert)
    begin
      @certificate_store.add_cert(@last_seen_cert)
      true
    rescue OpenSSL::X509::StoreError => e
      # Re-adding a certificate the store already holds is harmless.
      return true if e.message == "cert already in hash table"

      @ex = e
      false
    end
  else
    @ex = OpenSSL::OpenSSLError.new("Unable to verify the certificate for #{@options[:host]}")
    false
  end
end

#subscribe(topic, &block) ⇒ Object



61
62
63
64
# File 'lib/losant_mqtt/device_connection.rb', line 61

# Registers +block+ as the handler for +topic+ and sends a SUBSCRIBE packet.
#
# The handler is later looked up by the exact topic of a received message
# and called with that message's payload.
#
# @param topic [String] topic to subscribe to
# @yieldparam payload [Object] payload of a message received on +topic+
# @return [Object] the result of send_packet
def subscribe(topic, &block)
  @subscriptions.store(topic, block)

  request = MQTT::Packet::Subscribe.new(id: next_packet_id, topics: [topic])
  send_packet(request)
end

#unbind(msg) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/losant_mqtt/device_connection.rb', line 148

# Finalizes the connection after it has gone away.
#
# Cancels the keep-alive timer if one is running, switches to :disconnected
# and emits that state together with the error that caused the disconnect.
# The error is nil when the disconnect was requested and nothing went wrong;
# otherwise it is the recorded error, the exception currently being handled,
# or a generic "Connection to server lost" exception.
#
# @param msg [Object] unused by this method
# @return [nil]
def unbind(msg)
  running_timer = @timer
  if running_timer
    running_timer.cancel
    @timer = nil
  end

  if @state != :disconnecting
    @ex = @ex || $! || MQTT::NotConnectedException.new("Connection to server lost")
  end

  failure = @ex
  @state  = :disconnected
  emit(@state, failure)
  @ex = nil
end

#unsubscribe(topic) ⇒ Object



66
67
68
69
# File 'lib/losant_mqtt/device_connection.rb', line 66

# Drops the handler registered for +topic+ and sends an UNSUBSCRIBE packet.
#
# @param topic [String] topic to unsubscribe from
# @return [Object] the result of send_packet
def unsubscribe(topic)
  @subscriptions.delete(topic)

  request = MQTT::Packet::Unsubscribe.new(id: next_packet_id, topics: [topic])
  send_packet(request)
end