Class: MQTT::Client
- Inherits:
-
Object
- Object
- MQTT::Client
- Defined in:
- lib/qubitro-mqtt/client.rb
Constant Summary collapse
- SELECT_TIMEOUT =
Timeout between select polls (in seconds)
0.5
- ATTR_DEFAULTS =
Default attribute values
{ :host => nil, :port => nil, :version => '3.1.1', :keep_alive => 15, :clean_session => true, :client_id => nil, :device_id => nil, :device_token => nil, :will_topic => nil, :ack_timeout => 5, :will_payload => nil, :will_qos => 0, :will_retain => false, :ssl => false }
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds).
-
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true).
-
#client_id ⇒ Object
Client Identifier.
-
#device_id ⇒ Object
Device ID to authenticate to the server with.
-
#device_token ⇒ Object
Device Token to authenticate to the server with.
-
#host ⇒ Object
Hostname of the remote server.
-
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds).
-
#last_ping_response ⇒ Object
readonly
Last ping response time.
-
#port ⇒ Object
Port number of the remote server.
-
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication.
-
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1).
-
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect.
-
#will_qos ⇒ Object
The QoS level of the will message sent by the server.
-
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent.
-
#will_topic ⇒ Object
The topic that the Will message is published to.
Class Method Summary collapse
-
.connect(*args, &block) ⇒ Object
Create and connect a new MQTT Client.
-
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z).
Instance Method Summary collapse
-
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification.
-
#cert=(cert) ⇒ Object
PEM-format client certificate.
-
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate.
-
#clear_queue ⇒ Object
Clear the incoming message queue.
-
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
-
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
-
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server.
-
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server.
-
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server.
-
#initialize(*args) ⇒ Client
constructor
Create a new MQTT Client instance.
-
#key=(*args) ⇒ Object
Set to a PEM-format client private key.
-
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key.
-
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
-
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
-
#queue_length ⇒ Object
Returns the length of the incoming message queue.
-
#remote_host ⇒ Object
deprecated
Deprecated.
Please use #host instead
-
#remote_host=(args) ⇒ Object
deprecated
Deprecated.
Please use #host= instead
-
#remote_port ⇒ Object
deprecated
Deprecated.
Please use #port instead
-
#remote_port=(args) ⇒ Object
deprecated
Deprecated.
Please use #port= instead
-
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client.
-
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled.
-
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server.
-
#unsubscribe(*topics) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server.
Constructor Details
#initialize(*args) ⇒ Client
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:[email protected]')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(:host => 'myserver.example.com')
client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/qubitro-mqtt/client.rb', line 128 def initialize(*args) attributes = args.last.is_a?(Hash) ? args.pop : {} # Set server URI from environment if present attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if args.length.zero? && ENV['MQTT_SERVER'] if args.length >= 1 case args[0] when URI attributes.merge!(parse_uri(args[0])) when %r{^mqtts?://} attributes.merge!(parse_uri(args[0])) else attributes[:host] = args[0] end end if args.length >= 2 attributes[:port] = args[1] unless args[1].nil? end raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3 # Merge arguments with default values for attributes ATTR_DEFAULTS.merge(attributes).each_pair do |k, v| send("#{k}=", v) end # Set a default port number if @port.nil? @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end # Initialise private instance variables @last_ping_request = Time.now @last_ping_response = Time.now @socket = nil @read_queue = Queue.new @pubacks = {} @read_thread = nil @write_semaphore = Mutex.new @pubacks_semaphore = Mutex.new end |
Instance Attribute Details
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
36 37 38 |
# File 'lib/qubitro-mqtt/client.rb', line 36 def ack_timeout @ack_timeout end |
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true)
30 31 32 |
# File 'lib/qubitro-mqtt/client.rb', line 30 def clean_session @clean_session end |
#client_id ⇒ Object
Client Identifier
33 34 35 |
# File 'lib/qubitro-mqtt/client.rb', line 33 def client_id @client_id end |
#device_id ⇒ Object
Device ID to authenticate to the server with
39 40 41 |
# File 'lib/qubitro-mqtt/client.rb', line 39 def device_id @device_id end |
#device_token ⇒ Object
Device Token to authenticate to the server with
42 43 44 |
# File 'lib/qubitro-mqtt/client.rb', line 42 def device_token @device_token end |
#host ⇒ Object
Hostname of the remote server
8 9 10 |
# File 'lib/qubitro-mqtt/client.rb', line 8 def host @host end |
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds)
27 28 29 |
# File 'lib/qubitro-mqtt/client.rb', line 27 def keep_alive @keep_alive end |
#last_ping_response ⇒ Object (readonly)
Last ping response time
57 58 59 |
# File 'lib/qubitro-mqtt/client.rb', line 57 def last_ping_response @last_ping_response end |
#port ⇒ Object
Port number of the remote server
11 12 13 |
# File 'lib/qubitro-mqtt/client.rb', line 11 def port @port end |
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
24 25 26 |
# File 'lib/qubitro-mqtt/client.rb', line 24 def ssl @ssl end |
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1)
14 15 16 |
# File 'lib/qubitro-mqtt/client.rb', line 14 def version @version end |
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect
48 49 50 |
# File 'lib/qubitro-mqtt/client.rb', line 48 def will_payload @will_payload end |
#will_qos ⇒ Object
The QoS level of the will message sent by the server
51 52 53 |
# File 'lib/qubitro-mqtt/client.rb', line 51 def will_qos @will_qos end |
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent
54 55 56 |
# File 'lib/qubitro-mqtt/client.rb', line 54 def will_retain @will_retain end |
#will_topic ⇒ Object
The topic that the Will message is published to
45 46 47 |
# File 'lib/qubitro-mqtt/client.rb', line 45 def will_topic @will_topic end |
Class Method Details
.connect(*args, &block) ⇒ Object
90 91 92 93 94 |
# File 'lib/qubitro-mqtt/client.rb', line 90 def self.connect(*args, &block) client = MQTT::Client.new(*args) client.connect(&block) client end |
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z)
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/qubitro-mqtt/client.rb', line 98 def self.generate_client_id(prefix = 'ruby', length = 16) str = prefix.dup length.times do num = rand(36) # Adjust based on number or letter. num += num < 10 ? 48 : 87 str += num.chr end str end |
Instance Method Details
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification
200 201 202 203 |
# File 'lib/qubitro-mqtt/client.rb', line 200 def ca_file=(path) ssl_context.ca_file = path ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil? end |
#cert=(cert) ⇒ Object
PEM-format client certificate
183 184 185 |
# File 'lib/qubitro-mqtt/client.rb', line 183 def cert=(cert) ssl_context.cert = OpenSSL::X509::Certificate.new(cert) end |
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate
178 179 180 |
# File 'lib/qubitro-mqtt/client.rb', line 178 def cert_file=(path) self.cert = File.read(path) end |
#clear_queue ⇒ Object
Clear the incoming message queue.
440 441 442 |
# File 'lib/qubitro-mqtt/client.rb', line 440 def clear_queue @read_queue.clear end |
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/qubitro-mqtt/client.rb', line 218 def connect(clientid = nil) @client_id = clientid unless clientid.nil? if @client_id.nil? || @client_id.empty? raise 'Must provide a client_id if clean_session is set to false' unless @clean_session # Empty client id is not allowed for version 3.1.0 @client_id = MQTT::Client.generate_client_id if @version == '3.1.0' end raise 'No MQTT server host set when attempting to connect' if @host.nil? unless connected? # Create network socket tcp_socket = TCPSocket.new(@host, @port) if @ssl # Set the protocol version ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol) @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) @socket.sync_close = true # Set hostname on secure socket for Server Name Indication (SNI) @socket.hostname = @host if @socket.respond_to?(:hostname=) @socket.connect else @socket = tcp_socket end # Construct a connect packet packet = MQTT::Packet::Connect.new( :version => @version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :device_id => @device_id, :device_token => @device_token, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain ) # Send packet send_packet(packet) # Receive response receive_connack # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent receive_packet while connected? end end return unless block_given? # If a block is given, then yield and disconnect begin yield(self) ensure disconnect end end |
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
306 307 308 |
# File 'lib/qubitro-mqtt/client.rb', line 306 def connected? !@socket.nil? && !@socket.closed? end |
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server. If you don’t want to say goodbye to the server, set send_msg to false.
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/qubitro-mqtt/client.rb', line 288 def disconnect(send_msg = true) # Stop reading packets from the socket first @read_thread.kill if @read_thread && @read_thread.alive? @read_thread = nil return unless connected? # Close the socket if it is open if send_msg packet = MQTT::Packet::Disconnect.new send_packet(packet) end @socket.close unless @socket.nil? handle_close @socket = nil end |
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns the topic and message as an array:
topic, = client.get
Or can be used with a block to keep processing messages:
client.get('test') do |topic,payload|
# Do stuff here
end
383 384 385 386 387 388 389 390 391 392 393 394 395 |
# File 'lib/qubitro-mqtt/client.rb', line 383 def get(topic = nil, = {}) if block_given? get_packet(topic) do |packet| yield(packet.topic, packet.payload) unless packet.retain && [:omit_retained] end else loop do # Wait for one packet to be available packet = get_packet(topic) return packet.topic, packet.payload unless packet.retain && [:omit_retained] end end end |
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns a single packet:
packet = client.get_packet
puts packet.topic
Or can be used with a block to keep processing messages:
client.get_packet('test') do |packet|
# Do stuff here
puts packet.topic
end
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/qubitro-mqtt/client.rb', line 410 def get_packet(topic = nil) # Subscribe to a topic, if an argument is given subscribe(topic) unless topic.nil? if block_given? # Loop forever! loop do packet = @read_queue.pop yield(packet) puback_packet(packet) if packet.qos > 0 end else # Wait for one packet to be available packet = @read_queue.pop puback_packet(packet) if packet.qos > 0 return packet end end |
#key=(*args) ⇒ Object
Set to a PEM-format client private key
194 195 196 197 |
# File 'lib/qubitro-mqtt/client.rb', line 194 def key=(*args) cert, passphrase = args.flatten ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase) end |
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key
188 189 190 191 |
# File 'lib/qubitro-mqtt/client.rb', line 188 def key_file=(*args) path, passphrase = args.flatten ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase) end |
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 |
# File 'lib/qubitro-mqtt/client.rb', line 311 def publish(topic, payload = '', retain = false, qos = 0) raise ArgumentError, 'Topic name cannot be nil' if topic.nil? raise ArgumentError, 'Topic name cannot be empty' if topic.empty? packet = MQTT::Packet::Publish.new( :id => next_packet_id, :qos => qos, :retain => retain, :topic => topic, :payload => payload ) # Send the packet res = send_packet(packet) return if qos.zero? queue = Queue.new wait_for_puback packet.id, queue deadline = current_time + @ack_timeout loop do response = queue.pop case response when :read_timeout return -1 if current_time > deadline when :close return -1 else @pubacks_semaphore.synchronize do @pubacks.delete packet.id end break end end res end |
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
430 431 432 |
# File 'lib/qubitro-mqtt/client.rb', line 430 def queue_empty? @read_queue.empty? end |
#queue_length ⇒ Object
Returns the length of the incoming message queue.
435 436 437 |
# File 'lib/qubitro-mqtt/client.rb', line 435 def queue_length @read_queue.length end |
#remote_host ⇒ Object
Please use #host instead
598 599 600 |
# File 'lib/qubitro-mqtt/client.rb', line 598 def remote_host host end |
#remote_host=(args) ⇒ Object
Please use #host= instead
603 604 605 |
# File 'lib/qubitro-mqtt/client.rb', line 603 def remote_host=(args) self.host = args end |
#remote_port ⇒ Object
Please use #port instead
608 609 610 |
# File 'lib/qubitro-mqtt/client.rb', line 608 def remote_port port end |
#remote_port=(args) ⇒ Object
Please use #port= instead
613 614 615 |
# File 'lib/qubitro-mqtt/client.rb', line 613 def remote_port=(args) self.port = args end |
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
209 210 211 212 213 214 |
# File 'lib/qubitro-mqtt/client.rb', line 209 def set_will(topic, payload, retain = false, qos = 0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end |
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled
173 174 175 |
# File 'lib/qubitro-mqtt/client.rb', line 173 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end |
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( 'a/b' => 0, 'c/d' => 1 )
364 365 366 367 368 369 370 |
# File 'lib/qubitro-mqtt/client.rb', line 364 def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( :id => next_packet_id, :topics => topics ) send_packet(packet) end |
#unsubscribe(*topics) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server
445 446 447 448 449 450 451 452 453 |
# File 'lib/qubitro-mqtt/client.rb', line 445 def unsubscribe(*topics) topics = topics.first if topics.is_a?(Enumerable) && topics.count == 1 packet = MQTT::Packet::Unsubscribe.new( :topics => topics, :id => next_packet_id ) send_packet(packet) end |