Class: PahoMqtt::Client
- Inherits:
-
Object
- Object
- PahoMqtt::Client
- Defined in:
- lib/paho_mqtt/client.rb
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
-
#blocking ⇒ Object
Returns the value of attribute blocking.
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connection_state ⇒ Object
readonly
Read Only attribute.
-
#host ⇒ Object
Connection-related attribute: returns the value of attribute host.
-
#keep_alive ⇒ Object
Timeout attribute: returns the value of attribute keep_alive.
-
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
-
#password ⇒ Object
Returns the value of attribute password.
-
#persistent ⇒ Object
Returns the value of attribute persistent.
-
#port ⇒ Object
Returns the value of attribute port.
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#ssl_context ⇒ Object
readonly
Returns the value of attribute ssl_context.
-
#username ⇒ Object
Returns the value of attribute username.
-
#will_payload ⇒ Object
Returns the value of attribute will_payload.
-
#will_qos ⇒ Object
Returns the value of attribute will_qos.
-
#will_retain ⇒ Object
Returns the value of attribute will_retain.
-
#will_topic ⇒ Object
Last-will attribute: returns the value of attribute will_topic.
Instance Method Summary collapse
- #add_topic_callback(topic, callback = nil, &block) ⇒ Object
- #config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
- #connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object
- #connected? ⇒ Boolean
- #daemon_mode ⇒ Object
- #disconnect(explicit = true) ⇒ Object
- #generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
-
#initialize(*args) ⇒ Client
constructor
A new instance of Client.
- #loop_misc ⇒ Object
- #loop_read(max_packet = MAX_READ) ⇒ Object
- #loop_write(max_packet = MAX_WRITING) ⇒ Object
- #mqtt_loop ⇒ Object
- #on_connack(&block) ⇒ Object
- #on_connack=(callback) ⇒ Object
- #on_message(&block) ⇒ Object
- #on_message=(callback) ⇒ Object
- #on_puback(&block) ⇒ Object
- #on_puback=(callback) ⇒ Object
- #on_pubcomp(&block) ⇒ Object
- #on_pubcomp=(callback) ⇒ Object
- #on_pubrec(&block) ⇒ Object
- #on_pubrec=(callback) ⇒ Object
- #on_pubrel(&block) ⇒ Object
- #on_pubrel=(callback) ⇒ Object
- #on_suback(&block) ⇒ Object
- #on_suback=(callback) ⇒ Object
- #on_unsuback(&block) ⇒ Object
- #on_unsuback=(callback) ⇒ Object
- #ping_host ⇒ Object
- #publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
- #reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object
- #reconnect? ⇒ Boolean
- #registered_callback ⇒ Object
- #remove_topic_callback(topic) ⇒ Object
- #subscribe(*topics) ⇒ Object
- #subscribed_topics ⇒ Object
- #unsubscribe(*topics) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Returns a new instance of Client.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/paho_mqtt/client.rb', line 50
#
# Builds a client in the MQTT_CS_DISCONNECT state and applies its attributes.
#
# @param args [Array] optional arguments; when the last one is a Hash it is
#   popped and merged over CLIENT_ATTR_DEFAULTS, and every resulting
#   key/value pair is applied through the matching +key=+ writer.
def initialize(*args)
  @last_ping_resp         = Time.now
  @last_packet_id         = 0
  @ssl_context            = nil
  @sender                 = nil
  @handler                = Handler.new
  @connection_helper      = nil
  @connection_state       = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @mqtt_thread            = nil
  @reconnect_thread       = nil
  @id_mutex               = Mutex.new

  attr = args.last.is_a?(Hash) ? args.pop : {}
  CLIENT_ATTR_DEFAULTS.merge(attr).each_pair do |key, value|
    send("#{key}=", value)
  end

  # No port supplied: choose the default according to whether SSL is enabled.
  @port = @ssl ? DEFAULT_SSL_PORT : DEFAULT_PORT if @port.nil?

  # A missing or empty client id is replaced by a generated one.
  @client_id = generate_client_id if @client_id.nil? || @client_id == ""
end
Instance Attribute Details
#ack_timeout ⇒ Object
Returns the value of attribute ack_timeout.
44 45 46 |
# File 'lib/paho_mqtt/client.rb', line 44
#
# Reader for the +ack_timeout+ attribute.
#
# @return [Object] the current value of @ack_timeout
def ack_timeout
  @ack_timeout
end
#blocking ⇒ Object
Returns the value of attribute blocking.
30 31 32 |
# File 'lib/paho_mqtt/client.rb', line 30
#
# Reader for the +blocking+ attribute.
#
# @return [Object] the current value of @blocking
def blocking
  @blocking
end
#clean_session ⇒ Object
Returns the value of attribute clean_session.
28 29 30 |
# File 'lib/paho_mqtt/client.rb', line 28
#
# Reader for the +clean_session+ attribute.
#
# @return [Object] the current value of @clean_session
def clean_session
  @clean_session
end
#client_id ⇒ Object
Returns the value of attribute client_id.
31 32 33 |
# File 'lib/paho_mqtt/client.rb', line 31
#
# Reader for the +client_id+ attribute.
#
# @return [Object] the current value of @client_id
def client_id
  @client_id
end
#connection_state ⇒ Object (readonly)
Read Only attribute
47 48 49 |
# File 'lib/paho_mqtt/client.rb', line 47
#
# Read-only accessor for the connection state.
#
# @return [Object] the current value of @connection_state
def connection_state
  @connection_state
end
#host ⇒ Object
Connection-related attribute: returns the value of attribute host.
25 26 27 |
# File 'lib/paho_mqtt/client.rb', line 25
#
# Reader for the +host+ attribute (one of the connection-related attributes).
#
# @return [Object] the current value of @host
def host
  @host
end
#keep_alive ⇒ Object
Timeout attribute: returns the value of attribute keep_alive.
43 44 45 |
# File 'lib/paho_mqtt/client.rb', line 43
#
# Reader for the +keep_alive+ attribute (one of the timeout attributes).
#
# @return [Object] the current value of @keep_alive
def keep_alive
  @keep_alive
end
#mqtt_version ⇒ Object
Returns the value of attribute mqtt_version.
27 28 29 |
# File 'lib/paho_mqtt/client.rb', line 27
#
# Reader for the +mqtt_version+ attribute.
#
# @return [Object] the current value of @mqtt_version
def mqtt_version
  @mqtt_version
end
#password ⇒ Object
Returns the value of attribute password.
33 34 35 |
# File 'lib/paho_mqtt/client.rb', line 33
#
# Reader for the +password+ attribute.
#
# @return [Object] the current value of @password
def password
  @password
end
#persistent ⇒ Object
Returns the value of attribute persistent.
29 30 31 |
# File 'lib/paho_mqtt/client.rb', line 29
#
# Reader for the +persistent+ attribute.
#
# @return [Object] the current value of @persistent
def persistent
  @persistent
end
#port ⇒ Object
Returns the value of attribute port.
26 27 28 |
# File 'lib/paho_mqtt/client.rb', line 26
#
# Reader for the +port+ attribute.
#
# @return [Object] the current value of @port
def port
  @port
end
#ssl ⇒ Object
Returns the value of attribute ssl.
34 35 36 |
# File 'lib/paho_mqtt/client.rb', line 34
#
# Reader for the +ssl+ attribute.
#
# @return [Object] the current value of @ssl
def ssl
  @ssl
end
#ssl_context ⇒ Object (readonly)
Returns the value of attribute ssl_context.
48 49 50 |
# File 'lib/paho_mqtt/client.rb', line 48
#
# Read-only accessor for the SSL context.
#
# @return [Object] the current value of @ssl_context
def ssl_context
  @ssl_context
end
#username ⇒ Object
Returns the value of attribute username.
32 33 34 |
# File 'lib/paho_mqtt/client.rb', line 32
#
# Reader for the +username+ attribute.
#
# @return [Object] the current value of @username
def username
  @username
end
#will_payload ⇒ Object
Returns the value of attribute will_payload.
38 39 40 |
# File 'lib/paho_mqtt/client.rb', line 38
#
# Reader for the +will_payload+ attribute.
#
# @return [Object] the current value of @will_payload
def will_payload
  @will_payload
end
#will_qos ⇒ Object
Returns the value of attribute will_qos.
39 40 41 |
# File 'lib/paho_mqtt/client.rb', line 39
#
# Reader for the +will_qos+ attribute.
#
# @return [Object] the current value of @will_qos
def will_qos
  @will_qos
end
#will_retain ⇒ Object
Returns the value of attribute will_retain.
40 41 42 |
# File 'lib/paho_mqtt/client.rb', line 40
#
# Reader for the +will_retain+ attribute.
#
# @return [Object] the current value of @will_retain
def will_retain
  @will_retain
end
#will_topic ⇒ Object
Last-will attribute: returns the value of attribute will_topic.
37 38 39 |
# File 'lib/paho_mqtt/client.rb', line 37
#
# Reader for the +will_topic+ attribute (one of the last-will attributes).
#
# @return [Object] the current value of @will_topic
def will_topic
  @will_topic
end
Instance Method Details
#add_topic_callback(topic, callback = nil, &block) ⇒ Object
246 247 248 |
# File 'lib/paho_mqtt/client.rb', line 246
#
# Forwards a per-topic callback registration to the handler.
#
# @param topic [Object] topic the callback is registered for
# @param callback [Object, nil] callable passed through unchanged
# @param block [Proc] optional block passed through unchanged
# @return [Object] whatever @handler.register_topic_callback returns
def add_topic_callback(topic, callback = nil, &block)
  @handler.register_topic_callback(
    topic,
    callback,
    &block
  )
end
#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object
92 93 94 95 |
# File 'lib/paho_mqtt/client.rb', line 92
#
# Enables SSL (unless @ssl already holds a truthy value) and stores the
# context produced by SSLHelper.config_ssl_context.
#
# @param cert_path [Object] forwarded to SSLHelper.config_ssl_context
# @param key_path [Object] forwarded to SSLHelper.config_ssl_context
# @param ca_path [Object, nil] forwarded to SSLHelper.config_ssl_context
# @return [Object] the value assigned to @ssl_context
def config_ssl_context(cert_path, key_path, ca_path = nil)
  @ssl = true unless @ssl
  context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path)
  @ssl_context = context
end
#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/paho_mqtt/client.rb', line 97
#
# Stores the connection settings, resets the state to MQTT_CS_NEW, kills any
# existing MQTT thread, then asks the connection helper to connect and
# records the state it reports.
#
# @param host [Object] stored in @host
# @param port [#to_i] stored in @port after conversion with +to_i+
# @param keep_alive [Object] stored in @keep_alive
# @param persistent [Object] stored in @persistent
# @param blocking [Object] stored in @blocking; when falsy and the client is
#   connected, {#daemon_mode} is started
# @return [Object, nil] result of daemon_mode, or nil when it is not started
def connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking)
  @persistent = persistent
  @blocking   = blocking
  @host       = host
  @port       = port.to_i
  @keep_alive = keep_alive
  @connection_state_mutex.synchronize { @connection_state = MQTT_CS_NEW }
  @mqtt_thread.kill unless @mqtt_thread.nil?
  init_connection
  @connection_helper.send_connect(session_params)
  begin
    # Resolve the new state outside the lock, then publish it under the same
    # mutex that guards every other write to @connection_state.
    new_state = @connection_helper.do_connect(reconnect?)
    @connection_state_mutex.synchronize { @connection_state = new_state }
  rescue LowVersionException
    downgrade_version
  end
  build_pubsub
  daemon_mode unless @blocking || !connected?
end
#connected? ⇒ Boolean
127 128 129 |
# File 'lib/paho_mqtt/client.rb', line 127
#
# Tells whether the stored connection state equals MQTT_CS_CONNECTED.
#
# @return [Boolean]
def connected?
  current_state = @connection_state
  current_state == MQTT_CS_CONNECTED
end
#daemon_mode ⇒ Object
118 119 120 121 122 123 124 125 |
# File 'lib/paho_mqtt/client.rb', line 118
#
# Starts the background thread that runs {#mqtt_loop} for as long as the
# client reports itself connected. A still-alive reconnect thread is killed
# first, from inside the new thread.
#
# @return [Thread] the thread stored in @mqtt_thread
def daemon_mode
  @mqtt_thread = Thread.new do
    if !@reconnect_thread.nil? && @reconnect_thread.alive?
      @reconnect_thread.kill
    end
    mqtt_loop while connected?
  end
end
#disconnect(explicit = true) ⇒ Object
195 196 197 198 199 200 201 202 |
# File 'lib/paho_mqtt/client.rb', line 195
#
# Disconnects through the connection helper and marks the client as
# MQTT_CS_DISCONNECT under the state mutex.
#
# @param explicit [Boolean] when truthy the packet id counter is reset to 0;
#   the flag is also forwarded to the helper's do_disconnect
# @return [Object] MQTT_ERR_SUCCESS
def disconnect(explicit = true)
  if explicit
    @last_packet_id = 0
  end
  @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread)
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_DISCONNECT
  end
  MQTT_ERR_SUCCESS
end
#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object
87 88 89 90 |
# File 'lib/paho_mqtt/client.rb', line 87
#
# Generates a client id made of +prefix+ followed by random alphanumeric
# characters, stores it in @client_id and returns it.
#
# The id is built as a new string, so the caller's +prefix+ is never mutated
# and a frozen prefix is accepted.
#
# @param prefix [#to_s] leading part of the id
# @param lenght [Integer] number of random characters appended to the prefix
# @return [String] the generated client id
def generate_client_id(prefix = 'paho_ruby', lenght = 16)
  charset = [*'A'..'Z', *'a'..'z', *'0'..'9']
  suffix  = Array.new(lenght) { charset.sample }.join
  @client_id = "#{prefix}#{suffix}"
end
#loop_misc ⇒ Object
168 169 170 171 172 173 174 |
# File 'lib/paho_mqtt/client.rb', line 168
#
# Housekeeping step of the MQTT loop: runs the keep-alive check (reconnecting
# when it reports MQTT_CS_DISCONNECT and check_persistence is truthy), then
# lets the publisher and the subscriber check their waiting packets.
#
# @return [Object] whatever @subscriber.check_waiting_subscriber returns
def loop_misc
  keep_alive_state = @connection_helper.check_keep_alive(@persistent, @handler.last_ping_resp, @keep_alive)
  reconnect if keep_alive_state == MQTT_CS_DISCONNECT && check_persistence
  @publisher.check_waiting_publisher
  @subscriber.check_waiting_subscriber
end
#loop_read(max_packet = MAX_READ) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/paho_mqtt/client.rb', line 147
#
# Asks the handler to receive up to +max_packet+ packets.
#
# When a read fails with ReadingException, a reconnect is started if
# check_persistence is truthy; otherwise the same exception is re-raised so
# its message and backtrace reach the caller.
#
# @param max_packet [Integer] number of receive attempts
# @return [Integer] +max_packet+
# @raise [ReadingException] when a read fails and check_persistence is falsy
def loop_read(max_packet = MAX_READ)
  max_packet.times do
    begin
      @handler.receive_packet
    rescue ReadingException
      # Bare raise re-raises the rescued exception instead of a fresh one.
      raise unless check_persistence
      reconnect
    end
  end
end
#loop_write(max_packet = MAX_WRITING) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/paho_mqtt/client.rb', line 135
#
# Runs the sender's writing loop for up to +max_packet+ packets.
#
# When writing fails with WritingException, a reconnect is started if
# check_persistence is truthy; otherwise the same exception is re-raised so
# its message and backtrace reach the caller.
#
# @param max_packet [Integer] forwarded to @sender.writing_loop
# @return [Object] result of the writing loop, or of reconnect after a failure
# @raise [WritingException] when writing fails and check_persistence is falsy
def loop_write(max_packet = MAX_WRITING)
  @sender.writing_loop(max_packet)
rescue WritingException
  # Bare raise re-raises the rescued exception instead of a fresh one.
  raise unless check_persistence
  reconnect
end
#mqtt_loop ⇒ Object
161 162 163 164 165 166 |
# File 'lib/paho_mqtt/client.rb', line 161
#
# One iteration of the client loop: read, write, housekeeping, then a pause
# of LOOP_TEMPO.
#
# @return [Object] the value returned by +sleep+
def mqtt_loop
  loop_read
  loop_write
  loop_misc
  sleep(LOOP_TEMPO)
end
#on_connack(&block) ⇒ Object
254 255 256 257 |
# File 'lib/paho_mqtt/client.rb', line 254
#
# Installs the given block as the handler's +on_connack+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_connack+ value
def on_connack(&block)
  if block_given?
    @handler.on_connack = block
  end
  @handler.on_connack
end
#on_connack=(callback) ⇒ Object
294 295 296 |
# File 'lib/paho_mqtt/client.rb', line 294
#
# Sets the handler's +on_connack+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_connack=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_connack = callback
end
#on_message(&block) ⇒ Object
289 290 291 292 |
# File 'lib/paho_mqtt/client.rb', line 289
#
# Installs the given block as the handler's +on_message+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_message+ value
def on_message(&block)
  @handler.on_message = block if block_given?
  @handler.on_message
end
#on_message=(callback) ⇒ Object
322 323 324 |
# File 'lib/paho_mqtt/client.rb', line 322
#
# Sets the handler's +on_message+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_message=(callback)
  @handler.on_message = callback if callback.is_a?(Proc)
end
#on_puback(&block) ⇒ Object
269 270 271 272 |
# File 'lib/paho_mqtt/client.rb', line 269
#
# Installs the given block as the handler's +on_puback+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_puback+ value
def on_puback(&block)
  if block_given?
    @handler.on_puback = block
  end
  @handler.on_puback
end
#on_puback=(callback) ⇒ Object
306 307 308 |
# File 'lib/paho_mqtt/client.rb', line 306
#
# Sets the handler's +on_puback+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_puback=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_puback = callback
end
#on_pubcomp(&block) ⇒ Object
284 285 286 287 |
# File 'lib/paho_mqtt/client.rb', line 284
#
# Installs the given block as the handler's +on_pubcomp+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_pubcomp+ value
def on_pubcomp(&block)
  if block_given?
    @handler.on_pubcomp = block
  end
  @handler.on_pubcomp
end
#on_pubcomp=(callback) ⇒ Object
318 319 320 |
# File 'lib/paho_mqtt/client.rb', line 318
#
# Sets the handler's +on_pubcomp+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_pubcomp = callback
end
#on_pubrec(&block) ⇒ Object
274 275 276 277 |
# File 'lib/paho_mqtt/client.rb', line 274
#
# Installs the given block as the handler's +on_pubrec+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_pubrec+ value
def on_pubrec(&block)
  if block_given?
    @handler.on_pubrec = block
  end
  @handler.on_pubrec
end
#on_pubrec=(callback) ⇒ Object
310 311 312 |
# File 'lib/paho_mqtt/client.rb', line 310
#
# Sets the handler's +on_pubrec+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_pubrec = callback
end
#on_pubrel(&block) ⇒ Object
279 280 281 282 |
# File 'lib/paho_mqtt/client.rb', line 279
#
# Installs the given block as the handler's +on_pubrel+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_pubrel+ value
def on_pubrel(&block)
  if block_given?
    @handler.on_pubrel = block
  end
  @handler.on_pubrel
end
#on_pubrel=(callback) ⇒ Object
314 315 316 |
# File 'lib/paho_mqtt/client.rb', line 314
#
# Sets the handler's +on_pubrel+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_pubrel = callback
end
#on_suback(&block) ⇒ Object
259 260 261 262 |
# File 'lib/paho_mqtt/client.rb', line 259
#
# Installs the given block as the handler's +on_suback+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_suback+ value
def on_suback(&block)
  if block_given?
    @handler.on_suback = block
  end
  @handler.on_suback
end
#on_suback=(callback) ⇒ Object
298 299 300 |
# File 'lib/paho_mqtt/client.rb', line 298
#
# Sets the handler's +on_suback+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_suback=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_suback = callback
end
#on_unsuback(&block) ⇒ Object
264 265 266 267 |
# File 'lib/paho_mqtt/client.rb', line 264
#
# Installs the given block as the handler's +on_unsuback+ callback (when a
# block is passed) and returns the callback currently stored.
#
# @return [Object] the handler's current +on_unsuback+ value
def on_unsuback(&block)
  if block_given?
    @handler.on_unsuback = block
  end
  @handler.on_unsuback
end
#on_unsuback=(callback) ⇒ Object
302 303 304 |
# File 'lib/paho_mqtt/client.rb', line 302
#
# Sets the handler's +on_unsuback+ callback; anything that is not a Proc is
# ignored.
#
# @param callback [Proc] callback to install
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)

  @handler.on_unsuback = callback
end
#ping_host ⇒ Object
242 243 244 |
# File 'lib/paho_mqtt/client.rb', line 242
#
# Asks the sender to emit a ping request.
#
# @return [Object] whatever @sender.send_pingreq returns
def ping_host
  sender = @sender
  sender.send_pingreq
end
#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object
204 205 206 207 208 209 210 211 212 |
# File 'lib/paho_mqtt/client.rb', line 204
#
# Publishes +payload+ on +topic+ through the publisher, using the next
# packet id.
#
# @param topic [String] non-empty topic name
# @param payload [Object] forwarded to the publisher
# @param retain [Object] forwarded to the publisher
# @param qos [Object] forwarded to the publisher
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ArgumentError] when +topic+ is not a String or is empty; the
#   exception carries the same text that is logged, so the reason is visible
#   even without a logger
def publish(topic, payload = "", retain = false, qos = 0)
  if !topic.is_a?(String) || topic == ""
    message = "Publish topics is invalid, not a string or empty."
    PahoMqtt.logger.error(message) if PahoMqtt.logger?
    raise ArgumentError, message
  end
  id = next_packet_id
  @publisher.send_publish(topic, payload, retain, qos, id)
  MQTT_ERR_SUCCESS
end
#reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/paho_mqtt/client.rb', line 176
#
# Starts a thread that calls {#connect} up to +retry_time+ times, sleeping
# +retry_tempo+ between failed attempts. If the client is still not
# connected afterwards, the failure is logged with the number of attempts
# actually made, +disconnect(false)+ is called and +exit+ is invoked.
#
# @param retry_time [Integer] maximum number of connection attempts
# @param retry_tempo [Numeric] pause passed to +sleep+ between attempts
# @return [Thread] the thread stored in @reconnect_thread
def reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO)
  @reconnect_thread = Thread.new do
    retry_time.times do
      PahoMqtt.logger.debug("New reconnect atempt...") if PahoMqtt.logger?
      connect
      break if connected?

      sleep retry_tempo
    end
    unless connected?
      # Report the caller-supplied retry count, not the default constant.
      PahoMqtt.logger.error("Reconnection atempt counter is over.(#{retry_time} times)") if PahoMqtt.logger?
      disconnect(false)
      exit
    end
  end
end
#reconnect? ⇒ Boolean
131 132 133 |
# File 'lib/paho_mqtt/client.rb', line 131
#
# Tells whether the calling thread is the stored reconnect thread.
#
# @return [Boolean]
def reconnect?
  caller_thread = Thread.current
  caller_thread == @reconnect_thread
end
#registered_callback ⇒ Object
326 327 328 |
# File 'lib/paho_mqtt/client.rb', line 326
#
# Delegates to the handler's +registered_callback+.
#
# @return [Object] whatever @handler.registered_callback returns
def registered_callback
  handler = @handler
  handler.registered_callback
end
#remove_topic_callback(topic) ⇒ Object
250 251 252 |
# File 'lib/paho_mqtt/client.rb', line 250
#
# Asks the handler to clear the callback registered for +topic+.
#
# @param topic [Object] forwarded to @handler.clear_topic_callback
# @return [Object] whatever @handler.clear_topic_callback returns
def remove_topic_callback(topic)
  handler = @handler
  handler.clear_topic_callback(topic)
end
#subscribe(*topics) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/paho_mqtt/client.rb', line 214
#
# Sends a subscribe request for +topics+ with the next packet id. When the
# subscriber does not report MQTT_ERR_SUCCESS, a reconnect is started if
# check_persistence is truthy.
#
# @param topics [Array] topics collected from the arguments and forwarded to
#   @subscriber.send_subscribe
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ProtocolViolation] re-raised (same exception object) after logging
#   and calling +disconnect(false)+
def subscribe(*topics)
  id = next_packet_id
  unless @subscriber.send_subscribe(topics, id) == MQTT_ERR_SUCCESS
    reconnect if check_persistence
  end
  MQTT_ERR_SUCCESS
rescue ProtocolViolation
  PahoMqtt.logger.error("Subscribe topics need one topic or a list of topics.") if PahoMqtt.logger?
  disconnect(false)
  # Bare raise keeps the original exception's message and backtrace.
  raise
end
#subscribed_topics ⇒ Object
330 331 332 |
# File 'lib/paho_mqtt/client.rb', line 330
#
# Delegates to the subscriber's +subscribed_topics+.
#
# @return [Object] whatever @subscriber.subscribed_topics returns
def subscribed_topics
  subscriber = @subscriber
  subscriber.subscribed_topics
end
#unsubscribe(*topics) ⇒ Object
228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/paho_mqtt/client.rb', line 228
#
# Sends an unsubscribe request for +topics+ with the next packet id. When
# the subscriber does not report MQTT_ERR_SUCCESS, a reconnect is started if
# check_persistence is truthy.
#
# @param topics [Array] topics collected from the arguments and forwarded to
#   @subscriber.send_unsubscribe
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ProtocolViolation] re-raised (same exception object) after logging
#   and calling +disconnect(false)+
def unsubscribe(*topics)
  id = next_packet_id
  unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS
    reconnect if check_persistence
  end
  MQTT_ERR_SUCCESS
rescue ProtocolViolation
  PahoMqtt.logger.error("Unsubscribe need at least one topics.") if PahoMqtt.logger?
  disconnect(false)
  # Bare raise keeps the original exception's message and backtrace.
  raise
end