Class: PahoMqtt::Client
- Inherits:
-
Object
- Object
- PahoMqtt::Client
- Defined in:
- lib/paho_mqtt/client.rb
Constant Summary collapse
- ATTR_DEFAULTS =
{ :logger => nil, :host => "", :port => nil, :mqtt_version => '3.1.1', :clean_session => true, :persistent => false, :blocking => false, :client_id => nil, :username => nil, :password => nil, :ssl => false, :will_topic => nil, :will_payload => nil, :will_qos => 0, :will_retain => false, :keep_alive => 60, :ack_timeout => 5, :on_connack => nil, :on_suback => nil, :on_unsuback => nil, :on_puback => nil, :on_pubrel => nil, :on_pubrec => nil, :on_pubcomp => nil, :on_message => nil, }
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
-
#blocking ⇒ Object
Returns the value of attribute blocking.
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connection_state ⇒ Object
readonly
Returns the value of attribute connection_state.
-
#host ⇒ Object
Connection-related attributes.
-
#keep_alive ⇒ Object
Timeout attributes.
-
#logger ⇒ Object
Log file.
-
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
-
#on_connack(&block) ⇒ Object
Returns the value of attribute on_connack.
-
#on_message(&block) ⇒ Object
Callback attributes.
-
#on_puback(&block) ⇒ Object
Returns the value of attribute on_puback.
-
#on_pubcomp(&block) ⇒ Object
Returns the value of attribute on_pubcomp.
-
#on_pubrec(&block) ⇒ Object
Returns the value of attribute on_pubrec.
-
#on_pubrel(&block) ⇒ Object
Returns the value of attribute on_pubrel.
-
#on_suback(&block) ⇒ Object
Returns the value of attribute on_suback.
-
#on_unsuback(&block) ⇒ Object
Returns the value of attribute on_unsuback.
-
#password ⇒ Object
Returns the value of attribute password.
-
#persistent ⇒ Object
Returns the value of attribute persistent.
-
#port ⇒ Object
Returns the value of attribute port.
-
#registered_callback ⇒ Object
readonly
Read Only attribute.
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#subscribed_topics ⇒ Object
readonly
Returns the value of attribute subscribed_topics.
-
#username ⇒ Object
Returns the value of attribute username.
-
#will_payload ⇒ Object
Returns the value of attribute will_payload.
-
#will_qos ⇒ Object
Returns the value of attribute will_qos.
-
#will_retain ⇒ Object
Returns the value of attribute will_retain.
-
#will_topic ⇒ Object
Last will attributes.
Instance Method Summary collapse
- #add_topic_callback(topic, callback = nil, &block) ⇒ Object
- #config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
- #config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object
- #connect_async(host, port = 1883, keep_alive) ⇒ Object
- #connected? ⇒ Boolean
- #disconnect(explicit = true) ⇒ Object
- #generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
-
#initialize(*args) ⇒ Client
constructor
A new instance of Client.
- #loop_misc ⇒ Object
- #loop_read(max_packet = MAX_READ) ⇒ Object
- #loop_write(max_packet = MAX_WRITING) ⇒ Object
- #mqtt_loop ⇒ Object
- #ping_host ⇒ Object
- #publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object
- #remove_topic_callback(topic) ⇒ Object
- #ssl_context ⇒ Object
- #subscribe(*topics) ⇒ Object
- #unsubscribe(*topics) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Returns a new instance of Client.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/paho_mqtt/client.rb', line 75

# Builds a new client in the disconnected state.
#
# A trailing Hash in +args+ is taken as attribute overrides: it is merged over
# ATTR_DEFAULTS and every resulting pair is applied through the matching
# attribute writer ("<key>="). Other positional arguments are ignored.
#
# @param args [Array] optional arguments; only a trailing Hash is consumed
# @return [Client] a new instance of Client
def initialize(*args)
  attr = args.last.is_a?(Hash) ? args.pop : {}

  ATTR_DEFAULTS.merge(attr).each_pair do |k, v|
    send("#{k}=", v)
  end

  # No port supplied: pick the default that matches the transport (SSL or plain).
  if @port.nil?
    @port = @ssl ? PahoMqtt::DEFAULT_SSL_PORT : PahoMqtt::DEFAULT_PORT
  end

  # A missing or empty client id is replaced by a generated one.
  if @client_id.nil? || @client_id == ""
    @client_id = generate_client_id
  end

  # Ping bookkeeping and packet id counter.
  @last_ping_req = Time.now
  @last_ping_resp = Time.now
  @last_packet_id = 0

  # Transport state.
  @socket = nil
  @ssl_context = nil

  # Outgoing packet queue and its lock.
  @writing_mutex = Mutex.new
  @writing_queue = []

  # Connection state and its lock.
  @connection_state = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new

  # Subscriptions and per-topic callbacks.
  @subscribed_mutex = Mutex.new
  @subscribed_topics = []
  @registered_callback = []

  # Pending subscribe / unsubscribe acknowledgements.
  @waiting_suback = []
  @suback_mutex = Mutex.new
  @waiting_unsuback = []
  @unsuback_mutex = Mutex.new

  # Background threads, created later by connect_async / reconnect.
  @mqtt_thread = nil
  @reconnect_thread = nil

  # Pending publish acknowledgements and their locks.
  @puback_mutex = Mutex.new
  @pubrec_mutex = Mutex.new
  @pubrel_mutex = Mutex.new
  @pubcomp_mutex = Mutex.new
  @waiting_puback = []
  @waiting_pubrec = []
  @waiting_pubrel = []
  @waiting_pubcomp = []
end
Instance Attribute Details
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
30 31 32 |
# File 'lib/paho_mqtt/client.rb', line 30 def ack_timeout @ack_timeout end |
#blocking ⇒ Object
Returns the value of attribute blocking.
16 17 18 |
# File 'lib/paho_mqtt/client.rb', line 16 def blocking @blocking end |
#clean_session ⇒ Object
Returns the value of attribute clean_session.
14 15 16 |
# File 'lib/paho_mqtt/client.rb', line 14 def clean_session @clean_session end |
#client_id ⇒ Object
Returns the value of attribute client_id.
17 18 19 |
# File 'lib/paho_mqtt/client.rb', line 17 def client_id @client_id end |
#connection_state ⇒ Object (readonly)
Returns the value of attribute connection_state.
45 46 47 |
# File 'lib/paho_mqtt/client.rb', line 45 def connection_state @connection_state end |
#host ⇒ Object
Connection related attributes:
11 12 13 |
# File 'lib/paho_mqtt/client.rb', line 11 def host @host end |
#keep_alive ⇒ Object
Timeout attributes:
29 30 31 |
# File 'lib/paho_mqtt/client.rb', line 29 def keep_alive @keep_alive end |
#logger ⇒ Object
Log file
8 9 10 |
# File 'lib/paho_mqtt/client.rb', line 8 def logger @logger end |
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
13 14 15 |
# File 'lib/paho_mqtt/client.rb', line 13 def mqtt_version @mqtt_version end |
#on_connack(&block) ⇒ Object
Returns the value of attribute on_connack.
34 35 36 |
# File 'lib/paho_mqtt/client.rb', line 34 def on_connack @on_connack end |
#on_message(&block) ⇒ Object
Callback attributes
33 34 35 |
# File 'lib/paho_mqtt/client.rb', line 33 def end |
#on_puback(&block) ⇒ Object
Returns the value of attribute on_puback.
37 38 39 |
# File 'lib/paho_mqtt/client.rb', line 37 def on_puback @on_puback end |
#on_pubcomp(&block) ⇒ Object
Returns the value of attribute on_pubcomp.
40 41 42 |
# File 'lib/paho_mqtt/client.rb', line 40 def on_pubcomp @on_pubcomp end |
#on_pubrec(&block) ⇒ Object
Returns the value of attribute on_pubrec.
39 40 41 |
# File 'lib/paho_mqtt/client.rb', line 39 def on_pubrec @on_pubrec end |
#on_pubrel(&block) ⇒ Object
Returns the value of attribute on_pubrel.
38 39 40 |
# File 'lib/paho_mqtt/client.rb', line 38 def on_pubrel @on_pubrel end |
#on_suback(&block) ⇒ Object
Returns the value of attribute on_suback.
35 36 37 |
# File 'lib/paho_mqtt/client.rb', line 35 def on_suback @on_suback end |
#on_unsuback(&block) ⇒ Object
Returns the value of attribute on_unsuback.
36 37 38 |
# File 'lib/paho_mqtt/client.rb', line 36 def on_unsuback @on_unsuback end |
#password ⇒ Object
Returns the value of attribute password.
19 20 21 |
# File 'lib/paho_mqtt/client.rb', line 19 def password @password end |
#persistent ⇒ Object
Returns the value of attribute persistent.
15 16 17 |
# File 'lib/paho_mqtt/client.rb', line 15 def persistent @persistent end |
#port ⇒ Object
Returns the value of attribute port.
12 13 14 |
# File 'lib/paho_mqtt/client.rb', line 12 def port @port end |
#registered_callback ⇒ Object (readonly)
Read Only attribute
43 44 45 |
# File 'lib/paho_mqtt/client.rb', line 43 def registered_callback @registered_callback end |
#ssl ⇒ Object
Returns the value of attribute ssl.
20 21 22 |
# File 'lib/paho_mqtt/client.rb', line 20 def ssl @ssl end |
#subscribed_topics ⇒ Object (readonly)
Returns the value of attribute subscribed_topics.
44 45 46 |
# File 'lib/paho_mqtt/client.rb', line 44 def subscribed_topics @subscribed_topics end |
#username ⇒ Object
Returns the value of attribute username.
18 19 20 |
# File 'lib/paho_mqtt/client.rb', line 18 def username @username end |
#will_payload ⇒ Object
Returns the value of attribute will_payload.
24 25 26 |
# File 'lib/paho_mqtt/client.rb', line 24 def will_payload @will_payload end |
#will_qos ⇒ Object
Returns the value of attribute will_qos.
25 26 27 |
# File 'lib/paho_mqtt/client.rb', line 25 def will_qos @will_qos end |
#will_retain ⇒ Object
Returns the value of attribute will_retain.
26 27 28 |
# File 'lib/paho_mqtt/client.rb', line 26 def will_retain @will_retain end |
#will_topic ⇒ Object
Last will attributes:
23 24 25 |
# File 'lib/paho_mqtt/client.rb', line 23 def will_topic @will_topic end |
Instance Method Details
#add_topic_callback(topic, callback = nil, &block) ⇒ Object
301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/paho_mqtt/client.rb', line 301

# Registers a callback for +topic+, replacing any callback previously
# registered for the same topic. A given block wins over +callback+; a
# +callback+ is only accepted when its class is exactly Proc. When neither is
# usable the previous registration is still removed and nothing is added.
#
# @param topic the topic the callback is attached to (must not be nil)
# @param callback [Proc, nil] callback used when no block is given
# @return MQTT_ERR_SUCCESS
# @raise [ParameterException] when +topic+ is nil
def add_topic_callback(topic, callback=nil, &block)
  if topic.nil?
    @logger.error("The topics where the callback is trying to be registered have been found nil.") if @logger.is_a?(Logger)
    raise ParameterException
  end

  remove_topic_callback(topic)

  handler =
    if block_given?
      block
    elsif callback.instance_of?(Proc)
      callback
    end
  @registered_callback << [topic, handler] unless handler.nil?

  MQTT_ERR_SUCCESS
end
#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
137 138 139 140 141 142 143 |
# File 'lib/paho_mqtt/client.rb', line 137

# Turns SSL on (if it is not already truthy), makes sure an SSL context
# exists, and hands the certificate, key and CA paths to the cert=, key=
# and root_ca= writers.
#
# @param cert_path path passed to the cert= writer
# @param key_path path passed to the key= writer
# @param ca_path path passed to the root_ca= writer (nil by default)
# @return the value of +ca_path+ (result of the last assignment)
def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl = true unless @ssl
  @ssl_context = ssl_context

  self.cert = cert_path
  self.key = key_path
  self.root_ca = ca_path
end
#config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object
145 146 147 148 149 150 |
# File 'lib/paho_mqtt/client.rb', line 145

# Stores the last-will settings on the client.
#
# @param topic value stored in @will_topic
# @param payload value stored in @will_payload (defaults to "")
# @param retain value stored in @will_retain (defaults to false)
# @param qos value stored in @will_qos (defaults to 0)
# @return the value of +qos+ (result of the last assignment)
def config_will(topic, payload="", retain=false, qos=0)
  @will_topic, @will_payload = topic, payload
  @will_retain = retain
  @will_qos = qos
end
#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/paho_mqtt/client.rb', line 152

# Records the connection parameters, marks the connection state as
# MQTT_CS_NEW, runs setup_connection and, unless the client is in blocking
# mode, starts the background loop through #connect_async.
#
# @param host stored in @host
# @param port stored in @port after conversion with to_i
# @param keep_alive stored in @keep_alive
# @param persistent stored in @persistent
# @param blocking stored in @blocking
# @return [Thread, nil] the thread returned by #connect_async, or nil in
#   blocking mode
def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
  @persistent = persistent
  @blocking = blocking
  @host = host
  @port = port.to_i
  @keep_alive = keep_alive

  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end

  setup_connection
  return if @blocking

  connect_async(host, port, keep_alive)
end
#connect_async(host, port = 1883, keep_alive) ⇒ Object
165 166 167 168 169 170 171 172 |
# File 'lib/paho_mqtt/client.rb', line 165

# Starts the background MQTT thread. The thread first kills a still-alive
# reconnect thread, then keeps calling #mqtt_loop for as long as #connected?
# is true. The three parameters are accepted but not read by this method.
#
# @param host unused here
# @param port unused here (defaults to 1883)
# @param keep_alive unused here
# @return [Thread] the newly created thread (also stored in @mqtt_thread)
def connect_async(host, port=1883, keep_alive)
  @mqtt_thread = Thread.new do
    if !@reconnect_thread.nil? && @reconnect_thread.alive?
      @reconnect_thread.kill
    end
    mqtt_loop while connected?
  end
end
#connected? ⇒ Boolean
174 175 176 |
# File 'lib/paho_mqtt/client.rb', line 174 def connected? @connection_state == MQTT_CS_CONNECTED end |
#disconnect(explicit = true) ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/paho_mqtt/client.rb', line 230

# Closes the connection and resets the connection state.
#
# With +explicit+ set, a disconnect packet is sent, the MQTT thread (if any)
# is killed, and the packet id counter, writing queue and pending publish
# acknowledgement lists are reset. In every case the socket is closed and the
# state becomes MQTT_CS_DISCONNECT.
#
# @param explicit [Boolean] whether this is a client-requested disconnect
#   that should also drop the session bookkeeping
# @return MQTT_ERR_SUCCESS
def disconnect(explicit=true)
  @logger.debug("Disconnecting from #{@host}") if @logger.is_a?(Logger)

  if explicit
    send_disconnect
    # One nil-safe kill only: there may be no MQTT thread at all (for
    # instance in blocking mode), so the thread must be checked before use.
    @mqtt_thread.kill if @mqtt_thread && @mqtt_thread.alive?

    @last_packet_id = 0
    @writing_mutex.synchronize { @writing_queue = [] }
    @puback_mutex.synchronize { @waiting_puback = [] }
    @pubrec_mutex.synchronize { @waiting_pubrec = [] }
    @pubrel_mutex.synchronize { @waiting_pubrel = [] }
    @pubcomp_mutex.synchronize { @waiting_pubcomp = [] }
  end

  @socket.close unless @socket.nil? || @socket.closed?
  @socket = nil

  @connection_state_mutex.synchronize { @connection_state = MQTT_CS_DISCONNECT }
  MQTT_ERR_SUCCESS
end
#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
128 129 130 131 |
# File 'lib/paho_mqtt/client.rb', line 128

# Generates a client id made of +prefix+ followed by +lenght+ random
# alphanumeric characters, stores it in @client_id and returns it.
#
# The id is built as a new string, so the +prefix+ argument is never
# modified and may be frozen.
#
# @param prefix [String] leading part of the id
# @param lenght [Integer] number of random characters appended
# @return [String] the generated client id
def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  suffix = Array.new(lenght) { charset.sample }.join
  @client_id = "#{prefix}#{suffix}"
end
#loop_misc ⇒ Object
201 202 203 204 205 206 207 208 209 |
# File 'lib/paho_mqtt/client.rb', line 201 def loop_misc check_keep_alive check_ack_alive(@waiting_puback, @puback_mutex, MAX_PUBACK) check_ack_alive(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC) check_ack_alive(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL) check_ack_alive(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP) check_ack_alive(@waiting_suback, @suback_mutex, @waiting_suback.length) check_ack_alive(@waiting_unsuback, @unsuback_mutex, @waiting_unsuback.length) end |
#loop_read(max_packet = MAX_READ) ⇒ Object
188 189 190 191 192 |
# File 'lib/paho_mqtt/client.rb', line 188

# Calls receive_packet +max_packet+ times in a row.
#
# @param max_packet [Integer] number of receive attempts (defaults to MAX_READ)
# @return [Integer] +max_packet+
def loop_read(max_packet=MAX_READ)
  max_packet.times { receive_packet }
end
#loop_write(max_packet = MAX_WRITING) ⇒ Object
178 179 180 181 182 183 184 185 186 |
# File 'lib/paho_mqtt/client.rb', line 178

# Sends queued packets, oldest first, while holding the writing mutex.
# Stops when the queue is empty or +max_packet+ packets have been sent.
#
# @param max_packet [Integer] upper bound on packets sent in one call
#   (defaults to MAX_WRITING)
# @return [nil]
def loop_write(max_packet=MAX_WRITING)
  @writing_mutex.synchronize do
    sent = 0
    until @writing_queue.empty? || sent >= max_packet
      send_packet(@writing_queue.shift)
      sent += 1
    end
  end
end
#mqtt_loop ⇒ Object
194 195 196 197 198 199 |
# File 'lib/paho_mqtt/client.rb', line 194 def mqtt_loop loop_read loop_write loop_misc sleep LOOP_TEMPO end |
#ping_host ⇒ Object
297 298 299 |
# File 'lib/paho_mqtt/client.rb', line 297 def ping_host send_pingreq end |
#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
269 270 271 272 273 274 275 |
# File 'lib/paho_mqtt/client.rb', line 269

# Publishes +payload+ on +topic+ through send_publish after validating the
# topic.
#
# @param topic [String] non-empty topic name
# @param payload message body (defaults to "")
# @param retain retain flag (defaults to false)
# @param qos quality-of-service level (defaults to 0)
# @return the result of send_publish
# @raise [ParameterException] when +topic+ is empty or not a String
def publish(topic, payload="", retain=false, qos=0)
  invalid_topic = !topic.is_a?(String) || topic == ""
  if invalid_topic
    @logger.error("Publish topics is invalid, not a string or empty.") if @logger.is_a?(Logger)
    raise ParameterException
  end

  send_publish(topic, payload, retain, qos)
end
#reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/paho_mqtt/client.rb', line 211

# Starts a background thread that tries to reconnect up to +retry_time+
# times, sleeping +retry_tempo+ between failed attempts. When every attempt
# fails, the failure is logged with the number of attempts actually allowed,
# the client is disconnected non-explicitly and exit is called.
#
# @param retry_time [Integer] maximum number of connection attempts
# @param retry_tempo [Numeric] pause between two attempts, passed to sleep
# @return [Thread] the reconnect thread (also stored in @reconnect_thread)
def reconnect(retry_time=RECONNECT_RETRY_TIME, retry_tempo=RECONNECT_RETRY_TEMPO)
  @reconnect_thread = Thread.new do
    retry_time.times do
      @logger.debug("New reconnect atempt...") if @logger.is_a?(Logger)
      connect
      break if connected?

      sleep retry_tempo
    end

    unless connected?
      # Report the limit that was really applied, not the default constant.
      @logger.error("Reconnection atempt counter is over.(#{retry_time} times)") if @logger.is_a?(Logger)
      disconnect(false)
      # NOTE(review): exit raises SystemExit from this background thread —
      # confirm that ending the whole program is the intended outcome.
      exit
    end
  end
end
#remove_topic_callback(topic) ⇒ Object
316 317 318 319 320 321 322 323 |
# File 'lib/paho_mqtt/client.rb', line 316

# Drops every registered [topic, callback] pair whose topic equals +topic+.
#
# @param topic the topic whose callback is removed (must not be nil)
# @return MQTT_ERR_SUCCESS
# @raise [ParameterException] when +topic+ is nil
def remove_topic_callback(topic)
  if topic.nil?
    @logger.error("The topics where the callback is trying to be unregistered have been found nil.") if @logger.is_a?(Logger)
    raise ParameterException
  end

  @registered_callback.reject! { |registered_topic, _callback| registered_topic == topic }
  MQTT_ERR_SUCCESS
end
#ssl_context ⇒ Object
133 134 135 |
# File 'lib/paho_mqtt/client.rb', line 133

# Returns the client's SSL context, creating and caching a new
# OpenSSL::SSL::SSLContext on first use.
#
# @return [OpenSSL::SSL::SSLContext] the cached context
def ssl_context
  @ssl_context = OpenSSL::SSL::SSLContext.new unless @ssl_context
  @ssl_context
end
#subscribe(*topics) ⇒ Object
277 278 279 280 281 282 283 284 285 |
# File 'lib/paho_mqtt/client.rb', line 277

# Subscribes to the given topics through send_subscribe. When
# valid_topics? reports MQTT_ERR_FAIL the error is logged, the client is
# disconnected non-explicitly and ProtocolViolation is raised.
#
# @param topics the topics to subscribe to
# @return the result of send_subscribe
# @raise [ProtocolViolation] when the topic list is rejected
def subscribe(*topics)
  if valid_topics?(topics) == MQTT_ERR_FAIL
    @logger.error("Subscribe topics need one topic or a list of topics.") if @logger.is_a?(Logger)
    disconnect(false)
    raise ProtocolViolation
  end

  send_subscribe(topics)
end
#unsubscribe(*topics) ⇒ Object
287 288 289 290 291 292 293 294 295 |
# File 'lib/paho_mqtt/client.rb', line 287

# Unsubscribes from the given topics through send_unsubscribe. When
# valid_topics? reports MQTT_ERR_FAIL the error is logged, the client is
# disconnected non-explicitly and ProtocolViolation is raised.
#
# @param topics the topics to unsubscribe from
# @return the result of send_unsubscribe
# @raise [ProtocolViolation] when the topic list is rejected
def unsubscribe(*topics)
  if valid_topics?(topics) == MQTT_ERR_FAIL
    @logger.error("Unsubscribe need at least one topics.") if @logger.is_a?(Logger)
    disconnect(false)
    raise ProtocolViolation
  end

  send_unsubscribe(topics)
end