Class: PahoMqtt::Client
- Inherits:
-
Object
- Object
- PahoMqtt::Client
- Defined in:
- lib/paho.mqtt/paho_client.rb
Constant Summary collapse
- ATTR_DEFAULTS =
{ :logger => nil, :host => "", :port => nil, :mqtt_version => '3.1.1', :clean_session => true, :persistent => false, :client_id => nil, :username => nil, :password => nil, :ssl => false, :will_topic => nil, :will_payload => nil, :will_qos => 0, :will_retain => false, :keep_alive => 60, :ack_timeout => 5, :on_connack => nil, :on_suback => nil, :on_unsuback => nil, :on_puback => nil, :on_pubrel => nil, :on_pubrec => nil, :on_pubcomp => nil, :on_message => nil, }
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connection_state ⇒ Object
readonly
Returns the value of attribute connection_state.
-
#host ⇒ Object
Connection related attributes:.
-
#keep_alive ⇒ Object
Setting attributes:.
-
#logger ⇒ Object
Log file.
-
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
-
#on_connack(&block) ⇒ Object
Returns the value of attribute on_connack.
-
#on_message(&block) ⇒ Object
Callback attributes.
-
#on_puback(&block) ⇒ Object
Returns the value of attribute on_puback.
-
#on_pubcomp(&block) ⇒ Object
Returns the value of attribute on_pubcomp.
-
#on_pubrec(&block) ⇒ Object
Returns the value of attribute on_pubrec.
-
#on_pubrel(&block) ⇒ Object
Returns the value of attribute on_pubrel.
-
#on_suback(&block) ⇒ Object
Returns the value of attribute on_suback.
-
#on_unsuback(&block) ⇒ Object
Returns the value of attribute on_unsuback.
-
#password ⇒ Object
Returns the value of attribute password.
-
#persistent ⇒ Object
Returns the value of attribute persistent.
-
#port ⇒ Object
Returns the value of attribute port.
-
#registered_callback ⇒ Object
readonly
Read Only attribute.
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#subscribed_topics ⇒ Object
readonly
Returns the value of attribute subscribed_topics.
-
#username ⇒ Object
Returns the value of attribute username.
-
#will_payload ⇒ Object
Returns the value of attribute will_payload.
-
#will_qos ⇒ Object
Returns the value of attribute will_qos.
-
#will_retain ⇒ Object
Returns the value of attribute will_retain.
-
#will_topic ⇒ Object
Last will attributes:.
Instance Method Summary collapse
- #add_topic_callback(topic, callback = nil, &block) ⇒ Object
- #cert=(cert_path) ⇒ Object
- #config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
- #config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent) ⇒ Object
- #connect_async(host, port = 1883, keep_alive) ⇒ Object
- #disconnect(explicit = true) ⇒ Object
- #generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
-
#initialize(*args) ⇒ Client
constructor
A new instance of Client.
- #key=(key_path, passphrase = nil) ⇒ Object
- #loop_misc ⇒ Object
- #loop_read(max_packet = 5) ⇒ Object
- #loop_write(max_packet = MAX_WRITING) ⇒ Object
- #mqtt_loop ⇒ Object
- #ping_host ⇒ Object
- #publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object
- #remove_topic_callback(topic) ⇒ Object
- #root_ca=(ca_path) ⇒ Object
- #ssl_context ⇒ Object
- #subscribe(*topics) ⇒ Object
- #unsubscribe(*topics) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Returns a new instance of Client.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/paho.mqtt/paho_client.rb', line 73 def initialize(*args) if args.last.is_a?(Hash) attr = args.pop else attr = {} end ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end if @port.nil? @port = @ssl ? PahoMqtt::DEFAULT_SSL_PORT : PahoMqtt::DEFAULT_PORT end if @client_id.nil? || @client_id == "" @client_id = generate_client_id end @last_ping_req = Time.now @last_ping_resp = Time.now @last_packet_id = 0 @socket = nil @ssl_context = nil @writing_mutex = Mutex.new @writing_queue = [] @connection_state = MQTT_CS_DISCONNECT @connection_state_mutex = Mutex.new @subscribed_mutex = Mutex.new @subscribed_topics = [] @registered_callback = [] @waiting_suback = [] @suback_mutex = Mutex.new @waiting_unsuback = [] @unsuback_mutex = Mutex.new @mqtt_thread = nil @reconnect_thread = nil @puback_mutex = Mutex.new @pubrec_mutex = Mutex.new @pubrel_mutex = Mutex.new @pubcomp_mutex = Mutex.new @waiting_puback = [] @waiting_pubrec = [] @waiting_pubrel = [] @waiting_pubcomp = [] end |
Instance Attribute Details
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
28 29 30 |
# File 'lib/paho.mqtt/paho_client.rb', line 28 def ack_timeout @ack_timeout end |
#clean_session ⇒ Object
Returns the value of attribute clean_session.
14 15 16 |
# File 'lib/paho.mqtt/paho_client.rb', line 14 def clean_session @clean_session end |
#client_id ⇒ Object
Returns the value of attribute client_id.
15 16 17 |
# File 'lib/paho.mqtt/paho_client.rb', line 15 def client_id @client_id end |
#connection_state ⇒ Object (readonly)
Returns the value of attribute connection_state.
44 45 46 |
# File 'lib/paho.mqtt/paho_client.rb', line 44 def connection_state @connection_state end |
#host ⇒ Object
Connection related attributes:
11 12 13 |
# File 'lib/paho.mqtt/paho_client.rb', line 11 def host @host end |
#keep_alive ⇒ Object
Setting attributes:
27 28 29 |
# File 'lib/paho.mqtt/paho_client.rb', line 27 def keep_alive @keep_alive end |
#logger ⇒ Object
Log file
8 9 10 |
# File 'lib/paho.mqtt/paho_client.rb', line 8 def logger @logger end |
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
13 14 15 |
# File 'lib/paho.mqtt/paho_client.rb', line 13 def mqtt_version @mqtt_version end |
#on_connack(&block) ⇒ Object
Returns the value of attribute on_connack.
33 34 35 |
# File 'lib/paho.mqtt/paho_client.rb', line 33 def on_connack @on_connack end |
#on_message(&block) ⇒ Object
Callback attributes
32 33 34 |
# File 'lib/paho.mqtt/paho_client.rb', line 32 def end |
#on_puback(&block) ⇒ Object
Returns the value of attribute on_puback.
36 37 38 |
# File 'lib/paho.mqtt/paho_client.rb', line 36 def on_puback @on_puback end |
#on_pubcomp(&block) ⇒ Object
Returns the value of attribute on_pubcomp.
39 40 41 |
# File 'lib/paho.mqtt/paho_client.rb', line 39 def on_pubcomp @on_pubcomp end |
#on_pubrec(&block) ⇒ Object
Returns the value of attribute on_pubrec.
38 39 40 |
# File 'lib/paho.mqtt/paho_client.rb', line 38 def on_pubrec @on_pubrec end |
#on_pubrel(&block) ⇒ Object
Returns the value of attribute on_pubrel.
37 38 39 |
# File 'lib/paho.mqtt/paho_client.rb', line 37 def on_pubrel @on_pubrel end |
#on_suback(&block) ⇒ Object
Returns the value of attribute on_suback.
34 35 36 |
# File 'lib/paho.mqtt/paho_client.rb', line 34 def on_suback @on_suback end |
#on_unsuback(&block) ⇒ Object
Returns the value of attribute on_unsuback.
35 36 37 |
# File 'lib/paho.mqtt/paho_client.rb', line 35 def on_unsuback @on_unsuback end |
#password ⇒ Object
Returns the value of attribute password.
17 18 19 |
# File 'lib/paho.mqtt/paho_client.rb', line 17 def password @password end |
#persistent ⇒ Object
Returns the value of attribute persistent.
29 30 31 |
# File 'lib/paho.mqtt/paho_client.rb', line 29 def persistent @persistent end |
#port ⇒ Object
Returns the value of attribute port.
12 13 14 |
# File 'lib/paho.mqtt/paho_client.rb', line 12 def port @port end |
#registered_callback ⇒ Object (readonly)
Read Only attribute
42 43 44 |
# File 'lib/paho.mqtt/paho_client.rb', line 42 def registered_callback @registered_callback end |
#ssl ⇒ Object
Returns the value of attribute ssl.
18 19 20 |
# File 'lib/paho.mqtt/paho_client.rb', line 18 def ssl @ssl end |
#subscribed_topics ⇒ Object (readonly)
Returns the value of attribute subscribed_topics.
43 44 45 |
# File 'lib/paho.mqtt/paho_client.rb', line 43 def subscribed_topics @subscribed_topics end |
#username ⇒ Object
Returns the value of attribute username.
16 17 18 |
# File 'lib/paho.mqtt/paho_client.rb', line 16 def username @username end |
#will_payload ⇒ Object
Returns the value of attribute will_payload.
22 23 24 |
# File 'lib/paho.mqtt/paho_client.rb', line 22 def will_payload @will_payload end |
#will_qos ⇒ Object
Returns the value of attribute will_qos.
23 24 25 |
# File 'lib/paho.mqtt/paho_client.rb', line 23 def will_qos @will_qos end |
#will_retain ⇒ Object
Returns the value of attribute will_retain.
24 25 26 |
# File 'lib/paho.mqtt/paho_client.rb', line 24 def will_retain @will_retain end |
#will_topic ⇒ Object
Last will attributes:
21 22 23 |
# File 'lib/paho.mqtt/paho_client.rb', line 21 def will_topic @will_topic end |
Instance Method Details
#add_topic_callback(topic, callback = nil, &block) ⇒ Object
304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/paho.mqtt/paho_client.rb', line 304 def add_topic_callback(topic, callback=nil, &block) if topic.nil? @logger.error("The topics where the callback is trying to be registered have been found nil.") if @logger.is_a?(Logger) raise ParameterException end remove_topic_callback(topic) if block_given? @registered_callback.push([topic, block]) elsif !(callback.nil?) && callback.class == Proc @registered_callback.push([topic, callback]) end MQTT_ERR_SUCCESS end |
#cert=(cert_path) ⇒ Object
138 139 140 |
# File 'lib/paho.mqtt/paho_client.rb', line 138 def cert=(cert_path) ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(cert_path)) end |
#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/paho.mqtt/paho_client.rb', line 126 def config_ssl_context(cert_path, key_path, ca_path=nil) @ssl ||= true @ssl_context = ssl_context self.cert = cert_path self.key = key_path self.root_ca = ca_path end |
#config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object
153 154 155 156 157 158 |
# File 'lib/paho.mqtt/paho_client.rb', line 153 def config_will(topic, payload="", retain=false, qos=0) @will_topic = topic @will_payload = payload @will_retain = retain @will_qos = qos end |
#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent) ⇒ Object
160 161 162 163 164 165 166 |
# File 'lib/paho.mqtt/paho_client.rb', line 160 def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent) @persistent = persistent @connection_state_mutex.synchronize { @connection_state = MQTT_CS_NEW } connect_async(host, port, keep_alive) end |
#connect_async(host, port = 1883, keep_alive) ⇒ Object
168 169 170 171 172 173 174 175 176 177 |
# File 'lib/paho.mqtt/paho_client.rb', line 168 def connect_async(host, port=1883, keep_alive) @host = host @port = port.to_i @keep_alive = keep_alive @connection_state_mutex.synchronize { @connection_state = MQTT_CS_CONNECT_ASYNC } setup_connection end |
#disconnect(explicit = true) ⇒ Object
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/paho.mqtt/paho_client.rb', line 233 def disconnect(explicit=true) @logger.debug("Disconnecting from #{@host}") if @logger.is_a?(Logger) if explicit send_disconnect @mqtt_thread.kill if @mqtt_thread && @mqtt_thread.alive? @mqtt_thread.kill if @mqtt_thread.alive? @last_packet_id = 0 @writing_mutex.synchronize { @writing_queue = [] } @puback_mutex.synchronize { @waiting_puback = [] } @pubrec_mutex.synchronize { @waiting_pubrec = [] } @pubrel_mutex.synchronize { @waiting_pubrel = [] } @pubcomp_mutex.synchronize { @waiting_pubcomp = [] } end @socket.close unless @socket.nil? @socket = nil @connection_state_mutex.synchronize { @connection_state = MQTT_CS_DISCONNECT } MQTT_ERR_SUCCESS end |
#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
121 122 123 124 |
# File 'lib/paho.mqtt/paho_client.rb', line 121 def generate_client_id(prefix='paho_ruby', lenght=16) charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9') @client_id = prefix << Array.new(lenght) { charset.sample }.join end |
#key=(key_path, passphrase = nil) ⇒ Object
142 143 144 |
# File 'lib/paho.mqtt/paho_client.rb', line 142 def key=(key_path, passphrase=nil) ssl_context.key = OpenSSL::PKey::RSA.new(File.read(key_path), passphrase) end |
#loop_misc ⇒ Object
203 204 205 206 207 208 209 210 211 |
# File 'lib/paho.mqtt/paho_client.rb', line 203 def loop_misc check_keep_alive check_ack_alive(@waiting_puback, @puback_mutex, MAX_PUBACK) check_ack_alive(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC) check_ack_alive(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL) check_ack_alive(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP) check_ack_alive(@waiting_suback, @suback_mutex, @waiting_suback.length) check_ack_alive(@waiting_unsuback, @unsuback_mutex, @waiting_unsuback.length) end |
#loop_read(max_packet = 5) ⇒ Object
190 191 192 193 194 |
# File 'lib/paho.mqtt/paho_client.rb', line 190 def loop_read(max_packet=5) max_packet.times do receive_packet end end |
#loop_write(max_packet = MAX_WRITING) ⇒ Object
180 181 182 183 184 185 186 187 188 |
# File 'lib/paho.mqtt/paho_client.rb', line 180 def loop_write(max_packet=MAX_WRITING) @writing_mutex.synchronize { cnt = 0 while !@writing_queue.empty? && cnt < max_packet do send_packet(@writing_queue.shift) cnt += 1 end } end |
#mqtt_loop ⇒ Object
196 197 198 199 200 201 |
# File 'lib/paho.mqtt/paho_client.rb', line 196 def mqtt_loop loop_read loop_write loop_misc sleep LOOP_TEMPO end |
#ping_host ⇒ Object
300 301 302 |
# File 'lib/paho.mqtt/paho_client.rb', line 300 def ping_host send_pingreq end |
#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
272 273 274 275 276 277 278 |
# File 'lib/paho.mqtt/paho_client.rb', line 272 def publish(topic, payload="", retain=false, qos=0) if topic == "" || !topic.is_a?(String) @logger.error("Publish topics is invalid, not a string or empty.") if @logger.is_a?(Logger) raise ParameterException end send_publish(topic, payload, retain, qos) end |
#reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/paho.mqtt/paho_client.rb', line 214 def reconnect(retry_time=RECONNECT_RETRY_TIME, retry_tempo=RECONNECT_RETRY_TEMPO) @reconnect_thread = Thread.new do retry_time.times do @logger.debug("New reconnect atempt...") if @logger.is_a?(Logger) setup_connection if @connection_state == MQTT_CS_CONNECTED break else sleep retry_tempo end end if @connection_state != MQTT_CS_CONNECTED @logger.error("Reconnection atempt counter is over.(#{RECONNECT_RETRY_TIME} times)") if @logger.is_a?(Logger) disconnect(false) exit end end end |
#remove_topic_callback(topic) ⇒ Object
319 320 321 322 323 324 325 326 |
# File 'lib/paho.mqtt/paho_client.rb', line 319 def remove_topic_callback(topic) if topic.nil? @logger.error("The topics where the callback is trying to be unregistered have been found nil.") if @logger.is_a?(Logger) raise ParameterException end @registered_callback.delete_if {|pair| pair.first == topic} MQTT_ERR_SUCCESS end |
#root_ca=(ca_path) ⇒ Object
146 147 148 149 150 151 |
# File 'lib/paho.mqtt/paho_client.rb', line 146 def root_ca=(ca_path) ssl_context.ca_file = ca_path unless @ca_path.nil? ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER end end |
#ssl_context ⇒ Object
134 135 136 |
# File 'lib/paho.mqtt/paho_client.rb', line 134 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end |
#subscribe(*topics) ⇒ Object
280 281 282 283 284 285 286 287 288 |
# File 'lib/paho.mqtt/paho_client.rb', line 280 def subscribe(*topics) unless topics.length == 0 send_subscribe(topics) else @logger.error("Subscribe topics need one topic or a list of topics.") if @logger.is_a?(Logger) disconnect(false) raise ProtocolViolation end end |
#unsubscribe(*topics) ⇒ Object
290 291 292 293 294 295 296 297 298 |
# File 'lib/paho.mqtt/paho_client.rb', line 290 def unsubscribe(*topics) unless topics.length == 0 send_unsubscribe(topics) else @logger.error("Unsubscribe need at least one topics.") if @logger.is_a?(Logger) disconnect(false) raise ProtocolViolation end end |