Class: PahoMqtt::Client
- Inherits:
-
Object
- Object
- PahoMqtt::Client
- Defined in:
- lib/paho_mqtt/client.rb
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
-
#blocking ⇒ Object
Returns the value of attribute blocking.
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connection_state ⇒ Object
readonly
Read Only attribute.
-
#host ⇒ Object
Connection related attributes:.
-
#keep_alive ⇒ Object
Timeout attributes:.
-
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
-
#password ⇒ Object
Returns the value of attribute password.
-
#persistent ⇒ Object
Returns the value of attribute persistent.
-
#port ⇒ Object
Returns the value of attribute port.
-
#reconnect_delay ⇒ Object
Returns the value of attribute reconnect_delay.
-
#reconnect_limit ⇒ Object
Returns the value of attribute reconnect_limit.
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#ssl_context ⇒ Object
readonly
Returns the value of attribute ssl_context.
-
#username ⇒ Object
Returns the value of attribute username.
-
#will_payload ⇒ Object
Returns the value of attribute will_payload.
-
#will_qos ⇒ Object
Returns the value of attribute will_qos.
-
#will_retain ⇒ Object
Returns the value of attribute will_retain.
-
#will_topic ⇒ Object
Last will attributes:.
Instance Method Summary collapse
- #add_topic_callback(topic, callback = nil, &block) ⇒ Object
- #config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
- #connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object
- #connected? ⇒ Boolean
- #daemon_mode ⇒ Object
- #disconnect(explicit = true) ⇒ Object
- #generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
-
#initialize(*args) ⇒ Client
constructor
A new instance of Client.
- #loop_misc ⇒ Object
- #loop_read ⇒ Object
- #loop_write ⇒ Object
- #mqtt_loop ⇒ Object
- #on_connack(&block) ⇒ Object
- #on_connack=(callback) ⇒ Object
- #on_message(&block) ⇒ Object
- #on_message=(callback) ⇒ Object
- #on_puback(&block) ⇒ Object
- #on_puback=(callback) ⇒ Object
- #on_pubcomp(&block) ⇒ Object
- #on_pubcomp=(callback) ⇒ Object
- #on_pubrec(&block) ⇒ Object
- #on_pubrec=(callback) ⇒ Object
- #on_pubrel(&block) ⇒ Object
- #on_pubrel=(callback) ⇒ Object
- #on_suback(&block) ⇒ Object
- #on_suback=(callback) ⇒ Object
- #on_unsuback(&block) ⇒ Object
- #on_unsuback=(callback) ⇒ Object
- #ping_host ⇒ Object
- #publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #reconnect ⇒ Object
- #reconnect? ⇒ Boolean
- #registered_callback ⇒ Object
- #remove_topic_callback(topic) ⇒ Object
- #subscribe(*topics) ⇒ Object
- #subscribed_topics ⇒ Object
- #unsubscribe(*topics) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Returns a new instance of Client.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/paho_mqtt/client.rb', line 52 def initialize(*args) @last_ping_resp = Time.now @last_packet_id = 0 @ssl_context = nil @sender = nil @handler = Handler.new @connection_helper = nil @connection_state = MQTT_CS_DISCONNECT @connection_state_mutex = Mutex.new @mqtt_thread = nil @reconnect_thread = nil @id_mutex = Mutex.new @reconnect_limit = 3 @reconnect_delay = 5 if args.last.is_a?(Hash) attr = args.pop else attr = {} end CLIENT_ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end if @ssl @ssl_context = OpenSSL::SSL::SSLContext.new end if @port.nil? if @ssl @port = DEFAULT_SSL_PORT else @port = DEFAULT_PORT end end if @client_id.nil? || @client_id == "" @client_id = generate_client_id end end |
Instance Attribute Details
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
46 47 48 |
# File 'lib/paho_mqtt/client.rb', line 46 def ack_timeout @ack_timeout end |
#blocking ⇒ Object
Returns the value of attribute blocking.
32 33 34 |
# File 'lib/paho_mqtt/client.rb', line 32 def blocking @blocking end |
#clean_session ⇒ Object
Returns the value of attribute clean_session.
28 29 30 |
# File 'lib/paho_mqtt/client.rb', line 28 def clean_session @clean_session end |
#client_id ⇒ Object
Returns the value of attribute client_id.
33 34 35 |
# File 'lib/paho_mqtt/client.rb', line 33 def client_id @client_id end |
#connection_state ⇒ Object (readonly)
Read Only attribute
49 50 51 |
# File 'lib/paho_mqtt/client.rb', line 49 def connection_state @connection_state end |
#host ⇒ Object
Connection related attributes:
25 26 27 |
# File 'lib/paho_mqtt/client.rb', line 25 def host @host end |
#keep_alive ⇒ Object
Timeout attributes:
45 46 47 |
# File 'lib/paho_mqtt/client.rb', line 45 def keep_alive @keep_alive end |
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
27 28 29 |
# File 'lib/paho_mqtt/client.rb', line 27 def mqtt_version @mqtt_version end |
#password ⇒ Object
Returns the value of attribute password.
35 36 37 |
# File 'lib/paho_mqtt/client.rb', line 35 def password @password end |
#persistent ⇒ Object
Returns the value of attribute persistent.
29 30 31 |
# File 'lib/paho_mqtt/client.rb', line 29 def persistent @persistent end |
#port ⇒ Object
Returns the value of attribute port.
26 27 28 |
# File 'lib/paho_mqtt/client.rb', line 26 def port @port end |
#reconnect_delay ⇒ Object
Returns the value of attribute reconnect_delay.
31 32 33 |
# File 'lib/paho_mqtt/client.rb', line 31 def reconnect_delay @reconnect_delay end |
#reconnect_limit ⇒ Object
Returns the value of attribute reconnect_limit.
30 31 32 |
# File 'lib/paho_mqtt/client.rb', line 30 def reconnect_limit @reconnect_limit end |
#ssl ⇒ Object
Returns the value of attribute ssl.
36 37 38 |
# File 'lib/paho_mqtt/client.rb', line 36 def ssl @ssl end |
#ssl_context ⇒ Object (readonly)
Returns the value of attribute ssl_context.
50 51 52 |
# File 'lib/paho_mqtt/client.rb', line 50 def ssl_context @ssl_context end |
#username ⇒ Object
Returns the value of attribute username.
34 35 36 |
# File 'lib/paho_mqtt/client.rb', line 34 def username @username end |
#will_payload ⇒ Object
Returns the value of attribute will_payload.
40 41 42 |
# File 'lib/paho_mqtt/client.rb', line 40 def will_payload @will_payload end |
#will_qos ⇒ Object
Returns the value of attribute will_qos.
41 42 43 |
# File 'lib/paho_mqtt/client.rb', line 41 def will_qos @will_qos end |
#will_retain ⇒ Object
Returns the value of attribute will_retain.
42 43 44 |
# File 'lib/paho_mqtt/client.rb', line 42 def will_retain @will_retain end |
#will_topic ⇒ Object
Last will attributes:
39 40 41 |
# File 'lib/paho_mqtt/client.rb', line 39 def will_topic @will_topic end |
Instance Method Details
#add_topic_callback(topic, callback = nil, &block) ⇒ Object
269 270 271 |
# File 'lib/paho_mqtt/client.rb', line 269 def add_topic_callback(topic, callback=nil, &block) @handler.register_topic_callback(topic, callback, &block) end |
#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
99 100 101 102 |
# File 'lib/paho_mqtt/client.rb', line 99 def config_ssl_context(cert_path, key_path, ca_path=nil) @ssl ||= true @ssl_context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path) end |
#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/paho_mqtt/client.rb', line 104 def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking) @persistent = persistent @blocking = blocking @host = host @port = port.to_i @keep_alive = keep_alive @connection_state_mutex.synchronize do @connection_state = MQTT_CS_NEW end @mqtt_thread.kill unless @mqtt_thread.nil? init_connection unless reconnect? @connection_helper.send_connect(session_params) begin init_pubsub @connection_state = @connection_helper.do_connect(reconnect?) if connected? build_pubsub daemon_mode unless @blocking end rescue LowVersionException downgrade_version end end |
#connected? ⇒ Boolean
146 147 148 |
# File 'lib/paho_mqtt/client.rb', line 146 def connected? @connection_state == MQTT_CS_CONNECTED end |
#daemon_mode ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/paho_mqtt/client.rb', line 129 def daemon_mode @mqtt_thread = Thread.new do @reconnect_thread.kill unless @reconnect_thread.nil? || !@reconnect_thread.alive? begin while connected? do mqtt_loop end rescue SystemCallError => e if @persistent reconnect else raise e end end end end |
#disconnect(explicit = true) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/paho_mqtt/client.rb', line 218 def disconnect(explicit=true) @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread) @connection_state_mutex.synchronize do @connection_state = MQTT_CS_DISCONNECT end if explicit && @clean_session @last_packet_id = 0 @subscriber.clear_queue end MQTT_ERR_SUCCESS end |
#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
94 95 96 97 |
# File 'lib/paho_mqtt/client.rb', line 94 def generate_client_id(prefix='paho_ruby', lenght=16) charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9') @client_id = prefix << Array.new(lenght) { charset.sample }.join end |
#loop_misc ⇒ Object
189 190 191 192 193 194 195 196 |
# File 'lib/paho_mqtt/client.rb', line 189 def loop_misc if @connection_helper.check_keep_alive(@persistent, @handler.last_ping_resp, @keep_alive) == MQTT_CS_DISCONNECT reconnect if check_persistence end @publisher.check_waiting_publisher @subscriber.check_waiting_subscriber sleep SELECT_TIMEOUT end |
#loop_read ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/paho_mqtt/client.rb', line 166 def loop_read begin MAX_QUEUE.times do result = @handler.receive_packet break if result.nil? end rescue FullQueueException PahoMqtt.logger.warn("Early exit in reading loop. The maximum packets have been reach for #{packet.type_name}") if PahoMqtt.logger? rescue ReadingException if check_persistence reconnect else raise ReadingException end end end |
#loop_write ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/paho_mqtt/client.rb', line 154 def loop_write begin @sender.writing_loop rescue WritingException if check_persistence reconnect else raise WritingException end end end |
#mqtt_loop ⇒ Object
183 184 185 186 187 |
# File 'lib/paho_mqtt/client.rb', line 183 def mqtt_loop loop_read loop_write loop_misc end |
#on_connack(&block) ⇒ Object
277 278 279 280 |
# File 'lib/paho_mqtt/client.rb', line 277 def on_connack(&block) @handler.on_connack = block if block_given? @handler.on_connack end |
#on_connack=(callback) ⇒ Object
317 318 319 |
# File 'lib/paho_mqtt/client.rb', line 317 def on_connack=(callback) @handler.on_connack = callback if callback.is_a?(Proc) end |
#on_message(&block) ⇒ Object
312 313 314 315 |
# File 'lib/paho_mqtt/client.rb', line 312 def (&block) @handler. = block if block_given? @handler. end |
#on_message=(callback) ⇒ Object
345 346 347 |
# File 'lib/paho_mqtt/client.rb', line 345 def (callback) @handler. = callback if callback.is_a?(Proc) end |
#on_puback(&block) ⇒ Object
292 293 294 295 |
# File 'lib/paho_mqtt/client.rb', line 292 def on_puback(&block) @handler.on_puback = block if block_given? @handler.on_puback end |
#on_puback=(callback) ⇒ Object
329 330 331 |
# File 'lib/paho_mqtt/client.rb', line 329 def on_puback=(callback) @handler.on_puback = callback if callback.is_a?(Proc) end |
#on_pubcomp(&block) ⇒ Object
307 308 309 310 |
# File 'lib/paho_mqtt/client.rb', line 307 def on_pubcomp(&block) @handler.on_pubcomp = block if block_given? @handler.on_pubcomp end |
#on_pubcomp=(callback) ⇒ Object
341 342 343 |
# File 'lib/paho_mqtt/client.rb', line 341 def on_pubcomp=(callback) @handler.on_pubcomp = callback if callback.is_a?(Proc) end |
#on_pubrec(&block) ⇒ Object
297 298 299 300 |
# File 'lib/paho_mqtt/client.rb', line 297 def on_pubrec(&block) @handler.on_pubrec = block if block_given? @handler.on_pubrec end |
#on_pubrec=(callback) ⇒ Object
333 334 335 |
# File 'lib/paho_mqtt/client.rb', line 333 def on_pubrec=(callback) @handler.on_pubrec = callback if callback.is_a?(Proc) end |
#on_pubrel(&block) ⇒ Object
302 303 304 305 |
# File 'lib/paho_mqtt/client.rb', line 302 def on_pubrel(&block) @handler.on_pubrel = block if block_given? @handler.on_pubrel end |
#on_pubrel=(callback) ⇒ Object
337 338 339 |
# File 'lib/paho_mqtt/client.rb', line 337 def on_pubrel=(callback) @handler.on_pubrel = callback if callback.is_a?(Proc) end |
#on_suback(&block) ⇒ Object
282 283 284 285 |
# File 'lib/paho_mqtt/client.rb', line 282 def on_suback(&block) @handler.on_suback = block if block_given? @handler.on_suback end |
#on_suback=(callback) ⇒ Object
321 322 323 |
# File 'lib/paho_mqtt/client.rb', line 321 def on_suback=(callback) @handler.on_suback = callback if callback.is_a?(Proc) end |
#on_unsuback(&block) ⇒ Object
287 288 289 290 |
# File 'lib/paho_mqtt/client.rb', line 287 def on_unsuback(&block) @handler.on_unsuback = block if block_given? @handler.on_unsuback end |
#on_unsuback=(callback) ⇒ Object
325 326 327 |
# File 'lib/paho_mqtt/client.rb', line 325 def on_unsuback=(callback) @handler.on_unsuback = callback if callback.is_a?(Proc) end |
#ping_host ⇒ Object
265 266 267 |
# File 'lib/paho_mqtt/client.rb', line 265 def ping_host @sender.send_pingreq end |
#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
230 231 232 233 234 235 236 237 |
# File 'lib/paho_mqtt/client.rb', line 230 def publish(topic, payload="", retain=false, qos=0) if topic == "" || !topic.is_a?(String) PahoMqtt.logger.error("Publish topics is invalid, not a string or empty.") if PahoMqtt.logger? raise ArgumentError end id = next_packet_id @publisher.send_publish(topic, payload, retain, qos, id) end |
#reconnect ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/paho_mqtt/client.rb', line 198 def reconnect @reconnect_thread = Thread.new do counter = 0 while (@reconnect_limit >= counter || @reconnect_limit == -1) do counter += 1 PahoMqtt.logger.debug("New reconnect attempt...") if PahoMqtt.logger? connect if connected? break else sleep @reconnect_delay end end unless connected? PahoMqtt.logger.error("Reconnection attempt counter is over. (#{@reconnect_limit} times)") if PahoMqtt.logger? disconnect(false) end end end |
#reconnect? ⇒ Boolean
150 151 152 |
# File 'lib/paho_mqtt/client.rb', line 150 def reconnect? Thread.current == @reconnect_thread end |
#registered_callback ⇒ Object
349 350 351 |
# File 'lib/paho_mqtt/client.rb', line 349 def registered_callback @handler.registered_callback end |
#remove_topic_callback(topic) ⇒ Object
273 274 275 |
# File 'lib/paho_mqtt/client.rb', line 273 def remove_topic_callback(topic) @handler.clear_topic_callback(topic) end |
#subscribe(*topics) ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/paho_mqtt/client.rb', line 239 def subscribe(*topics) begin id = next_packet_id unless @subscriber.send_subscribe(topics, id) == PahoMqtt::MQTT_ERR_SUCCESS reconnect if check_persistence end MQTT_ERR_SUCCESS rescue ProtocolViolation PahoMqtt.logger.error("Subscribe topics need one topic or a list of topics.") if PahoMqtt.logger? raise ProtocolViolation end end |
#subscribed_topics ⇒ Object
353 354 355 |
# File 'lib/paho_mqtt/client.rb', line 353 def subscribed_topics @subscriber.subscribed_topics end |
#unsubscribe(*topics) ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/paho_mqtt/client.rb', line 252 def unsubscribe(*topics) begin id = next_packet_id unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS reconnect if check_persistence end MQTT_ERR_SUCCESS rescue ProtocolViolation PahoMqtt.logger.error("Unsubscribe need at least one topic.") if PahoMqtt.logger? raise ProtocolViolation end end |