Class: PahoMqtt::Handler
- Inherits:
-
Object
- Object
- PahoMqtt::Handler
- Defined in:
- lib/paho_mqtt/handler.rb
Instance Attribute Summary collapse
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#last_ping_resp ⇒ Object
Returns the value of attribute last_ping_resp.
-
#registered_callback ⇒ Object
readonly
Returns the value of attribute registered_callback.
Instance Method Summary collapse
- #check_callback(packet) ⇒ Object
- #clean_session?(session_flag) ⇒ Boolean
- #clear_topic_callback(topic) ⇒ Object
- #config_pubsub(publisher, subscriber) ⇒ Object
- #handle_connack(packet) ⇒ Object
- #handle_connack_accepted(session_flag) ⇒ Object
- #handle_packet(packet) ⇒ Object
- #handle_pingresp(_packet) ⇒ Object
- #handle_puback(packet) ⇒ Object
- #handle_pubcomp(packet) ⇒ Object
- #handle_publish(packet) ⇒ Object
- #handle_pubrec(packet) ⇒ Object
- #handle_pubrel(packet) ⇒ Object
- #handle_suback(packet) ⇒ Object
- #handle_unsuback(packet) ⇒ Object
-
#initialize ⇒ Handler
constructor
A new instance of Handler.
- #new_session?(session_flag) ⇒ Boolean
- #old_session?(session_flag) ⇒ Boolean
- #on_connack(&block) ⇒ Object
- #on_connack=(callback) ⇒ Object
- #on_message(&block) ⇒ Object
- #on_message=(callback) ⇒ Object
- #on_puback(&block) ⇒ Object
- #on_puback=(callback) ⇒ Object
- #on_pubcomp(&block) ⇒ Object
- #on_pubcomp=(callback) ⇒ Object
- #on_pubrec(&block) ⇒ Object
- #on_pubrec=(callback) ⇒ Object
- #on_pubrel(&block) ⇒ Object
- #on_pubrel=(callback) ⇒ Object
- #on_suback(&block) ⇒ Object
- #on_suback=(callback) ⇒ Object
- #on_unsuback(&block) ⇒ Object
- #on_unsuback=(callback) ⇒ Object
- #packet_type(packet) ⇒ Object
- #receive_packet ⇒ Object
- #register_topic_callback(topic, callback, &block) ⇒ Object
- #socket=(socket) ⇒ Object
Constructor Details
#initialize ⇒ Handler
Returns a new instance of Handler.
22 23 24 25 26 27 |
# File 'lib/paho_mqtt/handler.rb', line 22
# Builds a handler with an empty topic-callback list, a last ping response
# value of -1, and no publisher or subscriber attached yet (both are set
# later through #config_pubsub).
def initialize
  @last_ping_resp = -1
  @registered_callback = []
  @publisher = @subscriber = nil
end
Instance Attribute Details
#clean_session ⇒ Object
Returns the value of attribute clean_session.
20 21 22 |
# File 'lib/paho_mqtt/handler.rb', line 20 def clean_session @clean_session end |
#last_ping_resp ⇒ Object
Returns the value of attribute last_ping_resp.
19 20 21 |
# File 'lib/paho_mqtt/handler.rb', line 19 def last_ping_resp @last_ping_resp end |
#registered_callback ⇒ Object (readonly)
Returns the value of attribute registered_callback.
18 19 20 |
# File 'lib/paho_mqtt/handler.rb', line 18 def registered_callback @registered_callback end |
Instance Method Details
#check_callback(packet) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/paho_mqtt/handler.rb', line 261
# Runs every registered topic callback whose filter matches the topic of the
# given packet. Matching is delegated to PahoMqtt.match_filter, and the full
# list of matches is collected before the first callback is invoked.
#
# @param packet [Object] packet responding to #topic; passed to each callback
# @return [Array, nil] the callbacks that were run, or nil when none matched
def check_callback(packet)
  matching = @registered_callback.select do |entry|
    PahoMqtt.match_filter(packet.topic, entry.first)
  end
  handlers = matching.map(&:last)
  return if handlers.empty?

  handlers.each { |handler| handler.call(packet) }
end
#clean_session?(session_flag) ⇒ Boolean
107 108 109 110 111 |
# File 'lib/paho_mqtt/handler.rb', line 107
# Emits a debug message when the client asked for a clean session and the
# session flag passed in is falsy. Nothing else is changed.
#
# @param session_flag [Object] session flag taken from the CONNACK packet
# @return [Object, nil] the logger's return value when a message was written,
#   otherwise nil (despite the `?` suffix, no strict boolean is produced)
def clean_session?(session_flag)
  return if !@clean_session || session_flag

  PahoMqtt.logger.debug("No previous session found by server, starting a new one.") if PahoMqtt.logger?
end
#clear_topic_callback(topic) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/paho_mqtt/handler.rb', line 74
# Removes every registered callback whose topic equals the given topic.
#
# @param topic [Object] topic whose callbacks must be dropped
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ArgumentError] when topic is nil (an error is logged first if a
#   logger is configured)
def clear_topic_callback(topic)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be unregistered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end

  @registered_callback.reject! { |entry| entry.first == topic }
  MQTT_ERR_SUCCESS
end
#config_pubsub(publisher, subscriber) ⇒ Object
29 30 31 32 |
# File 'lib/paho_mqtt/handler.rb', line 29 def config_pubsub(publisher, subscriber) @publisher = publisher @subscriber = subscriber end |
#handle_connack(packet) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/paho_mqtt/handler.rb', line 83
# Processes a CONNACK packet: logs the broker's return message, runs the
# session checks when the connection was accepted (return code 0x00), fires
# the on_connack callback if one is registered, and reports the resulting
# connection state.
#
# @param packet [Object] packet responding to #return_code, #return_msg and
#   #session_present
# @return [Object] MQTT_CS_CONNECTED when the broker accepted the connection,
#   MQTT_CS_DISCONNECTED when it refused it
def handle_connack(packet)
  if packet.return_code == 0x00
    PahoMqtt.logger.debug(packet.return_msg) if PahoMqtt.logger?
    handle_connack_accepted(packet.session_present)
    state = MQTT_CS_CONNECTED
  else
    # The refusal was previously logged through `warm`, which is not a logger
    # severity method (presumably a ::Logger - confirm) and raised
    # NoMethodError whenever a logger was configured.
    PahoMqtt.logger.warn(packet.return_msg) if PahoMqtt.logger?
    # Keep the refused state: it used to be evaluated and discarded, so the
    # method reported MQTT_CS_CONNECTED even for a rejected connection.
    state = MQTT_CS_DISCONNECTED
  end
  # The callback is notified for accepted and refused connections alike.
  @on_connack.call(packet) unless @on_connack.nil?
  state
end
#handle_connack_accepted(session_flag) ⇒ Object
95 96 97 98 99 |
# File 'lib/paho_mqtt/handler.rb', line 95 def handle_connack_accepted(session_flag) clean_session?(session_flag) new_session?(session_flag) old_session?(session_flag) end |
#handle_packet(packet) ⇒ Object
54 55 56 57 58 |
# File 'lib/paho_mqtt/handler.rb', line 54
# Logs the arrival of a packet and dispatches it to the matching
# handle_<type> method, where <type> is the name produced by #packet_type.
#
# @param packet [Object] the received packet
# @return [Object] whatever the selected handle_<type> method returns
# @raise [PacketException] propagated from #packet_type for unexpected packets
def handle_packet(packet)
  PahoMqtt.logger.info("New packet #{packet.class} received.") if PahoMqtt.logger?
  send("handle_#{packet_type(packet)}", packet)
end
#handle_pingresp(_packet) ⇒ Object
119 120 121 |
# File 'lib/paho_mqtt/handler.rb', line 119 def handle_pingresp(_packet) @last_ping_resp = Time.now end |
#handle_puback(packet) ⇒ Object
151 152 153 154 155 156 |
# File 'lib/paho_mqtt/handler.rb', line 151
# Hands the packet id of a PUBACK to the publisher; when the publisher
# reports MQTT_ERR_SUCCESS, the on_puback callback (if any) is called with
# the packet.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when the publisher did
#   not report success or no callback is registered
def handle_puback(packet)
  return unless @publisher.do_puback(packet.id) == MQTT_ERR_SUCCESS

  @on_puback.call(packet) unless @on_puback.nil?
end
#handle_pubcomp(packet) ⇒ Object
172 173 174 175 176 177 |
# File 'lib/paho_mqtt/handler.rb', line 172
# Hands the packet id of a PUBCOMP to the publisher; when the publisher
# reports MQTT_ERR_SUCCESS, the on_pubcomp callback (if any) is called with
# the packet.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when the publisher did
#   not report success or no callback is registered
def handle_pubcomp(packet)
  return unless @publisher.do_pubcomp(packet.id) == MQTT_ERR_SUCCESS

  @on_pubcomp.call(packet) unless @on_pubcomp.nil?
end
#handle_publish(packet) ⇒ Object
142 143 144 145 146 147 148 149 |
# File 'lib/paho_mqtt/handler.rb', line 142
# Handles an incoming PUBLISH packet: lets the publisher process it with the
# packet's QoS and id and, when that reports MQTT_ERR_SUCCESS, calls the
# on_message callback (if any) followed by the per-topic callbacks.
#
# The listing had lost the `@on_message` receiver on the callback line
# (`.call(packet) unless .nil?`); it is restored here to match the
# on_message / on_message= accessors and the sibling handle_* methods.
#
# @param packet [Object] packet responding to #id and #qos (and #topic for
#   the per-topic callbacks)
# @return [Object, nil] the result of #check_callback, or nil when the
#   publisher did not report success
def handle_publish(packet)
  id = packet.id
  qos = packet.qos
  if @publisher.do_publish(qos, id) == MQTT_ERR_SUCCESS
    @on_message.call(packet) unless @on_message.nil?
    check_callback(packet)
  end
end
#handle_pubrec(packet) ⇒ Object
158 159 160 161 162 163 |
# File 'lib/paho_mqtt/handler.rb', line 158
# Hands the packet id of a PUBREC to the publisher; when the publisher
# reports MQTT_ERR_SUCCESS, the on_pubrec callback (if any) is called with
# the packet.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when the publisher did
#   not report success or no callback is registered
def handle_pubrec(packet)
  return unless @publisher.do_pubrec(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrec.call(packet) unless @on_pubrec.nil?
end
#handle_pubrel(packet) ⇒ Object
165 166 167 168 169 170 |
# File 'lib/paho_mqtt/handler.rb', line 165
# Hands the packet id of a PUBREL to the publisher; when the publisher
# reports MQTT_ERR_SUCCESS, the on_pubrel callback (if any) is called with
# the packet.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when the publisher did
#   not report success or no callback is registered
def handle_pubrel(packet)
  return unless @publisher.do_pubrel(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrel.call(packet) unless @on_pubrel.nil?
end
#handle_suback(packet) ⇒ Object
123 124 125 126 127 128 129 130 131 |
# File 'lib/paho_mqtt/handler.rb', line 123
# Handles a SUBACK packet: passes its return codes and id (plus an empty
# accumulator array) to the subscriber's add_subscription and, when the
# resulting topic list is not empty, calls the on_suback callback (if any)
# with that list.
#
# @param packet [Object] packet responding to #return_codes and #id
# @return [Object, nil] the callback's result, or nil when no topic was
#   returned or no callback is registered
def handle_suback(packet)
  granted = @subscriber.add_subscription(packet.return_codes, packet.id, [])
  return if granted.empty?

  @on_suback.call(granted) unless @on_suback.nil?
end
#handle_unsuback(packet) ⇒ Object
133 134 135 136 137 138 139 140 |
# File 'lib/paho_mqtt/handler.rb', line 133
# Handles an UNSUBACK packet: passes its id (plus an empty accumulator array)
# to the subscriber's remove_subscription and, when the resulting topic list
# is not empty, calls the on_unsuback callback (if any) with that list.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when no topic was
#   returned or no callback is registered
def handle_unsuback(packet)
  removed = @subscriber.remove_subscription(packet.id, [])
  return if removed.empty?

  @on_unsuback.call(removed) unless @on_unsuback.nil?
end
#new_session?(session_flag) ⇒ Boolean
101 102 103 104 105 |
# File 'lib/paho_mqtt/handler.rb', line 101
# Emits a debug message when the client did not ask for a clean session and
# the session flag passed in is falsy. Nothing else is changed.
#
# @param session_flag [Object] session flag taken from the CONNACK packet
# @return [Object, nil] the logger's return value when a message was written,
#   otherwise nil (despite the `?` suffix, no strict boolean is produced)
def new_session?(session_flag)
  return if @clean_session || session_flag

  PahoMqtt.logger.debug("New session created for the client.") if PahoMqtt.logger?
end
#old_session?(session_flag) ⇒ Boolean
113 114 115 116 117 |
# File 'lib/paho_mqtt/handler.rb', line 113
# Emits a debug message when the client did not ask for a clean session and
# the session flag passed in is truthy. Nothing else is changed.
#
# @param session_flag [Object] session flag taken from the CONNACK packet
# @return [Object, nil] the logger's return value when a message was written,
#   otherwise nil (despite the `?` suffix, no strict boolean is produced)
def old_session?(session_flag)
  return if @clean_session || !session_flag

  PahoMqtt.logger.debug("Previous session restored by the server.") if PahoMqtt.logger?
end
#on_connack(&block) ⇒ Object
179 180 181 182 |
# File 'lib/paho_mqtt/handler.rb', line 179
# Reads or sets the CONNACK callback. With a block, the block becomes the new
# callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_connack(&block)
  return @on_connack unless block_given?

  @on_connack = block
end
#on_connack=(callback) ⇒ Object
219 220 221 |
# File 'lib/paho_mqtt/handler.rb', line 219
# Sets the CONNACK callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_connack=(callback)
  return unless callback.is_a?(Proc)

  @on_connack = callback
end
#on_message(&block) ⇒ Object
214 215 216 217 |
# File 'lib/paho_mqtt/handler.rb', line 214
# Reads or sets the callback run for every incoming PUBLISH packet. With a
# block, the block becomes the new callback; without one, the current
# callback is returned unchanged.
#
# The listing had lost the method name and every `@on_message` reference
# (`def (&block) = block if block_given? end`); they are restored here to
# match the `#on_message(&block)` signature and the sibling on_* accessors.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_message(&block)
  @on_message = block if block_given?
  @on_message
end
#on_message=(callback) ⇒ Object
247 248 249 |
# File 'lib/paho_mqtt/handler.rb', line 247
# Sets the callback run for every incoming PUBLISH packet. Anything that is
# not a Proc is ignored and the previously registered callback is kept.
#
# The listing had lost the method name and the `@on_message` target
# (`def (callback) = callback if callback.is_a?(Proc) end`); they are
# restored here to match the `#on_message=(callback)` signature and the
# sibling on_*= writers.
#
# @param callback [Proc] the new callback
def on_message=(callback)
  @on_message = callback if callback.is_a?(Proc)
end
#on_puback(&block) ⇒ Object
194 195 196 197 |
# File 'lib/paho_mqtt/handler.rb', line 194
# Reads or sets the PUBACK callback. With a block, the block becomes the new
# callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_puback(&block)
  return @on_puback unless block_given?

  @on_puback = block
end
#on_puback=(callback) ⇒ Object
231 232 233 |
# File 'lib/paho_mqtt/handler.rb', line 231
# Sets the PUBACK callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_puback=(callback)
  return unless callback.is_a?(Proc)

  @on_puback = callback
end
#on_pubcomp(&block) ⇒ Object
209 210 211 212 |
# File 'lib/paho_mqtt/handler.rb', line 209
# Reads or sets the PUBCOMP callback. With a block, the block becomes the new
# callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_pubcomp(&block)
  return @on_pubcomp unless block_given?

  @on_pubcomp = block
end
#on_pubcomp=(callback) ⇒ Object
243 244 245 |
# File 'lib/paho_mqtt/handler.rb', line 243
# Sets the PUBCOMP callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)

  @on_pubcomp = callback
end
#on_pubrec(&block) ⇒ Object
199 200 201 202 |
# File 'lib/paho_mqtt/handler.rb', line 199
# Reads or sets the PUBREC callback. With a block, the block becomes the new
# callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_pubrec(&block)
  return @on_pubrec unless block_given?

  @on_pubrec = block
end
#on_pubrec=(callback) ⇒ Object
235 236 237 |
# File 'lib/paho_mqtt/handler.rb', line 235
# Sets the PUBREC callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)

  @on_pubrec = callback
end
#on_pubrel(&block) ⇒ Object
204 205 206 207 |
# File 'lib/paho_mqtt/handler.rb', line 204
# Reads or sets the PUBREL callback. With a block, the block becomes the new
# callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_pubrel(&block)
  return @on_pubrel unless block_given?

  @on_pubrel = block
end
#on_pubrel=(callback) ⇒ Object
239 240 241 |
# File 'lib/paho_mqtt/handler.rb', line 239
# Sets the PUBREL callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)

  @on_pubrel = callback
end
#on_suback(&block) ⇒ Object
184 185 186 187 |
# File 'lib/paho_mqtt/handler.rb', line 184
# Reads or sets the SUBACK callback. With a block, the block becomes the new
# callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_suback(&block)
  return @on_suback unless block_given?

  @on_suback = block
end
#on_suback=(callback) ⇒ Object
223 224 225 |
# File 'lib/paho_mqtt/handler.rb', line 223
# Sets the SUBACK callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_suback=(callback)
  return unless callback.is_a?(Proc)

  @on_suback = callback
end
#on_unsuback(&block) ⇒ Object
189 190 191 192 |
# File 'lib/paho_mqtt/handler.rb', line 189
# Reads or sets the UNSUBACK callback. With a block, the block becomes the
# new callback; without one, the current callback is returned unchanged.
#
# @return [Proc, nil] the callback now registered (nil when none)
def on_unsuback(&block)
  return @on_unsuback unless block_given?

  @on_unsuback = block
end
#on_unsuback=(callback) ⇒ Object
227 228 229 |
# File 'lib/paho_mqtt/handler.rb', line 227
# Sets the UNSUBACK callback. Anything that is not a Proc is ignored and the
# previously registered callback is kept.
#
# @param callback [Proc] the new callback
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)

  @on_unsuback = callback
end
#packet_type(packet) ⇒ Object
251 252 253 254 255 256 257 258 259 |
# File 'lib/paho_mqtt/handler.rb', line 251
# Derives the handler suffix for a packet: the last segment of the packet's
# class name, lower-cased. Only classes listed at positions 3..13 of
# PahoMqtt::PACKET_TYPES are accepted.
#
# @param packet [Object] the received packet
# @return [String] lower-cased, unqualified class name of the packet
# @raise [PacketException] when the packet's class is not in the accepted
#   slice (an error is logged first if a logger is configured)
def packet_type(packet)
  klass = packet.class
  unless PahoMqtt::PACKET_TYPES[3..13].include?(klass)
    PahoMqtt.logger.error("Received an unexpeceted packet: #{packet}.") if PahoMqtt.logger?
    raise PacketException.new('Invalid packet type id')
  end

  klass.to_s.split('::').last.downcase
end
#receive_packet ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/paho_mqtt/handler.rb', line 38
# Waits up to SELECT_TIMEOUT for the socket to become readable, reads one
# packet from it and dispatches it. CONNACK packets go to #handle_connack,
# everything else to #handle_packet. @last_ping_resp is refreshed with the
# current time for every packet read; for non-CONNACK packets this happens
# only after the handler has returned.
#
# @return [Object, nil] the result of #handle_connack for a CONNACK, the
#   recorded Time for any other packet, or nil when the socket is missing,
#   closed, not readable within the timeout, or no packet could be read
def receive_packet
  return if @socket.nil? || @socket.closed?
  return if IO.select([@socket], [], [], SELECT_TIMEOUT).nil?

  packet = PahoMqtt::Packet::Base.read(@socket)
  return if packet.nil?

  if packet.is_a?(PahoMqtt::Packet::Connack)
    @last_ping_resp = Time.now
    handle_connack(packet)
  else
    handle_packet(packet)
    @last_ping_resp = Time.now
  end
end
#register_topic_callback(topic, callback, &block) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/paho_mqtt/handler.rb', line 60
# Registers a callback for a topic, replacing any callback previously
# registered for that exact topic. A block takes precedence over the
# callback argument; when neither a block nor a Proc is supplied the topic
# ends up with no callback at all, because the old entries are cleared first.
#
# @param topic [Object] topic the callback is attached to
# @param callback [Proc, nil] callback used when no block is given
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ArgumentError] when topic is nil (an error is logged first if a
#   logger is configured)
def register_topic_callback(topic, callback, &block)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be registered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end

  clear_topic_callback(topic)
  handler = block_given? ? block : callback
  @registered_callback.push([topic, handler]) if handler.is_a?(Proc)
  MQTT_ERR_SUCCESS
end
#socket=(socket) ⇒ Object
34 35 36 |
# File 'lib/paho_mqtt/handler.rb', line 34 def socket=(socket) @socket = socket end |