Class: MQTT::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/qubitro-mqtt/client.rb

Constant Summary collapse

SELECT_TIMEOUT =

Timeout between select polls (in seconds)

0.5
ATTR_DEFAULTS =

Default attribute values

# Default value for every configurable attribute. #initialize merges the
# caller's attributes over this Hash and assigns each pair through the
# matching "<key>=" setter.
{
  :host => nil,
  # nil means "pick automatically": #initialize falls back to
  # MQTT::DEFAULT_SSL_PORT or MQTT::DEFAULT_PORT depending on :ssl
  :port => nil,
  :version => '3.1.1',
  # Seconds between pings to the remote server
  :keep_alive => 15,
  :clean_session => true,
  :client_id => nil,
  :device_id => nil,
  :device_token => nil,
  :will_topic => nil,
  # Seconds to wait for acknowledgement packets
  :ack_timeout => 5,
  :will_payload => nil,
  :will_qos => 0,
  :will_retain => false,
  :ssl => false
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Client

Create a new MQTT Client instance

Accepts one of the following:

  • a URI that uses the MQTT scheme

  • a hostname and port

  • a Hash containing attributes to be set on the new instance

If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.

Examples:

client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(:host => 'myserver.example.com')
client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)

Raises:

  • (ArgumentError)


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/qubitro-mqtt/client.rb', line 128

# Create a new MQTT client instance.
#
# Positional arguments may be a server (a URI object, a String starting with
# mqtt:// or mqtts://, or a plain hostname) optionally followed by a port.
# A trailing Hash supplies attribute values. When no positional argument is
# given and the MQTT_SERVER environment variable is set, the URI it holds is
# parsed and merged into the attributes.
#
# The attributes Hash is copied before it is modified, so the Hash passed by
# the caller is left untouched and may be frozen.
#
# @param args [Array] optional server, optional port, optional trailing
#   Hash of attributes (keys as in ATTR_DEFAULTS)
# @raise [ArgumentError] if three or more positional arguments are given
def initialize(*args)
  # Work on a copy: the merges and assignments below would otherwise write
  # the host/port/URI parts back into the caller's own Hash (and raise on a
  # frozen one).
  attributes = args.last.is_a?(Hash) ? args.pop.dup : {}

  # Set server URI from environment if present
  attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if args.length.zero? && ENV['MQTT_SERVER']

  if args.length >= 1
    case args[0]
    when URI, %r{\Amqtts?://}
      # URI objects and mqtt:// / mqtts:// strings are both parsed the same
      # way; \A anchors the scheme to the very start of the string.
      attributes.merge!(parse_uri(args[0]))
    else
      attributes[:host] = args[0]
    end
  end

  # An explicit nil port keeps whatever the URI/attributes provided
  attributes[:port] = args[1] if args.length >= 2 && !args[1].nil?

  raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3

  # Merge arguments with default values for attributes and assign each one
  # through its setter
  ATTR_DEFAULTS.merge(attributes).each_pair do |k, v|
    send("#{k}=", v)
  end

  # Set a default port number (depends on whether SSL/TLS was requested)
  @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT if @port.nil?

  # Initialise private instance variables
  @last_ping_request = Time.now
  @last_ping_response = Time.now
  @socket = nil
  @read_queue = Queue.new
  @pubacks = {}
  @read_thread = nil
  @write_semaphore = Mutex.new
  @pubacks_semaphore = Mutex.new
end

Instance Attribute Details

#ack_timeoutObject

Number of seconds to wait for acknowledgement packets (default is 5 seconds)



36
37
38
# File 'lib/qubitro-mqtt/client.rb', line 36

# Reader for the acknowledgement timeout: the number of seconds to wait for
# acknowledgement packets (ATTR_DEFAULTS sets it to 5).
def ack_timeout
  @ack_timeout
end

#clean_sessionObject

Set the ‘Clean Session’ flag when connecting? (default is true)



30
31
32
# File 'lib/qubitro-mqtt/client.rb', line 30

# Reader for the 'Clean Session' flag that is sent in the connect packet
# (ATTR_DEFAULTS sets it to true).
def clean_session
  @clean_session
end

#client_idObject

Client Identifier



33
34
35
# File 'lib/qubitro-mqtt/client.rb', line 33

# Reader for the client identifier. It is nil by default and may be replaced
# by #connect, either with the id passed to it or with a generated one.
def client_id
  @client_id
end

#device_idObject

Device ID to authenticate to the server with



39
40
41
# File 'lib/qubitro-mqtt/client.rb', line 39

# Reader for the device ID used to authenticate to the server; #connect
# passes it to the connect packet as :device_id.
def device_id
  @device_id
end

#device_tokenObject

Device Token to authenticate to the server with



42
43
44
# File 'lib/qubitro-mqtt/client.rb', line 42

# Reader for the device token used to authenticate to the server; #connect
# passes it to the connect packet as :device_token.
def device_token
  @device_token
end

#hostObject

Hostname of the remote server



8
9
10
# File 'lib/qubitro-mqtt/client.rb', line 8

# Reader for the hostname of the remote server. #connect raises if it is
# still nil when a connection is attempted.
def host
  @host
end

#keep_aliveObject

Time (in seconds) between pings to remote server (default is 15 seconds)



27
28
29
# File 'lib/qubitro-mqtt/client.rb', line 27

# Reader for the keep-alive interval: the time in seconds between pings to
# the remote server (ATTR_DEFAULTS sets it to 15).
def keep_alive
  @keep_alive
end

#last_ping_responseObject (readonly)

Last ping response time



57
58
59
# File 'lib/qubitro-mqtt/client.rb', line 57

# Read-only accessor for the time of the last ping response. #initialize
# seeds it with Time.now.
def last_ping_response
  @last_ping_response
end

#portObject

Port number of the remote server



11
12
13
# File 'lib/qubitro-mqtt/client.rb', line 11

# Reader for the port number of the remote server. When no port is given,
# #initialize picks MQTT::DEFAULT_SSL_PORT or MQTT::DEFAULT_PORT depending on
# whether SSL is enabled.
def port
  @port
end

#sslObject

Set to true to enable SSL/TLS encrypted communication

Set to a symbol to use a specific variant of SSL/TLS. Allowed values are the method names listed in OpenSSL::SSL::SSLContext::METHODS.

Examples:

Using TLS 1.0

client = Client.new('mqtt.example.com', :ssl => :TLSv1)

See Also:

  • OpenSSL::SSL::SSLContext::METHODS


24
25
26
# File 'lib/qubitro-mqtt/client.rb', line 24

# Reader for the SSL/TLS setting. Any truthy value makes #connect wrap the
# socket in an OpenSSL::SSL::SSLSocket; a Symbol is additionally assigned to
# the SSL context's ssl_version.
def ssl
  @ssl
end

#versionObject

The version number of the MQTT protocol to use (default 3.1.1)



14
15
16
# File 'lib/qubitro-mqtt/client.rb', line 14

# Reader for the version number of the MQTT protocol to use
# (ATTR_DEFAULTS sets it to '3.1.1').
def version
  @version
end

#will_payloadObject

Contents of the message that is sent by the server when the client disconnects



48
49
50
# File 'lib/qubitro-mqtt/client.rb', line 48

# Reader for the payload of the Will message; #connect passes it to the
# connect packet as :will_payload.
def will_payload
  @will_payload
end

#will_qosObject

The QoS level of the will message sent by the server



51
52
53
# File 'lib/qubitro-mqtt/client.rb', line 51

# Reader for the QoS level of the Will message (ATTR_DEFAULTS sets it to 0).
def will_qos
  @will_qos
end

#will_retainObject

Whether the Will message should be retained by the server after it is sent



54
55
56
# File 'lib/qubitro-mqtt/client.rb', line 54

# Reader for the retain flag of the Will message
# (ATTR_DEFAULTS sets it to false).
def will_retain
  @will_retain
end

#will_topicObject

The topic that the Will message is published to



45
46
47
# File 'lib/qubitro-mqtt/client.rb', line 45

# Reader for the topic that the Will message is published to; #connect
# passes it to the connect packet as :will_topic.
def will_topic
  @will_topic
end

Class Method Details

.connect(*args, &block) ⇒ Object

Create and connect a new MQTT Client

Accepts the same arguments as creating a new client. If a block is given, then it will be executed before disconnecting again.

Example:

MQTT::Client.connect('myserver.example.com') do |client|
  # do stuff here
end


90
91
92
93
94
# File 'lib/qubitro-mqtt/client.rb', line 90

# Build a client from the given arguments, connect it and hand it back.
#
# The arguments are forwarded unchanged to MQTT::Client.new and the optional
# block is forwarded to the instance's #connect.
#
# @return [MQTT::Client] the client that was created
def self.connect(*args, &block)
  MQTT::Client.new(*args).tap { |instance| instance.connect(&block) }
end

.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object

Generate a random client identifier (using the characters 0-9 and a-z)



98
99
100
101
102
103
104
105
106
107
# File 'lib/qubitro-mqtt/client.rb', line 98

# Build a random client identifier: a copy of +prefix+ followed by +length+
# random characters drawn from 0-9 and a-z.
#
# @param prefix [String] text the identifier starts with (never modified)
# @param length [Integer] number of random characters to append
# @return [String] the new identifier
def self.generate_client_id(prefix = 'ruby', length = 16)
  # A base-36 digit is exactly one character from 0-9 or a-z
  suffix = length.times.map { rand(36).to_s(36) }.join
  prefix.dup << suffix
end

Instance Method Details

#ca_file=(path) ⇒ Object

Set a path to a file containing a PEM-format CA certificate and enable peer verification



200
201
202
203
# File 'lib/qubitro-mqtt/client.rb', line 200

# Point the SSL context at a file containing a PEM-format CA certificate.
# Peer verification is switched on whenever the path is not nil.
#
# @param path [String, nil] location of the CA certificate file
def ca_file=(path)
  context = ssl_context
  context.ca_file = path
  context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil?
end

#cert=(cert) ⇒ Object

PEM-format client certificate



183
184
185
# File 'lib/qubitro-mqtt/client.rb', line 183

# Set the client certificate on the SSL context.
#
# The argument is handed to OpenSSL::X509::Certificate.new; the page
# describes it as a PEM-format client certificate.
def cert=(cert)
  ssl_context.cert = OpenSSL::X509::Certificate.new(cert)
end

#cert_file=(path) ⇒ Object

Set a path to a file containing a PEM-format client certificate



178
179
180
# File 'lib/qubitro-mqtt/client.rb', line 178

# Read the file at +path+ and install its contents as the client
# certificate by delegating to #cert=.
def cert_file=(path)
  self.cert = File.read(path)
end

#clear_queueObject

Clear the incoming message queue.



440
441
442
# File 'lib/qubitro-mqtt/client.rb', line 440

# Clear the incoming message queue (@read_queue), discarding anything that
# has not been fetched yet.
def clear_queue
  @read_queue.clear
end

#connect(clientid = nil) ⇒ Object

Connect to the MQTT server. If a block is given, then yield to that block and then disconnect again.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/qubitro-mqtt/client.rb', line 218

# Connect to the MQTT server.
#
# When not already connected this opens a TCP socket (wrapped in an SSL
# socket if @ssl is set), sends a connect packet built from the instance
# attributes, waits for the connection acknowledgement and starts a thread
# that keeps receiving packets while the client is connected.
#
# If a block is given the client is yielded to it and #disconnect is called
# afterwards, even when the block raises.
#
# @param clientid [String, nil] replaces the stored client id when not nil
# @raise [RuntimeError] if no client id is available while clean_session is
#   false, or if no host has been set
def connect(clientid = nil)
  @client_id = clientid unless clientid.nil?

  if @client_id.nil? || @client_id.empty?
    raise 'Must provide a client_id if clean_session is set to false' unless @clean_session

    # Empty client id is not allowed for version 3.1.0
    @client_id = MQTT::Client.generate_client_id if @version == '3.1.0'
  end

  raise 'No MQTT server host set when attempting to connect' if @host.nil?

  # Everything below is skipped when a connection is already open
  unless connected?
    # Create network socket
    tcp_socket = TCPSocket.new(@host, @port)

    if @ssl
      # Set the protocol version
      ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol)

      @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
      # Closing the SSL socket should also close the underlying TCP socket
      @socket.sync_close = true

      # Set hostname on secure socket for Server Name Indication (SNI)
      @socket.hostname = @host if @socket.respond_to?(:hostname=)

      @socket.connect
    else
      @socket = tcp_socket
    end

    # Construct a connect packet
    packet = MQTT::Packet::Connect.new(
      :version => @version,
      :clean_session => @clean_session,
      :keep_alive => @keep_alive,
      :client_id => @client_id,
      :device_id => @device_id,
      :device_token => @device_token,
      :will_topic => @will_topic,
      :will_payload => @will_payload,
      :will_qos => @will_qos,
      :will_retain => @will_retain
    )

    # Send packet
    send_packet(packet)

    # Receive response
    receive_connack

    # Start packet reading thread; the creating thread is stored in the
    # thread-local :parent slot of the reader
    @read_thread = Thread.new(Thread.current) do |parent|
      Thread.current[:parent] = parent
      receive_packet while connected?
    end
  end

  return unless block_given?

  # If a block is given, then yield and disconnect
  begin
    yield(self)
  ensure
    disconnect
  end
end

#connected?Boolean

Checks whether the client is connected to the server.

Returns:

  • (Boolean)


306
307
308
# File 'lib/qubitro-mqtt/client.rb', line 306

# Tell whether the client currently holds an open socket.
#
# @return [Boolean] false when there is no socket or it has been closed
def connected?
  return false if @socket.nil?

  !@socket.closed?
end

#disconnect(send_msg = true) ⇒ Object

Disconnect from the MQTT server. If you don’t want to say goodbye to the server, set send_msg to false.



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/qubitro-mqtt/client.rb', line 288

# Disconnect from the MQTT server.
#
# The packet reading thread is stopped first. If the client is connected, a
# disconnect packet is optionally sent, the socket is closed, handle_close
# is invoked and the socket reference is dropped.
#
# @param send_msg [Boolean] pass false to close without sending a
#   disconnect packet to the server
def disconnect(send_msg = true)
  # Stop reading packets from the socket first
  reader = @read_thread
  reader.kill if reader && reader.alive?
  @read_thread = nil

  return unless connected?

  # Say goodbye to the server unless the caller opted out
  send_packet(MQTT::Packet::Disconnect.new) if send_msg

  @socket.close unless @socket.nil?
  handle_close
  @socket = nil
end

#get(topic = nil, options = {}) ⇒ Object

Return the next message received from the MQTT server. An optional topic can be given to subscribe to.

The method either returns the topic and message as an array:

topic,message = client.get

Or can be used with a block to keep processing messages:

client.get('test') do |topic,payload|
  # Do stuff here
end


383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/qubitro-mqtt/client.rb', line 383

# Return the next message received from the MQTT server.
#
# With a block, every message is yielded as (topic, payload) for as long as
# #get_packet keeps delivering packets. Without a block, the first
# acceptable message is returned as a two element array.
#
# @param topic [String, nil] optional topic to subscribe to before reading
# @param options [Hash] set :omit_retained to a true value to skip packets
#   whose retain flag is set
# @return [Array] topic and payload (non-block form)
def get(topic = nil, options = {})
  if block_given?
    get_packet(topic) do |packet|
      yield(packet.topic, packet.payload) unless packet.retain && options[:omit_retained]
    end
  else
    loop do
      # Wait for one packet to be available
      packet = get_packet(topic)

      # get_packet subscribes whenever it is given a topic. Only the first
      # call may do that: subscribing again for every skipped retained
      # message would make the broker deliver the retained messages again.
      topic = nil

      return packet.topic, packet.payload unless packet.retain && options[:omit_retained]
    end
  end
end

#get_packet(topic = nil) ⇒ Object

Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.

The method either returns a single packet:

packet = client.get_packet
puts packet.topic

Or can be used with a block to keep processing messages:

client.get_packet('test') do |packet|
  # Do stuff here
  puts packet.topic
end


410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/qubitro-mqtt/client.rb', line 410

# Return the next packet object received from the MQTT server.
#
# Without a block a single packet is taken from the read queue and
# returned. With a block, packets are taken and yielded one after another
# without end. In both forms puback_packet is called for packets whose QoS
# is above zero (after the block has run, in block form).
#
# @param topic [String, nil] optional topic to subscribe to first
# @return [Object] the packet (non-block form)
def get_packet(topic = nil)
  # Subscribe to a topic, if an argument is given
  subscribe(topic) unless topic.nil?

  unless block_given?
    # Wait for one packet to be available
    received = @read_queue.pop
    puback_packet(received) if received.qos > 0
    return received
  end

  # Loop forever!
  loop do
    received = @read_queue.pop
    yield(received)
    puback_packet(received) if received.qos > 0
  end
end

#key=(*args) ⇒ Object

Set to a PEM-format client private key



194
195
196
197
# File 'lib/qubitro-mqtt/client.rb', line 194

# Set the client private key on the SSL context.
#
# Accepts either the key data alone or a two element array of
# [key data, passphrase]. The key is parsed with OpenSSL::PKey.read, which
# detects the key type itself, so RSA, DSA and EC keys are all accepted.
#
# @param args [String, Array] key data, optionally followed by a passphrase
# @raise [OpenSSL::PKey::PKeyError] if the data cannot be parsed as a key
def key=(*args)
  key_data, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey.read(key_data, passphrase)
end

#key_file=(*args) ⇒ Object

Set a path to a file containing a PEM-format client private key



188
189
190
191
# File 'lib/qubitro-mqtt/client.rb', line 188

# Load the client private key from a file and set it on the SSL context.
#
# Accepts either the path alone or a two element array of
# [path, passphrase]. The file is read completely with File.read, so no
# file handle stays open, and the contents are parsed with
# OpenSSL::PKey.read so that RSA, DSA and EC keys are all accepted.
#
# @param args [String, Array] path to the key file, optionally followed by a
#   passphrase
# @raise [OpenSSL::PKey::PKeyError] if the file does not contain a usable key
def key_file=(*args)
  path, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey.read(File.read(path), passphrase)
end

#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object

Publish a message on a particular topic to the MQTT server.

Raises:

  • (ArgumentError)


311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/qubitro-mqtt/client.rb', line 311

# Publish a message on a particular topic to the MQTT server.
#
# For QoS 0 the packet is sent and nil is returned. For higher QoS levels
# the method waits on a private queue for a response: :read_timeout entries
# are ignored until @ack_timeout seconds have passed, :close aborts
# immediately, and anything else counts as the acknowledgement.
#
# The entry for this packet id is removed from @pubacks on every way out of
# the wait (acknowledged, timed out, closed or raised), so abandoned
# publishes do not accumulate there.
#
# @param topic [String] topic name, must not be nil or empty
# @param payload [String] message body
# @param retain [Boolean] retain flag for the publish packet
# @param qos [Integer] quality of service level
# @return [nil] when qos is zero
# @return [Integer] -1 when no acknowledgement arrived in time or the
#   connection was closed
# @return [Object] the result of send_packet once acknowledged
# @raise [ArgumentError] if the topic is nil or empty
def publish(topic, payload = '', retain = false, qos = 0)
  raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
  raise ArgumentError, 'Topic name cannot be empty' if topic.empty?

  packet = MQTT::Packet::Publish.new(
    :id => next_packet_id,
    :qos => qos,
    :retain => retain,
    :topic => topic,
    :payload => payload
  )

  # Send the packet
  res = send_packet(packet)

  return if qos.zero?

  queue = Queue.new

  # NOTE(review): the queue is registered only after the packet has been
  # sent. If wait_for_puback is what makes the queue visible to the reader,
  # a very fast acknowledgement could be missed - confirm against
  # wait_for_puback before reordering.
  wait_for_puback packet.id, queue

  deadline = current_time + @ack_timeout

  begin
    loop do
      case queue.pop
      when :read_timeout
        return -1 if current_time > deadline
      when :close
        return -1
      else
        break
      end
    end
  ensure
    # Runs for the early returns above as well as for the normal exit
    @pubacks_semaphore.synchronize do
      @pubacks.delete packet.id
    end
  end

  res
end

#queue_empty?Boolean

Returns true if the incoming message queue is empty.

Returns:

  • (Boolean)


430
431
432
# File 'lib/qubitro-mqtt/client.rb', line 430

# Returns true if the incoming message queue (@read_queue) is empty.
def queue_empty?
  @read_queue.empty?
end

#queue_lengthObject

Returns the length of the incoming message queue.



435
436
437
# File 'lib/qubitro-mqtt/client.rb', line 435

# Returns the length of the incoming message queue (@read_queue).
def queue_length
  @read_queue.length
end

#remote_hostObject

Deprecated.

Please use #host instead



598
599
600
# File 'lib/qubitro-mqtt/client.rb', line 598

# @deprecated Please use #host instead; this simply returns #host.
def remote_host
  host
end

#remote_host=(args) ⇒ Object

Deprecated.

Please use #host= instead



603
604
605
# File 'lib/qubitro-mqtt/client.rb', line 603

# @deprecated Please use #host= instead; the value is passed on to #host=.
def remote_host=(args)
  self.host = args
end

#remote_portObject

Deprecated.

Please use #port instead



608
609
610
# File 'lib/qubitro-mqtt/client.rb', line 608

# @deprecated Please use #port instead; this simply returns #port.
def remote_port
  port
end

#remote_port=(args) ⇒ Object

Deprecated.

Please use #port= instead



613
614
615
# File 'lib/qubitro-mqtt/client.rb', line 613

# @deprecated Please use #port= instead; the value is passed on to #port=.
def remote_port=(args)
  self.port = args
end

#set_will(topic, payload, retain = false, qos = 0) ⇒ Object

Set the Will for the client

The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server



209
210
211
212
213
214
# File 'lib/qubitro-mqtt/client.rb', line 209

# Configure the Will for the client in one call.
#
# The four values are stored through the will_topic=, will_payload=,
# will_retain= and will_qos= setters, in that order. The page notes that
# the Will must be set before a connection to the server is established.
#
# @param topic [String] topic the Will is published to
# @param payload [String] contents of the Will message
# @param retain [Boolean] whether the server should retain the Will
# @param qos [Integer] QoS level of the Will message
# @return [Integer] the qos value that was given
def set_will(topic, payload, retain = false, qos = 0)
  # Targets are assigned left to right, matching the documented order
  self.will_topic, self.will_payload, self.will_retain, self.will_qos =
    topic, payload, retain, qos
  qos
end

#ssl_contextObject

Get the OpenSSL context, that is used if SSL/TLS is enabled



173
174
175
# File 'lib/qubitro-mqtt/client.rb', line 173

# Get the OpenSSL context that is used if SSL/TLS is enabled.
#
# The context is created on first use and the same object is returned on
# later calls, so settings applied through it are kept.
def ssl_context
  @ssl_context ||= OpenSSL::SSL::SSLContext.new
end

#subscribe(*topics) ⇒ Object

Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:

  • String: subscribe to one topic with QoS 0

  • Array: subscribe to multiple topics with QoS 0

  • Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level

For example:

client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( 'a/b' => 0, 'c/d' => 1 )


364
365
366
367
368
369
370
# File 'lib/qubitro-mqtt/client.rb', line 364

# Send a subscribe message for one or more topics to the MQTT server.
#
# All arguments are collected into an array and handed to
# MQTT::Packet::Subscribe as :topics together with a fresh packet id; the
# accepted shapes (strings, [topic, qos] pairs, a topic => qos Hash) are
# interpreted by that packet class.
#
# @param topics [Array] topics to subscribe to
# @return [Object] whatever send_packet returns
def subscribe(*topics)
  send_packet(
    MQTT::Packet::Subscribe.new(:id => next_packet_id, :topics => topics)
  )
end

#unsubscribe(*topics) ⇒ Object

Send an unsubscribe message for one or more topics on the MQTT server



445
446
447
448
449
450
451
452
453
# File 'lib/qubitro-mqtt/client.rb', line 445

# Send an unsubscribe message for one or more topics to the MQTT server.
#
# When exactly one argument is given it is passed on as-is (so a single
# array of topics works as well as a list of arguments); otherwise the
# collected argument array is used.
#
# @param topics [Array] topics to unsubscribe from
# @return [Object] whatever send_packet returns
def unsubscribe(*topics)
  # The splat always yields an Array, so only its size has to be checked
  targets = topics.count == 1 ? topics.first : topics

  request = MQTT::Packet::Unsubscribe.new(
    :topics => targets,
    :id => next_packet_id
  )
  send_packet(request)
end