Class: PahoMqtt::Client
- Inherits:
-
Object
- Object
- PahoMqtt::Client
- Defined in:
- lib/paho.mqtt/paho_client.rb
Constant Summary collapse
- MAX_PUBACK =
MAX size of queue
20
- MAX_PUBREC =
20
- MAX_PUBREL =
20
- MAX_PUBCOMP =
20
- MAX_WRITING =
MAX_PUBACK + MAX_PUBREC + MAX_PUBREL + MAX_PUBCOMP
- MQTT_CS_NEW =
Connection states values
0
- MQTT_CS_CONNECTED =
1
- MQTT_CS_DISCONNECT =
2
- MQTT_CS_CONNECT_ASYNC =
3
- MQTT_ERR_AGAIN =
Error values
-1
- MQTT_ERR_SUCCESS =
0
- MQTT_ERR_NOMEM =
1
- MQTT_ERR_PROTOCOL =
2
- MQTT_ERR_INVAL =
3
- MQTT_ERR_NO_CONN =
4
- MQTT_ERR_CONN_REFUSED =
5
- MQTT_ERR_NOT_FOUND =
6
- MQTT_ERR_CONN_LOST =
7
- MQTT_ERR_TLS =
8
- MQTT_ERR_PAYLOAD_SIZE =
9
- MQTT_ERR_NOT_SUPPORTED =
10
- MQTT_ERR_AUTH =
11
- MQTT_ERR_ACL_DENIED =
12
- MQTT_ERR_UNKNOWN =
13
- MQTT_ERR_ERRNO =
14
- ATTR_DEFAULTS =
{ :host => "", :port => nil, :mqtt_version => '3.1.1', :clean_session => true, :persistent => false, :client_id => nil, :username => nil, :password => nil, :ssl => false, :will_topic => nil, :will_payload => nil, :will_qos => 0, :will_retain => false, :keep_alive => 10, :ack_timeout => 5, :on_connack => nil, :on_suback => nil, :on_unsuback => nil, :on_puback => nil, :on_pubrel => nil, :on_pubrec => nil, :on_pubcomp => nil, :on_message => nil, }
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connection_state ⇒ Object
readonly
Returns the value of attribute connection_state.
-
#host ⇒ Object
Connection related attributes:.
-
#keep_alive ⇒ Object
Setting attributes:.
-
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
-
#on_connack ⇒ Object
writeonly
Sets the attribute on_connack.
-
#on_message ⇒ Object
writeonly
Callback attributes.
-
#on_puback ⇒ Object
writeonly
Sets the attribute on_puback.
-
#on_pubcomp ⇒ Object
writeonly
Sets the attribute on_pubcomp.
-
#on_pubrec ⇒ Object
writeonly
Sets the attribute on_pubrec.
-
#on_pubrel ⇒ Object
writeonly
Sets the attribute on_pubrel.
-
#on_suback ⇒ Object
writeonly
Sets the attribute on_suback.
-
#on_unsuback ⇒ Object
writeonly
Sets the attribute on_unsuback.
-
#password ⇒ Object
Returns the value of attribute password.
-
#persistent ⇒ Object
Returns the value of attribute persistent.
-
#port ⇒ Object
Returns the value of attribute port.
-
#registered_callback ⇒ Object
readonly
Read Only attribute.
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#subscribed_topics ⇒ Object
readonly
Returns the value of attribute subscribed_topics.
-
#username ⇒ Object
Returns the value of attribute username.
-
#will_payload ⇒ Object
Returns the value of attribute will_payload.
-
#will_qos ⇒ Object
Returns the value of attribute will_qos.
-
#will_retain ⇒ Object
Returns the value of attribute will_retain.
-
#will_topic ⇒ Object
Last will attributes:.
Instance Method Summary collapse
- #append_to_writing(packet) ⇒ Object
- #cert=(cert_path) ⇒ Object
- #check_ack_alive(queue, mutex, max_packet) ⇒ Object
- #check_keep_alive ⇒ Object
- #config_all_message_queue ⇒ Object
- #config_message_queue(queue, mutex, max_packet) ⇒ Object
- #config_socket ⇒ Object
- #config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
- #config_subscription ⇒ Object
- #config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent) ⇒ Object
- #connect_async(host, port = 1883, keep_alive) ⇒ Object
- #disconnect(explicit = true) ⇒ Object
- #generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
-
#initialize(*args) ⇒ Client
constructor
A new instance of Client.
- #key=(key_path, passphrase = nil) ⇒ Object
- #loop_misc ⇒ Object
- #loop_read(max_packet = 5) ⇒ Object
- #loop_write(max_packet = MAX_WRITING) ⇒ Object
- #mqtt_loop ⇒ Object
- #next_packet_id ⇒ Object
- #publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #reconnect(retry_time = 3, retry_tempo = 3) ⇒ Object
- #root_ca=(ca_path) ⇒ Object
- #setup_connection ⇒ Object
- #ssl_context ⇒ Object
- #subscribe(*topics) ⇒ Object
- #unsubscribe(topics) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Returns a new instance of Client.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/paho.mqtt/paho_client.rb', line 106
# Builds a client. A trailing Hash argument, when present, overrides the
# entries of ATTR_DEFAULTS; every resulting key is applied through its
# "<key>=" writer. A nil port is replaced by the SSL or plain default port,
# and a nil/empty client id is replaced by a generated one.
def initialize(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  ATTR_DEFAULTS.merge(options).each_pair do |name, value|
    self.send("#{name}=", value)
  end

  if @port.nil?
    @port = @ssl ? PahoMqtt::DEFAULT_SSL_PORT : PahoMqtt::DEFAULT_PORT
  end
  @client_id = generate_client_id if @client_id.nil? || @client_id == ""

  # Keep-alive bookkeeping and packet identifier counter.
  @last_ping_req = Time.now
  @last_ping_resp = Time.now
  @last_packet_id = 0

  # Transport handles, created later on connect.
  @socket = nil
  @ssl_context = nil

  # Outgoing packet queue.
  @writing_mutex = Mutex.new
  @writing_queue = []

  # The client starts in the disconnected state.
  @connection_state = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new

  # Subscription state and pending (un)subscribe acknowledgements.
  @subscribed_mutex = Mutex.new
  @subscribed_topics = []
  @registered_callback = []
  @waiting_suback = []
  @suback_mutex = Mutex.new
  @waiting_unsuback = []
  @unsuback_mutex = Mutex.new

  # Background threads, created on demand.
  @mqtt_thread = nil
  @reconnect_thread = nil

  # Pending publish acknowledgements, one queue and one mutex per ack type.
  @puback_mutex = Mutex.new
  @pubrec_mutex = Mutex.new
  @pubrel_mutex = Mutex.new
  @pubcomp_mutex = Mutex.new
  @waiting_puback = []
  @waiting_pubrec = []
  @waiting_pubrel = []
  @waiting_pubcomp = []
end
Instance Attribute Details
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
62 63 64 |
# File 'lib/paho.mqtt/paho_client.rb', line 62 def ack_timeout @ack_timeout end |
#clean_session ⇒ Object
Returns the value of attribute clean_session.
48 49 50 |
# File 'lib/paho.mqtt/paho_client.rb', line 48 def clean_session @clean_session end |
#client_id ⇒ Object
Returns the value of attribute client_id.
49 50 51 |
# File 'lib/paho.mqtt/paho_client.rb', line 49 def client_id @client_id end |
#connection_state ⇒ Object (readonly)
Returns the value of attribute connection_state.
78 79 80 |
# File 'lib/paho.mqtt/paho_client.rb', line 78 def connection_state @connection_state end |
#host ⇒ Object
Connection related attributes:
45 46 47 |
# File 'lib/paho.mqtt/paho_client.rb', line 45 def host @host end |
#keep_alive ⇒ Object
Setting attributes:
61 62 63 |
# File 'lib/paho.mqtt/paho_client.rb', line 61 def keep_alive @keep_alive end |
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
47 48 49 |
# File 'lib/paho.mqtt/paho_client.rb', line 47 def mqtt_version @mqtt_version end |
#on_connack=(value) ⇒ Object
Sets the attribute on_connack
67 68 69 |
# File 'lib/paho.mqtt/paho_client.rb', line 67 def on_connack=(value) @on_connack = value end |
#on_message=(value) ⇒ Object
Callback attributes
66 67 68 |
# File 'lib/paho.mqtt/paho_client.rb', line 66 def on_message=(value) @on_message = value end |
#on_puback=(value) ⇒ Object
Sets the attribute on_puback
70 71 72 |
# File 'lib/paho.mqtt/paho_client.rb', line 70 def on_puback=(value) @on_puback = value end |
#on_pubcomp=(value) ⇒ Object
Sets the attribute on_pubcomp
73 74 75 |
# File 'lib/paho.mqtt/paho_client.rb', line 73 def on_pubcomp=(value) @on_pubcomp = value end |
#on_pubrec=(value) ⇒ Object
Sets the attribute on_pubrec
72 73 74 |
# File 'lib/paho.mqtt/paho_client.rb', line 72 def on_pubrec=(value) @on_pubrec = value end |
#on_pubrel=(value) ⇒ Object
Sets the attribute on_pubrel
71 72 73 |
# File 'lib/paho.mqtt/paho_client.rb', line 71 def on_pubrel=(value) @on_pubrel = value end |
#on_suback=(value) ⇒ Object
Sets the attribute on_suback
68 69 70 |
# File 'lib/paho.mqtt/paho_client.rb', line 68 def on_suback=(value) @on_suback = value end |
#on_unsuback=(value) ⇒ Object
Sets the attribute on_unsuback
69 70 71 |
# File 'lib/paho.mqtt/paho_client.rb', line 69 def on_unsuback=(value) @on_unsuback = value end |
#password ⇒ Object
Returns the value of attribute password.
51 52 53 |
# File 'lib/paho.mqtt/paho_client.rb', line 51 def password @password end |
#persistent ⇒ Object
Returns the value of attribute persistent.
63 64 65 |
# File 'lib/paho.mqtt/paho_client.rb', line 63 def persistent @persistent end |
#port ⇒ Object
Returns the value of attribute port.
46 47 48 |
# File 'lib/paho.mqtt/paho_client.rb', line 46 def port @port end |
#registered_callback ⇒ Object (readonly)
Read Only attribute
76 77 78 |
# File 'lib/paho.mqtt/paho_client.rb', line 76 def registered_callback @registered_callback end |
#ssl ⇒ Object
Returns the value of attribute ssl.
52 53 54 |
# File 'lib/paho.mqtt/paho_client.rb', line 52 def ssl @ssl end |
#subscribed_topics ⇒ Object (readonly)
Returns the value of attribute subscribed_topics.
77 78 79 |
# File 'lib/paho.mqtt/paho_client.rb', line 77 def subscribed_topics @subscribed_topics end |
#username ⇒ Object
Returns the value of attribute username.
50 51 52 |
# File 'lib/paho.mqtt/paho_client.rb', line 50 def username @username end |
#will_payload ⇒ Object
Returns the value of attribute will_payload.
56 57 58 |
# File 'lib/paho.mqtt/paho_client.rb', line 56 def will_payload @will_payload end |
#will_qos ⇒ Object
Returns the value of attribute will_qos.
57 58 59 |
# File 'lib/paho.mqtt/paho_client.rb', line 57 def will_qos @will_qos end |
#will_retain ⇒ Object
Returns the value of attribute will_retain.
58 59 60 |
# File 'lib/paho.mqtt/paho_client.rb', line 58 def will_retain @will_retain end |
#will_topic ⇒ Object
Last will attributes:
55 56 57 |
# File 'lib/paho.mqtt/paho_client.rb', line 55 def will_topic @will_topic end |
Instance Method Details
#append_to_writing(packet) ⇒ Object
367 368 369 370 371 372 |
# File 'lib/paho.mqtt/paho_client.rb', line 367
# Appends +packet+ to the outgoing writing queue while holding the writing
# mutex. Always returns MQTT_ERR_SUCCESS.
def append_to_writing(packet)
  @writing_mutex.synchronize do
    @writing_queue << packet
  end
  MQTT_ERR_SUCCESS
end
#cert=(cert_path) ⇒ Object
198 199 200 |
# File 'lib/paho.mqtt/paho_client.rb', line 198
# Reads the file at +cert_path+ and installs it as the X509 certificate of
# the client's SSL context.
def cert=(cert_path)
  pem = File.read(cert_path)
  ssl_context.cert = OpenSSL::X509::Certificate.new(pem)
end
#check_ack_alive(queue, mutex, max_packet) ⇒ Object
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/paho.mqtt/paho_client.rb', line 350
# Re-queues packets whose acknowledgement is overdue.
#
# queue      - Array of Hashes with :packet and :timestamp entries.
# mutex      - Mutex protecting +queue+; held for the whole scan.
# max_packet - maximum number of packets re-queued per call.
#
# An entry is overdue once @ack_timeout seconds have elapsed since its
# :timestamp. Overdue packets other than Subscribe/Unsubscribe have their
# +dup+ attribute set when it is falsy. Re-queued entries get their timestamp
# refreshed so they are not resent again on the next pass.
def check_ack_alive(queue, mutex, max_packet)
  mutex.synchronize do
    now = Time.now
    resent = 0
    queue.each do |pck|
      next if now < pck[:timestamp] + @ack_timeout
      packet = pck[:packet]
      unless packet.class == PahoMqtt::Packet::Subscribe || packet.class == PahoMqtt::Packet::Unsubscribe
        packet.dup ||= true
      end
      # Strict bound: the previous `cnt > max_packet` check let
      # max_packet + 1 packets through.
      next if resent >= max_packet
      append_to_writing(packet)
      pck[:timestamp] = now
      resent += 1
    end
  end
end
#check_keep_alive ⇒ Object
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
# File 'lib/paho.mqtt/paho_client.rb', line 314
# Keep-alive supervision, run while the client is connected.
#
# Sends a ping request once 70% of the keep-alive interval has elapsed since
# the last one (persistent clients only). When no ping response has been seen
# for 110% of the interval, the client disconnects and, if persistent, starts
# a reconnection.
#
# A keep-alive of 0 disables the mechanism in MQTT, so nothing is done then.
# The previous `>= 0` check treated 0 as an already-expired timeout and
# disconnected on every pass.
def check_keep_alive
  return unless @keep_alive > 0 && @connection_state == MQTT_CS_CONNECTED

  now = Time.now
  timeout_req = @last_ping_req + (@keep_alive * 0.7).ceil
  if timeout_req <= now && @persistent
    send_pingreq
    @last_ping_req = now
  end

  timeout_resp = @last_ping_resp + (@keep_alive * 1.1).ceil
  if timeout_resp <= now
    # TODO => MOVE TO LOGGER
    #puts "Didn't get answer from server for a long time, trying to reconnect."
    disconnect(false)
    reconnect(RECONNECT_RETRY_TIME, RECONNECT_RETRY_TEMPO) if @persistent
  end
end
#config_all_message_queue ⇒ Object
392 393 394 395 396 397 |
# File 'lib/paho.mqtt/paho_client.rb', line 392
# Runs config_message_queue over each of the four pending-acknowledgement
# queues (puback, pubrec, pubrel, pubcomp), each with its own mutex and limit.
def config_all_message_queue
  config_message_queue(@waiting_puback, @puback_mutex, MAX_PUBACK)
  config_message_queue(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC)
  config_message_queue(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL)
  config_message_queue(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP)
end
#config_message_queue(queue, mutex, max_packet) ⇒ Object
399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'lib/paho.mqtt/paho_client.rb', line 399
# Re-queues the packets of a pending-acknowledgement queue for writing.
#
# queue      - Array of Hashes with a :packet entry.
# mutex      - Mutex protecting +queue+; held for the whole scan.
# max_packet - maximum number of packets appended to the writing queue.
#
# Every entry has its packet's +dup+ attribute set when it is falsy. Only the
# first +max_packet+ packets are appended.
def config_message_queue(queue, mutex, max_packet)
  mutex.synchronize do
    queue.each_with_index do |pck, index|
      pck[:packet].dup ||= true
      # Queue the packet itself, not the bookkeeping Hash around it: the
      # writing queue is drained with send_packet. The bound is strict so at
      # most max_packet packets are queued (previously max_packet + 1).
      append_to_writing(pck[:packet]) if index < max_packet
    end
  end
end
#config_socket ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/paho.mqtt/paho_client.rb', line 171
# Replaces the client's socket. Any existing socket is closed first. A TCP
# socket is opened when a host is set and the port is not negative. With SSL
# enabled it is wrapped in an SSL socket using the configured context and the
# SSL handshake is performed; otherwise the TCP socket is used directly.
# Raises when SSL is enabled but no SSL context has been configured.
def config_socket
  if !@socket.nil?
    @socket.close
    @socket = nil
  end

  tcp_socket = TCPSocket.new(@host, @port) unless @host.nil? || @port < 0

  return @socket = tcp_socket unless @ssl

  if @ssl_context.nil?
    raise "SSL context should be defined and set to open SSLSocket"
  end
  @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
  @socket.sync_close = true
  @socket.connect
end
#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/paho.mqtt/paho_client.rb', line 163
# Turns SSL on (when not already set) and fills the SSL context through the
# cert=, key= and root_ca= writers using the given file paths.
def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl = true unless @ssl
  @ssl_context = ssl_context
  self.cert = cert_path
  self.key = key_path
  self.root_ca = ca_path
end
#config_subscription ⇒ Object
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 |
# File 'lib/paho.mqtt/paho_client.rb', line 374
# When topics are recorded in @subscribed_topics, sends one Subscribe packet
# covering all of them, clears the recorded list and registers the packet as
# awaiting a suback. Always returns MQTT_ERR_SUCCESS.
def config_subscription
  return MQTT_ERR_SUCCESS if @subscribed_topics == []

  new_id = next_packet_id
  packet = PahoMqtt::Packet::Subscribe.new(
    :id => new_id,
    :topics => @subscribed_topics
  )
  @subscribed_mutex.synchronize do
    @subscribed_topics = []
  end
  pending = { :id => new_id, :packet => packet, :timestamp => Time.now }
  @suback_mutex.synchronize do
    @waiting_suback.push(pending)
  end
  send_packet(packet)
  MQTT_ERR_SUCCESS
end
#config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object
213 214 215 216 217 218 |
# File 'lib/paho.mqtt/paho_client.rb', line 213
# Stores the last-will settings (topic, payload, retain flag and QoS) on the
# client. Returns the QoS value.
def config_will(topic, payload="", retain=false, qos=0)
  @will_topic, @will_payload, @will_retain = topic, payload, retain
  @will_qos = qos
end
#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent) ⇒ Object
220 221 222 223 224 225 226 |
# File 'lib/paho.mqtt/paho_client.rb', line 220
# Records the persistence flag, moves the connection state to MQTT_CS_NEW
# under the state mutex, then delegates to connect_async. Arguments default
# to the values already stored on the client.
def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent)
  @persistent = persistent
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end
  connect_async(host, port, keep_alive)
end
#connect_async(host, port = 1883, keep_alive) ⇒ Object
228 229 230 231 232 233 234 235 236 237 |
# File 'lib/paho.mqtt/paho_client.rb', line 228
# Stores the target host, port (converted with to_i) and keep-alive, moves the
# connection state to MQTT_CS_CONNECT_ASYNC under the state mutex, then runs
# setup_connection.
def connect_async(host, port=1883, keep_alive)
  @host = host
  @port = port.to_i
  @keep_alive = keep_alive
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_CONNECT_ASYNC
  end
  setup_connection
end
#disconnect(explicit = true) ⇒ Object
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 |
# File 'lib/paho.mqtt/paho_client.rb', line 412
# Moves the client to the disconnected state and clears its queues.
#
# explicit - when true (default), a disconnect packet is sent, the MQTT loop
#            thread is killed if it is alive, and the socket is closed. When
#            false, only the local state is reset.
#
# In both cases the connection state becomes MQTT_CS_DISCONNECT, the writing
# queue and the four publish-acknowledgement queues are emptied, and the
# packet id counter is reset. Returns MQTT_ERR_SUCCESS.
def disconnect(explicit=true)
  # TODO => MOVE TO LOGGER
  # puts "Disconnecting"
  if explicit
    send_disconnect
    # Single nil-safe kill. A second unguarded `@mqtt_thread.alive?` call used
    # to follow and raised NoMethodError when no loop thread had been started.
    @mqtt_thread.kill if @mqtt_thread && @mqtt_thread.alive?
    @socket.close unless @socket.nil?
    @socket = nil
  end

  @connection_state_mutex.synchronize {
    @connection_state = MQTT_CS_DISCONNECT
  }
  @writing_mutex.synchronize {
    @writing_queue = []
  }
  @puback_mutex.synchronize {
    @waiting_puback = []
  }
  @pubrec_mutex.synchronize {
    @waiting_pubrec = []
  }
  @pubrel_mutex.synchronize {
    @waiting_pubrel = []
  }
  @pubcomp_mutex.synchronize {
    @waiting_pubcomp = []
  }
  @last_packet_id = 0
  MQTT_ERR_SUCCESS
end
#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
154 155 156 157 |
# File 'lib/paho.mqtt/paho_client.rb', line 154
# Generates a client id made of +prefix+ followed by +lenght+ random
# alphanumeric characters, stores it in @client_id and returns it.
#
# The id is built as a new String. The previous `prefix << ...` form modified
# the caller's string in place and raised FrozenError on frozen strings.
def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  suffix = Array.new(lenght) { charset.sample }.join
  @client_id = "#{prefix}#{suffix}"
end
#key=(key_path, passphrase = nil) ⇒ Object
202 203 204 |
# File 'lib/paho.mqtt/paho_client.rb', line 202
# Reads the file at +key_path+ as an RSA private key (optionally protected by
# +passphrase+) and installs it on the client's SSL context.
def key=(key_path, passphrase=nil)
  pem = File.read(key_path)
  ssl_context.key = OpenSSL::PKey::RSA.new(pem, passphrase)
end
#loop_misc ⇒ Object
304 305 306 307 308 309 310 311 312 |
# File 'lib/paho.mqtt/paho_client.rb', line 304
# Housekeeping step of the MQTT loop: runs the keep-alive check, then the
# overdue-acknowledgement check on each pending queue.
def loop_misc
  check_keep_alive

  # Publish acknowledgement queues use their fixed per-type limits.
  check_ack_alive(@waiting_puback,  @puback_mutex,  MAX_PUBACK)
  check_ack_alive(@waiting_pubrec,  @pubrec_mutex,  MAX_PUBREC)
  check_ack_alive(@waiting_pubrel,  @pubrel_mutex,  MAX_PUBREL)
  check_ack_alive(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP)

  # (Un)subscribe queues use their current length as the limit.
  check_ack_alive(@waiting_suback,   @suback_mutex,   @waiting_suback.length)
  check_ack_alive(@waiting_unsuback, @unsuback_mutex, @waiting_unsuback.length)
end
#loop_read(max_packet = 5) ⇒ Object
291 292 293 294 295 |
# File 'lib/paho.mqtt/paho_client.rb', line 291
# Calls receive_packet +max_packet+ times (5 by default).
def loop_read(max_packet=5)
  max_packet.times { receive_packet }
end
#loop_write(max_packet = MAX_WRITING) ⇒ Object
281 282 283 284 285 286 287 288 289 |
# File 'lib/paho.mqtt/paho_client.rb', line 281
# Drains the writing queue in FIFO order, sending at most +max_packet+ packets
# through send_packet while holding the writing mutex.
def loop_write(max_packet=MAX_WRITING)
  @writing_mutex.synchronize do
    sent = 0
    until @writing_queue.empty? || sent >= max_packet
      send_packet(@writing_queue.shift)
      sent += 1
    end
  end
end
#mqtt_loop ⇒ Object
297 298 299 300 301 302 |
# File 'lib/paho.mqtt/paho_client.rb', line 297
# One iteration of the client loop: read incoming packets, write queued
# packets, run housekeeping, then pause for LOOP_TEMPO.
def mqtt_loop
  loop_read
  loop_write
  loop_misc
  sleep(LOOP_TEMPO)
end
#next_packet_id ⇒ Object
159 160 161 |
# File 'lib/paho.mqtt/paho_client.rb', line 159
# Returns the next packet identifier and stores it in @last_packet_id.
#
# MQTT packet identifiers are non-zero 16-bit values, so the counter wraps
# from 65535 back to 1 instead of growing without bound. A nil counter is
# treated as 0.
def next_packet_id
  @last_packet_id = (@last_packet_id || 0) % 65_535 + 1
end
#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
453 454 455 456 457 458 |
# File 'lib/paho.mqtt/paho_client.rb', line 453
# Publishes +payload+ on +topic+ through send_publish. Raises when the topic
# is an empty string or is not a String.
def publish(topic, payload="", retain=false, qos=0)
  unless topic != "" && topic.is_a?(String)
    raise "Publish error, topic is empty or invalid"
  end
  send_publish(topic, payload, retain, qos)
end
#reconnect(retry_time = 3, retry_tempo = 3) ⇒ Object
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 |
# File 'lib/paho.mqtt/paho_client.rb', line 334
# Starts a background thread that retries setup_connection.
#
# retry_time  - maximum number of connection attempts.
# retry_tempo - seconds slept after each failed attempt.
#
# The thread stops retrying as soon as the connection state is
# MQTT_CS_CONNECTED. It raises inside the thread only when every attempt
# failed; the raise used to run unconditionally, even after a successful
# reconnection, and reported RECONNECT_RETRY_TIME instead of the attempt count
# actually used. Returns the Thread, also stored in @reconnect_thread.
def reconnect(retry_time=3, retry_tempo=3)
  @reconnect_thread = Thread.new do
    retry_time.times do
      # TODO => MOVE TO LOGGER
      #puts "Retrying to connect"
      setup_connection
      break if @connection_state == MQTT_CS_CONNECTED
      sleep retry_tempo
    end
    unless @connection_state == MQTT_CS_CONNECTED
      raise "Reconnection retry counter is over (#{retry_time}), could not reconnect to the server."
    end
  end
end
#root_ca=(ca_path) ⇒ Object
206 207 208 209 210 211 |
# File 'lib/paho.mqtt/paho_client.rb', line 206
# Sets the CA file of the client's SSL context and, when a path is given,
# turns on peer certificate verification.
#
# The check is made on the +ca_path+ argument. It used to test @ca_path, an
# instance variable that is never assigned, so VERIFY_PEER was never enabled.
def root_ca=(ca_path)
  ssl_context.ca_file = ca_path
  unless ca_path.nil?
    ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  end
end
#setup_connection ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/paho.mqtt/paho_client.rb', line 239 def setup_connection @mqtt_thread.kill unless @mqtt_thread.nil? if @host.nil? || @host == "" raise "Connection Failed, host cannot be nil or empty" end if @port.to_i <= 0 raise "Connection Failed port cannot be 0 >=" end @socket.close unless @socket.nil? @socket = nil @last_ping_req = Time.now @last_ping_resp = Time.now # TODO => MOVE TO LOGGER # puts "Try to connect to #{@host}" config_socket send_connect # Waiting a Connack packet for "ack_timeout" second from the remote connect_timeout = Time.now + @ack_timeout while (Time.now <= connect_timeout) && (@connection_state != MQTT_CS_CONNECTED) do receive_packet end if @connection_state != MQTT_CS_CONNECTED # TODO => MOVE TO LOGGER # puts "Didn't receive Connack answer from server #{@host}" else config_subscription @mqtt_thread = Thread.new do @reconnect_thread.kill unless @reconnect_thread.nil? || !@reconnect_thread.alive? while @connection_state == MQTT_CS_CONNECTED do mqtt_loop end end end end |
#ssl_context ⇒ Object
194 195 196 |
# File 'lib/paho.mqtt/paho_client.rb', line 194
# Returns the client's SSL context, creating and caching it on first use.
def ssl_context
  @ssl_context || (@ssl_context = OpenSSL::SSL::SSLContext.new)
end
#subscribe(*topics) ⇒ Object
460 461 462 463 464 465 466 |
# File 'lib/paho.mqtt/paho_client.rb', line 460
# Subscribes to the given topics through send_subscribe. Raises when called
# without any topic.
def subscribe(*topics)
  if topics.length == 0
    raise "Protocol Violation, subscribe topics list must not be empty."
  end
  send_subscribe(topics)
end
#unsubscribe(topics) ⇒ Object
468 469 470 471 472 473 474 |
# File 'lib/paho.mqtt/paho_client.rb', line 468
# Unsubscribes from +topics+ through send_unsubscribe. Raises when +topics+
# has a length of zero.
def unsubscribe(topics)
  if topics.length == 0
    raise "Protocol Violation, unsubscribe topics list must not be empty."
  end
  send_unsubscribe(topics)
end