Class: MQTT::Client
- Inherits:
-
Object
- Object
- MQTT::Client
- Defined in:
- lib/mqtt/client.rb
Constant Summary collapse
- SELECT_TIMEOUT =
Timeout between select polls (in seconds)
0.5- ATTR_DEFAULTS =
Default attribute values
{ :host => nil, :port => nil, :version => '3.1.1', :keep_alive => 15, :clean_session => true, :client_id => nil, :ack_timeout => 5, :connect_timeout => 30, :username => nil, :password => nil, :will_topic => nil, :will_payload => nil, :will_qos => 0, :will_retain => false, :ssl => false, :verify_host => true }
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds).
-
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true).
-
#client_id ⇒ Object
Client Identifier.
-
#connect_timeout ⇒ Object
Number of seconds to connect to the server (default is 90 seconds).
-
#host ⇒ Object
Hostname of the remote server.
-
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds).
-
#last_ping_response ⇒ Object
readonly
Last ping response time.
-
#password ⇒ Object
Password to authenticate to the server with.
-
#port ⇒ Object
Port number of the remote server.
-
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication.
-
#username ⇒ Object
Username to authenticate to the server with.
-
#verify_host ⇒ Object
Set to false to skip tls hostname verification.
-
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1).
-
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect.
-
#will_qos ⇒ Object
The QoS level of the will message sent by the server.
-
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent.
-
#will_topic ⇒ Object
The topic that the Will message is published to.
Class Method Summary collapse
-
.connect(*args, &block) ⇒ Object
Create and connect a new MQTT Client.
-
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z).
Instance Method Summary collapse
-
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification.
-
#cert=(cert) ⇒ Object
PEM-format client certificate.
-
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate.
-
#clear_queue ⇒ Object
Clear the incoming message queue.
-
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
-
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
-
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server.
-
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server.
-
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server.
-
#initialize(*args) ⇒ Client
constructor
Create a new MQTT Client instance.
-
#key=(*args) ⇒ Object
Set to a PEM-format client private key.
-
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key.
-
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
-
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
-
#queue_length ⇒ Object
Returns the length of the incoming message queue.
-
#remote_host ⇒ Object
deprecated
Deprecated.
Please use #host instead
-
#remote_host=(args) ⇒ Object
deprecated
Deprecated.
Please use #host= instead
-
#remote_port ⇒ Object
deprecated
Deprecated.
Please use #port instead
-
#remote_port=(args) ⇒ Object
deprecated
Deprecated.
Please use #port= instead
-
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client.
-
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled.
-
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server.
-
#unsubscribe(*topics) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server.
Constructor Details
#initialize(*args) ⇒ Client
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:[email protected]')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(:host => 'myserver.example.com')
client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/mqtt/client.rb', line 136 def initialize(*args) attributes = args.last.is_a?(Hash) ? args.pop : {} # Set server URI from environment if present attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if args.length.zero? && ENV['MQTT_SERVER'] if args.length >= 1 case args[0] when URI attributes.merge!(parse_uri(args[0])) when %r{^mqtts?://} attributes.merge!(parse_uri(args[0])) else attributes[:host] = args[0] end end if args.length >= 2 attributes[:port] = args[1] unless args[1].nil? end raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3 # Merge arguments with default values for attributes ATTR_DEFAULTS.merge(attributes).each_pair do |k, v| send("#{k}=", v) end # Set a default port number if @port.nil? @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end if @ssl require 'openssl' require 'mqtt/openssl_fix' end # Initialise private instance variables @last_ping_request = current_time @last_ping_response = current_time @socket = nil @read_queue = Queue.new @pubacks = {} @read_thread = nil @write_semaphore = Mutex.new @pubacks_semaphore = Mutex.new end |
Instance Attribute Details
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
39 40 41 |
# File 'lib/mqtt/client.rb', line 39 def ack_timeout @ack_timeout end |
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true)
33 34 35 |
# File 'lib/mqtt/client.rb', line 33 def clean_session @clean_session end |
#client_id ⇒ Object
Client Identifier
36 37 38 |
# File 'lib/mqtt/client.rb', line 36 def client_id @client_id end |
#connect_timeout ⇒ Object
Number of seconds to connect to the server (default is 90 seconds)
42 43 44 |
# File 'lib/mqtt/client.rb', line 42 def connect_timeout @connect_timeout end |
#host ⇒ Object
Hostname of the remote server
8 9 10 |
# File 'lib/mqtt/client.rb', line 8 def host @host end |
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds)
30 31 32 |
# File 'lib/mqtt/client.rb', line 30 def keep_alive @keep_alive end |
#last_ping_response ⇒ Object (readonly)
Last ping response time
63 64 65 |
# File 'lib/mqtt/client.rb', line 63 def last_ping_response @last_ping_response end |
#password ⇒ Object
Password to authenticate to the server with
48 49 50 |
# File 'lib/mqtt/client.rb', line 48 def password @password end |
#port ⇒ Object
Port number of the remote server
11 12 13 |
# File 'lib/mqtt/client.rb', line 11 def port @port end |
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
24 25 26 |
# File 'lib/mqtt/client.rb', line 24 def ssl @ssl end |
#username ⇒ Object
Username to authenticate to the server with
45 46 47 |
# File 'lib/mqtt/client.rb', line 45 def username @username end |
#verify_host ⇒ Object
Set to false to skip tls hostname verification
27 28 29 |
# File 'lib/mqtt/client.rb', line 27 def verify_host @verify_host end |
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1)
14 15 16 |
# File 'lib/mqtt/client.rb', line 14 def version @version end |
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect
54 55 56 |
# File 'lib/mqtt/client.rb', line 54 def will_payload @will_payload end |
#will_qos ⇒ Object
The QoS level of the will message sent by the server
57 58 59 |
# File 'lib/mqtt/client.rb', line 57 def will_qos @will_qos end |
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent
60 61 62 |
# File 'lib/mqtt/client.rb', line 60 def will_retain @will_retain end |
#will_topic ⇒ Object
The topic that the Will message is published to
51 52 53 |
# File 'lib/mqtt/client.rb', line 51 def will_topic @will_topic end |
Class Method Details
.connect(*args, &block) ⇒ Object
98 99 100 101 102 |
# File 'lib/mqtt/client.rb', line 98 def self.connect(*args, &block) client = MQTT::Client.new(*args) client.connect(&block) client end |
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z)
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/mqtt/client.rb', line 106 def self.generate_client_id(prefix = 'ruby', length = 16) str = prefix.dup length.times do num = rand(36) # Adjust based on number or letter. num += num < 10 ? 48 : 87 str += num.chr end str end |
Instance Method Details
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification
213 214 215 216 |
# File 'lib/mqtt/client.rb', line 213 def ca_file=(path) ssl_context.ca_file = path ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil? end |
#cert=(cert) ⇒ Object
PEM-format client certificate
196 197 198 |
# File 'lib/mqtt/client.rb', line 196 def cert=(cert) ssl_context.cert = OpenSSL::X509::Certificate.new(cert) end |
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate
191 192 193 |
# File 'lib/mqtt/client.rb', line 191 def cert_file=(path) self.cert = File.read(path) end |
#clear_queue ⇒ Object
Clear the incoming message queue.
452 453 454 |
# File 'lib/mqtt/client.rb', line 452 def clear_queue @read_queue.clear end |
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server If a block is given, then yield to that block and then disconnect again.
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/mqtt/client.rb', line 231 def connect(clientid = nil) @client_id = clientid unless clientid.nil? if @client_id.nil? || @client_id.empty? raise 'Must provide a client_id if clean_session is set to false' unless @clean_session # Empty client id is not allowed for version 3.1.0 @client_id = MQTT::Client.generate_client_id if @version == '3.1.0' end raise 'No MQTT server host set when attempting to connect' if @host.nil? unless connected? # Create network socket tcp_socket = open_tcp_socket if @ssl # Set the protocol version ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol) @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) @socket.sync_close = true # Set hostname on secure socket for Server Name Indication (SNI) @socket.hostname = @host if @socket.respond_to?(:hostname=) @socket.connect @socket.post_connection_check(@host) if @verify_host else @socket = tcp_socket end # Construct a connect packet packet = MQTT::Packet::Connect.new( :version => @version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain ) # Send packet send_packet(packet) # Receive response receive_connack # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent receive_packet while connected? end end return unless block_given? # If a block is given, then yield and disconnect begin yield(self) ensure disconnect end end |
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
321 322 323 |
# File 'lib/mqtt/client.rb', line 321 def connected? !@socket.nil? && !@socket.closed? end |
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server. If you don’t want to say goodbye to the server, set send_msg to false.
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/mqtt/client.rb', line 303 def disconnect(send_msg = true) # Stop reading packets from the socket first @read_thread.kill if @read_thread && @read_thread.alive? @read_thread = nil return unless connected? # Close the socket if it is open if send_msg packet = MQTT::Packet::Disconnect.new send_packet(packet) end @socket.close unless @socket.nil? handle_close @socket = nil end |
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns the topic and message as an array:
topic, = client.get
Or can be used with a block to keep processing messages:
client.get('test') do |topic,payload|
# Do stuff here
end
395 396 397 398 399 400 401 402 403 404 405 406 407 |
# File 'lib/mqtt/client.rb', line 395 def get(topic = nil, = {}) if block_given? get_packet(topic) do |packet| yield(packet.topic, packet.payload) unless packet.retain && [:omit_retained] end else loop do # Wait for one packet to be available packet = get_packet(topic) return packet.topic, packet.payload unless packet.retain && [:omit_retained] end end end |
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns a single packet:
packet = client.get_packet
puts packet.topic
Or can be used with a block to keep processing messages:
client.get_packet('test') do |packet|
# Do stuff here
puts packet.topic
end
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 |
# File 'lib/mqtt/client.rb', line 422 def get_packet(topic = nil) # Subscribe to a topic, if an argument is given subscribe(topic) unless topic.nil? if block_given? # Loop forever! loop do packet = @read_queue.pop yield(packet) puback_packet(packet) if packet.qos > 0 end else # Wait for one packet to be available packet = @read_queue.pop puback_packet(packet) if packet.qos > 0 return packet end end |
#key=(*args) ⇒ Object
Set to a PEM-format client private key
207 208 209 210 |
# File 'lib/mqtt/client.rb', line 207 def key=(*args) cert, passphrase = args.flatten ssl_context.key = OpenSSL::PKey.read(cert, passphrase) end |
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key
201 202 203 204 |
# File 'lib/mqtt/client.rb', line 201 def key_file=(*args) path, passphrase = args.flatten ssl_context.key = OpenSSL::PKey.read(File.binread(path), passphrase) end |
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/mqtt/client.rb', line 326 def publish(topic, payload = '', retain = false, qos = 0) raise ArgumentError, 'Topic name cannot be nil' if topic.nil? raise ArgumentError, 'Topic name cannot be empty' if topic.empty? packet = MQTT::Packet::Publish.new( :id => next_packet_id, :qos => qos, :retain => retain, :topic => topic, :payload => payload ) queue = qos.zero? ? nil : wait_for_puback(packet.id) res = send_packet(packet) return unless queue deadline = current_time + @ack_timeout loop do response = queue.pop case response when :read_timeout return -1 if current_time > deadline when :close return -1 else @pubacks_semaphore.synchronize do @pubacks.delete packet.id end break end end res end |
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
442 443 444 |
# File 'lib/mqtt/client.rb', line 442 def queue_empty? @read_queue.empty? end |
#queue_length ⇒ Object
Returns the length of the incoming message queue.
447 448 449 |
# File 'lib/mqtt/client.rb', line 447 def queue_length @read_queue.length end |
#remote_host ⇒ Object
Please use #host instead
623 624 625 |
# File 'lib/mqtt/client.rb', line 623 def remote_host host end |
#remote_host=(args) ⇒ Object
Please use #host= instead
628 629 630 |
# File 'lib/mqtt/client.rb', line 628 def remote_host=(args) self.host = args end |
#remote_port ⇒ Object
Please use #port instead
633 634 635 |
# File 'lib/mqtt/client.rb', line 633 def remote_port port end |
#remote_port=(args) ⇒ Object
Please use #port= instead
638 639 640 |
# File 'lib/mqtt/client.rb', line 638 def remote_port=(args) self.port = args end |
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
222 223 224 225 226 227 |
# File 'lib/mqtt/client.rb', line 222 def set_will(topic, payload, retain = false, qos = 0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end |
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled
186 187 188 |
# File 'lib/mqtt/client.rb', line 186 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end |
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( 'a/b' => 0, 'c/d' => 1 )
376 377 378 379 380 381 382 |
# File 'lib/mqtt/client.rb', line 376 def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( :id => next_packet_id, :topics => topics ) send_packet(packet) end |
#unsubscribe(*topics) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server
457 458 459 460 461 462 463 464 465 |
# File 'lib/mqtt/client.rb', line 457 def unsubscribe(*topics) topics = topics.first if topics.is_a?(Enumerable) && topics.count == 1 packet = MQTT::Packet::Unsubscribe.new( :topics => topics, :id => next_packet_id ) send_packet(packet) end |