Class: PahoMqtt::Handler
- Inherits:
-
Object
- Object
- PahoMqtt::Handler
- Defined in:
- lib/paho_mqtt/handler.rb
Instance Attribute Summary collapse
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#last_ping_resp ⇒ Object
Returns the value of attribute last_ping_resp.
-
#registered_callback ⇒ Object
readonly
Returns the value of attribute registered_callback.
Instance Method Summary collapse
- #check_callback(packet) ⇒ Object
- #clean_session?(session_flag) ⇒ Boolean
- #clear_topic_callback(topic) ⇒ Object
- #config_pubsub(publisher, subscriber) ⇒ Object
- #handle_connack(packet) ⇒ Object
- #handle_connack_accepted(session_flag) ⇒ Object
- #handle_connack_error(return_code) ⇒ Object
- #handle_packet(packet) ⇒ Object
- #handle_pingresp(_packet) ⇒ Object
- #handle_puback(packet) ⇒ Object
- #handle_pubcomp(packet) ⇒ Object
- #handle_publish(packet) ⇒ Object
- #handle_pubrec(packet) ⇒ Object
- #handle_pubrel(packet) ⇒ Object
- #handle_suback(packet) ⇒ Object
- #handle_unsuback(packet) ⇒ Object
-
#initialize ⇒ Handler
constructor
A new instance of Handler.
- #new_session?(session_flag) ⇒ Boolean
- #old_session?(session_flag) ⇒ Boolean
- #on_connack(&block) ⇒ Object
- #on_connack=(callback) ⇒ Object
- #on_message(&block) ⇒ Object
- #on_message=(callback) ⇒ Object
- #on_puback(&block) ⇒ Object
- #on_puback=(callback) ⇒ Object
- #on_pubcomp(&block) ⇒ Object
- #on_pubcomp=(callback) ⇒ Object
- #on_pubrec(&block) ⇒ Object
- #on_pubrec=(callback) ⇒ Object
- #on_pubrel(&block) ⇒ Object
- #on_pubrel=(callback) ⇒ Object
- #on_suback(&block) ⇒ Object
- #on_suback=(callback) ⇒ Object
- #on_unsuback(&block) ⇒ Object
- #on_unsuback=(callback) ⇒ Object
- #packet_type(packet) ⇒ Object
- #receive_packet ⇒ Object
- #register_topic_callback(topic, callback, &block) ⇒ Object
- #socket=(socket) ⇒ Object
Constructor Details
#initialize ⇒ Handler
Returns a new instance of Handler.
22 23 24 25 26 27 |
# File 'lib/paho_mqtt/handler.rb', line 22
# Builds a handler with no registered topic callbacks, no publisher or
# subscriber attached yet, and -1 as the initial last-ping-response value.
def initialize
  @publisher = @subscriber = nil
  @last_ping_resp = -1
  @registered_callback = []
end
Instance Attribute Details
#clean_session ⇒ Object
Returns the value of attribute clean_session.
20 21 22 |
# File 'lib/paho_mqtt/handler.rb', line 20
# Attribute reader.
# @return [Object] the current value of @clean_session
def clean_session
  @clean_session
end
#last_ping_resp ⇒ Object
Returns the value of attribute last_ping_resp.
19 20 21 |
# File 'lib/paho_mqtt/handler.rb', line 19
# Attribute reader.
# @return [Object] the current value of @last_ping_resp
def last_ping_resp
  @last_ping_resp
end
#registered_callback ⇒ Object (readonly)
Returns the value of attribute registered_callback.
18 19 20 |
# File 'lib/paho_mqtt/handler.rb', line 18
# Attribute reader (read-only attribute).
# @return [Object] the current value of @registered_callback
def registered_callback
  @registered_callback
end
Instance Method Details
#check_callback(packet) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/paho_mqtt/handler.rb', line 274
# Invokes every registered topic callback whose filter matches the packet's
# topic. All matches are collected first, then called in registration order.
#
# @param packet [Object] packet responding to #topic; passed to each callback
# @return [Array, nil] the callbacks that were invoked, or nil when none matched
def check_callback(packet)
  matching = @registered_callback
             .select { |entry| PahoMqtt.match_filter(packet.topic, entry.first) }
             .map(&:last)
  matching.each { |handler| handler.call(packet) } unless matching.empty?
end
#clean_session?(session_flag) ⇒ Boolean
106 107 108 109 110 |
# File 'lib/paho_mqtt/handler.rb', line 106
# Logs (at debug level, when a logger is configured) that a fresh session is
# started: @clean_session is set and the server reported no existing session.
# Despite the predicate-style name, the result is whatever the logger call
# returns, or nil when nothing is logged.
#
# @param session_flag [Object] session-present flag taken from the CONNACK
def clean_session?(session_flag)
  return if !@clean_session || session_flag

  PahoMqtt.logger.debug("No previous session found by server, starting a new one.") if PahoMqtt.logger?
end
#clear_topic_callback(topic) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/paho_mqtt/handler.rb', line 74
# Removes every registered callback whose topic equals +topic+.
#
# @param topic [Object] topic whose callbacks are dropped
# @return MQTT_ERR_SUCCESS
# @raise [ArgumentError] when topic is nil
def clear_topic_callback(topic)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be unregistered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end

  @registered_callback.reject! { |entry| entry.first == topic }
  MQTT_ERR_SUCCESS
end
#config_pubsub(publisher, subscriber) ⇒ Object
29 30 31 32 |
# File 'lib/paho_mqtt/handler.rb', line 29
# Attaches the publisher and subscriber collaborators used by the
# acknowledgement handlers.
#
# @param publisher [Object] stored in @publisher
# @param subscriber [Object] stored in @subscriber
# @return [Object] the subscriber that was passed in
def config_pubsub(publisher, subscriber)
  @publisher, @subscriber = publisher, subscriber
  subscriber
end
#handle_connack(packet) ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/paho_mqtt/handler.rb', line 83
# Processes a CONNACK packet: dispatches to the accepted/error handler based
# on the return code, then fires the on_connack callback if one is set.
#
# NOTE(review): the value returned by handle_connack_error is discarded, so
# this method returns MQTT_CS_CONNECTED on both branches (unless the error
# handler raises) — confirm that this is intended.
#
# @param packet [Object] packet responding to #return_code and #session_present
# @return MQTT_CS_CONNECTED
def handle_connack(packet)
  if packet.return_code != 0x00
    handle_connack_error(packet.return_code)
  else
    PahoMqtt.logger.debug("Connack receive and connection accepted.") if PahoMqtt.logger?
    handle_connack_accepted(packet.session_present)
  end

  listener = @on_connack
  listener.call(packet) unless listener.nil?
  MQTT_CS_CONNECTED
end
#handle_connack_accepted(session_flag) ⇒ Object
94 95 96 97 98 |
# File 'lib/paho_mqtt/handler.rb', line 94
# Runs the three session checks for an accepted connection; each one logs
# the session situation that applies to the given flag.
#
# @param session_flag [Object] session-present flag taken from the CONNACK
# @return [Object, nil] whatever old_session? returns
def handle_connack_accepted(session_flag)
  clean_session?(session_flag)
  new_session?(session_flag)
  old_session?(session_flag)
end
#handle_connack_error(return_code) ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/paho_mqtt/handler.rb', line 179
# Handles a CONNACK whose return code signals a refused connection.
#
# Return code 0x01 raises LowVersionException. Codes present in
# CONNACK_ERROR_MESSAGE are logged as warnings and reported as a disconnected
# state. Anything else is logged as an error and raises PacketException.
#
# NOTE(review): the lookup uses the raw return code as the hash key (the
# previous `return_code.to_sym` cannot work on an Integer) — confirm that
# CONNACK_ERROR_MESSAGE is keyed by integer return codes.
#
# @param return_code [Integer] CONNACK return code (non-zero)
# @return MQTT_CS_DISCONNECTED for a known refusal code
# @raise [LowVersionException] when return_code is 0x01
# @raise [PacketException] when the return code is unknown
def handle_connack_error(return_code)
  raise LowVersionException if return_code == 0x01

  if CONNACK_ERROR_MESSAGE.key?(return_code)
    PahoMqtt.logger.warn(CONNACK_ERROR_MESSAGE[return_code]) if PahoMqtt.logger?
    MQTT_CS_DISCONNECTED
  else
    PahoMqtt.logger.error("Unknown return code for CONNACK packet: #{return_code}") if PahoMqtt.logger?
    raise PacketException
  end
end
#handle_packet(packet) ⇒ Object
54 55 56 57 58 |
# File 'lib/paho_mqtt/handler.rb', line 54
# Logs the incoming packet and dispatches it to the matching
# handle_<type> method, where <type> comes from packet_type.
#
# @param packet [Object] received packet
# @return [Object] whatever the selected handler returns
def handle_packet(packet)
  PahoMqtt.logger.info("New packet #{packet.class} recieved.") if PahoMqtt.logger?
  handler_name = "handle_#{packet_type(packet)}"
  send(handler_name, packet)
end
#handle_pingresp(_packet) ⇒ Object
118 119 120 |
# File 'lib/paho_mqtt/handler.rb', line 118
# Records the current time as the moment of the last ping response.
#
# @param _packet [Object] unused
# @return [Time] the timestamp stored in @last_ping_resp
def handle_pingresp(_packet)
  @last_ping_resp = Time.now
end
#handle_puback(packet) ⇒ Object
151 152 153 154 155 156 |
# File 'lib/paho_mqtt/handler.rb', line 151
# Forwards a PUBACK to the publisher and, when the publisher reports
# MQTT_ERR_SUCCESS, fires the on_puback callback if one is set.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_puback(packet)
  return unless @publisher.do_puback(packet.id) == MQTT_ERR_SUCCESS

  @on_puback.call(packet) unless @on_puback.nil?
end
#handle_pubcomp(packet) ⇒ Object
172 173 174 175 176 177 |
# File 'lib/paho_mqtt/handler.rb', line 172
# Forwards a PUBCOMP to the publisher and, when the publisher reports
# MQTT_ERR_SUCCESS, fires the on_pubcomp callback if one is set.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubcomp(packet)
  return unless @publisher.do_pubcomp(packet.id) == MQTT_ERR_SUCCESS

  @on_pubcomp.call(packet) unless @on_pubcomp.nil?
end
#handle_publish(packet) ⇒ Object
142 143 144 145 146 147 148 149 |
# File 'lib/paho_mqtt/handler.rb', line 142
# Handles an incoming PUBLISH: lets the publisher process it and, when the
# publisher reports MQTT_ERR_SUCCESS, fires the global on_message callback
# (if set) followed by any topic-specific callbacks.
#
# The listing had lost the `@on_message` receiver (`.call(packet) unless
# .nil?`); it is restored here to match the other handle_* methods.
#
# @param packet [Object] packet responding to #id and #qos
# @return [Object, nil] result of check_callback, or nil when the publisher
#   did not report success
def handle_publish(packet)
  id = packet.id
  qos = packet.qos
  return unless @publisher.do_publish(qos, id) == MQTT_ERR_SUCCESS

  @on_message.call(packet) unless @on_message.nil?
  check_callback(packet)
end
#handle_pubrec(packet) ⇒ Object
158 159 160 161 162 163 |
# File 'lib/paho_mqtt/handler.rb', line 158
# Forwards a PUBREC to the publisher and, when the publisher reports
# MQTT_ERR_SUCCESS, fires the on_pubrec callback if one is set.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubrec(packet)
  return unless @publisher.do_pubrec(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrec.call(packet) unless @on_pubrec.nil?
end
#handle_pubrel(packet) ⇒ Object
165 166 167 168 169 170 |
# File 'lib/paho_mqtt/handler.rb', line 165
# Forwards a PUBREL to the publisher and, when the publisher reports
# MQTT_ERR_SUCCESS, fires the on_pubrel callback if one is set.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubrel(packet)
  return unless @publisher.do_pubrel(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrel.call(packet) unless @on_pubrel.nil?
end
#handle_suback(packet) ⇒ Object
122 123 124 125 126 127 128 129 130 |
# File 'lib/paho_mqtt/handler.rb', line 122
# Handles a SUBACK: asks the subscriber to record the acknowledged
# subscription and, if it returns any topics, passes them to the on_suback
# callback when one is set.
#
# @param packet [Object] packet responding to #return_codes and #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_suback(packet)
  granted = @subscriber.add_subscription(packet.return_codes, packet.id, [])
  return if granted.empty?

  @on_suback.call(granted) unless @on_suback.nil?
end
#handle_unsuback(packet) ⇒ Object
132 133 134 135 136 137 138 139 140 |
# File 'lib/paho_mqtt/handler.rb', line 132
# Handles an UNSUBACK: asks the subscriber to drop the acknowledged
# subscription and, if it returns any topics, passes them to the on_unsuback
# callback when one is set.
#
# Fixes the misspelled local (`topcis`) that raised NameError on every
# UNSUBACK, and drops the stray debug `puts`.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_unsuback(packet)
  topics = @subscriber.remove_subscription(packet.id, [])
  return if topics.empty?

  @on_unsuback.call(topics) unless @on_unsuback.nil?
end
#new_session?(session_flag) ⇒ Boolean
100 101 102 103 104 |
# File 'lib/paho_mqtt/handler.rb', line 100
# Logs (at debug level, when a logger is configured) that a new session was
# created: @clean_session is not set and the server reported no existing
# session. Returns the logger call's result, or nil when nothing is logged.
#
# @param session_flag [Object] session-present flag taken from the CONNACK
def new_session?(session_flag)
  return if @clean_session || session_flag

  PahoMqtt.logger.debug("New session created for the client") if PahoMqtt.logger?
end
#old_session?(session_flag) ⇒ Boolean
112 113 114 115 116 |
# File 'lib/paho_mqtt/handler.rb', line 112
# Logs (at debug level, when a logger is configured) that the previous
# session was restored: @clean_session is not set and the server reported an
# existing session. Returns the logger call's result, or nil when nothing is
# logged.
#
# @param session_flag [Object] session-present flag taken from the CONNACK
def old_session?(session_flag)
  return if @clean_session || !session_flag

  PahoMqtt.logger.debug("Previous session restored by the server.") if PahoMqtt.logger?
end
#on_connack(&block) ⇒ Object
191 192 193 194 |
# File 'lib/paho_mqtt/handler.rb', line 191
# Sets the CONNACK callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_connack callback
def on_connack(&block)
  @on_connack = block if block
  @on_connack
end
#on_connack=(callback) ⇒ Object
231 232 233 |
# File 'lib/paho_mqtt/handler.rb', line 231
# Stores +callback+ as the CONNACK callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_connack=(callback)
  return unless callback.is_a?(Proc)

  @on_connack = callback
end
#on_message(&block) ⇒ Object
226 227 228 229 |
# File 'lib/paho_mqtt/handler.rb', line 226
# Sets the message callback when a block is given and returns the callback
# currently stored.
#
# The listing had lost the method name and the `@on_message` variable
# (`def (&block) = block if block_given? end`); both are restored here to
# match the other on_* accessors.
#
# @return [Proc, nil] the stored on_message callback
def on_message(&block)
  @on_message = block if block_given?
  @on_message
end
#on_message=(callback) ⇒ Object
259 260 261 |
# File 'lib/paho_mqtt/handler.rb', line 259
# Stores +callback+ as the message callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# The listing had lost the method name and the `@on_message` variable
# (`def (callback) = callback if callback.is_a?(Proc) end`); both are
# restored here to match the other on_*= writers.
#
# @param callback [Proc] callback to store
def on_message=(callback)
  @on_message = callback if callback.is_a?(Proc)
end
#on_puback(&block) ⇒ Object
206 207 208 209 |
# File 'lib/paho_mqtt/handler.rb', line 206
# Sets the PUBACK callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_puback callback
def on_puback(&block)
  @on_puback = block if block
  @on_puback
end
#on_puback=(callback) ⇒ Object
243 244 245 |
# File 'lib/paho_mqtt/handler.rb', line 243
# Stores +callback+ as the PUBACK callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_puback=(callback)
  return unless callback.is_a?(Proc)

  @on_puback = callback
end
#on_pubcomp(&block) ⇒ Object
221 222 223 224 |
# File 'lib/paho_mqtt/handler.rb', line 221
# Sets the PUBCOMP callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_pubcomp callback
def on_pubcomp(&block)
  @on_pubcomp = block if block
  @on_pubcomp
end
#on_pubcomp=(callback) ⇒ Object
255 256 257 |
# File 'lib/paho_mqtt/handler.rb', line 255
# Stores +callback+ as the PUBCOMP callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)

  @on_pubcomp = callback
end
#on_pubrec(&block) ⇒ Object
211 212 213 214 |
# File 'lib/paho_mqtt/handler.rb', line 211
# Sets the PUBREC callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_pubrec callback
def on_pubrec(&block)
  @on_pubrec = block if block
  @on_pubrec
end
#on_pubrec=(callback) ⇒ Object
247 248 249 |
# File 'lib/paho_mqtt/handler.rb', line 247
# Stores +callback+ as the PUBREC callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)

  @on_pubrec = callback
end
#on_pubrel(&block) ⇒ Object
216 217 218 219 |
# File 'lib/paho_mqtt/handler.rb', line 216
# Sets the PUBREL callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_pubrel callback
def on_pubrel(&block)
  @on_pubrel = block if block
  @on_pubrel
end
#on_pubrel=(callback) ⇒ Object
251 252 253 |
# File 'lib/paho_mqtt/handler.rb', line 251
# Stores +callback+ as the PUBREL callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)

  @on_pubrel = callback
end
#on_suback(&block) ⇒ Object
196 197 198 199 |
# File 'lib/paho_mqtt/handler.rb', line 196
# Sets the SUBACK callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_suback callback
def on_suback(&block)
  @on_suback = block if block
  @on_suback
end
#on_suback=(callback) ⇒ Object
235 236 237 |
# File 'lib/paho_mqtt/handler.rb', line 235
# Stores +callback+ as the SUBACK callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_suback=(callback)
  return unless callback.is_a?(Proc)

  @on_suback = callback
end
#on_unsuback(&block) ⇒ Object
201 202 203 204 |
# File 'lib/paho_mqtt/handler.rb', line 201
# Sets the UNSUBACK callback when a block is given and returns the callback
# currently stored.
#
# @return [Proc, nil] the stored on_unsuback callback
def on_unsuback(&block)
  @on_unsuback = block if block
  @on_unsuback
end
#on_unsuback=(callback) ⇒ Object
239 240 241 |
# File 'lib/paho_mqtt/handler.rb', line 239
# Stores +callback+ as the UNSUBACK callback; anything that is not a Proc is
# ignored and the previous callback is kept.
#
# @param callback [Proc] callback to store
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)

  @on_unsuback = callback
end
#packet_type(packet) ⇒ Object
263 264 265 266 267 268 269 270 271 272 |
# File 'lib/paho_mqtt/handler.rb', line 263
# Maps a packet to the lowercase, un-namespaced name of its class (e.g. a
# PahoMqtt::Packet::Publish instance gives "publish"), which handle_packet
# uses to pick the handle_<type> method.
#
# Only classes found in PahoMqtt::PACKET_TYPES[3..13] are accepted.
# NOTE(review): that slice presumably skips the packet types a client never
# receives through this path — confirm against the PACKET_TYPES definition.
#
# The stray debug `puts` was removed; the unexpected packet is still reported
# through the logger when one is configured.
#
# @param packet [Object] received packet
# @return [String] lowercase class name without its namespace
# @raise [PacketException] when the packet class is not in the accepted slice
def packet_type(packet)
  type = packet.class
  unless PahoMqtt::PACKET_TYPES[3..13].include?(type)
    PahoMqtt.logger.error("Received an unexpeceted packet: #{packet}") if PahoMqtt.logger?
    raise PacketException
  end

  type.to_s.split('::').last.downcase
end
#receive_packet ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/paho_mqtt/handler.rb', line 38
# Waits up to SELECT_TIMEOUT for the socket to become readable, reads one
# packet and dispatches it. Does nothing when the socket is missing or
# closed, when the select times out, or when no packet could be read.
#
# @last_ping_resp is refreshed before a CONNACK is handled, but only after
# any other packet has been handled.
#
# @return [Object, nil] handle_connack's result for a CONNACK, the refreshed
#   timestamp for any other packet, nil when nothing was processed
def receive_packet
  return if @socket.nil? || @socket.closed?
  return if IO.select([@socket], [], [], SELECT_TIMEOUT).nil?

  packet = PahoMqtt::Packet::Base.read(@socket)
  return if packet.nil?

  if packet.is_a?(PahoMqtt::Packet::Connack)
    @last_ping_resp = Time.now
    handle_connack(packet)
  else
    handle_packet(packet)
    @last_ping_resp = Time.now
  end
end
#register_topic_callback(topic, callback, &block) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/paho_mqtt/handler.rb', line 60
# Registers a callback for +topic+, replacing any callback already registered
# for it. A given block takes precedence over +callback+; a +callback+ that
# is not a Proc is ignored (the previous registration is still cleared).
#
# @param topic [Object] topic the callback is bound to
# @param callback [Proc, nil] callback used when no block is given
# @return MQTT_ERR_SUCCESS
# @raise [ArgumentError] when topic is nil
def register_topic_callback(topic, callback, &block)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be registered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end

  clear_topic_callback(topic)
  handler = block || callback
  @registered_callback.push([topic, handler]) if handler.is_a?(Proc)
  MQTT_ERR_SUCCESS
end
#socket=(socket) ⇒ Object
34 35 36 |
# File 'lib/paho_mqtt/handler.rb', line 34
# Sets the socket that receive_packet reads from.
#
# @param socket [Object] stored in @socket
def socket=(socket)
  @socket = socket
end