Class: PahoMqtt::Handler
- Inherits:
-
Object
- Object
- PahoMqtt::Handler
- Defined in:
- lib/paho_mqtt/handler.rb
Instance Attribute Summary collapse
-
#clean_session ⇒ Object
Returns the value of attribute clean_session.
-
#last_ping_resp ⇒ Object
Returns the value of attribute last_ping_resp.
-
#registered_callback ⇒ Object
readonly
Returns the value of attribute registered_callback.
Instance Method Summary collapse
- #clean_session?(session_flag) ⇒ Boolean
- #clear_topic_callback(topic) ⇒ Object
- #config_pubsub(publisher, subscriber) ⇒ Object
- #handle_connack(packet) ⇒ Object
- #handle_connack_accepted(session_flag) ⇒ Object
- #handle_connack_error(return_code) ⇒ Object
- #handle_packet(packet) ⇒ Object
- #handle_pingresp(_packet) ⇒ Object
- #handle_puback(packet) ⇒ Object
- #handle_pubcomp(packet) ⇒ Object
- #handle_publish(packet) ⇒ Object
- #handle_pubrec(packet) ⇒ Object
- #handle_pubrel(packet) ⇒ Object
- #handle_suback(packet) ⇒ Object
- #handle_unsuback(packet) ⇒ Object
-
#initialize ⇒ Handler
constructor
A new instance of Handler.
- #new_session?(session_flag) ⇒ Boolean
- #old_session?(session_flag) ⇒ Boolean
- #on_connack(&block) ⇒ Object
- #on_connack=(callback) ⇒ Object
- #on_message(&block) ⇒ Object
- #on_message=(callback) ⇒ Object
- #on_puback(&block) ⇒ Object
- #on_puback=(callback) ⇒ Object
- #on_pubcomp(&block) ⇒ Object
- #on_pubcomp=(callback) ⇒ Object
- #on_pubrec(&block) ⇒ Object
- #on_pubrec=(callback) ⇒ Object
- #on_pubrel(&block) ⇒ Object
- #on_pubrel=(callback) ⇒ Object
- #on_suback(&block) ⇒ Object
- #on_suback=(callback) ⇒ Object
- #on_unsuback(&block) ⇒ Object
- #on_unsuback=(callback) ⇒ Object
- #packet_type(packet) ⇒ Object
- #receive_packet ⇒ Object
- #register_topic_callback(topic, callback, &block) ⇒ Object
- #socket=(socket) ⇒ Object
Constructor Details
#initialize ⇒ Handler
22 23 24 25 26 27 |
# File 'lib/paho_mqtt/handler.rb', line 22
#
# Creates a handler with an empty topic-callback registry and no
# publisher/subscriber attached yet.
#
# @return [Handler] a new instance of Handler
def initialize
  @registered_callback = []
  # -1 presumably marks "no ping response received yet" -- confirm with callers.
  @last_ping_resp = -1
  @publisher = nil
  @subscriber = nil
end
Instance Attribute Details
#clean_session ⇒ Object
Returns the value of attribute clean_session.
20 21 22 |
# File 'lib/paho_mqtt/handler.rb', line 20
#
# Returns the value of attribute clean_session.
#
# @return [Object] whatever is currently stored in @clean_session
def clean_session
  @clean_session
end
#last_ping_resp ⇒ Object
Returns the value of attribute last_ping_resp.
19 20 21 |
# File 'lib/paho_mqtt/handler.rb', line 19
#
# Returns the value of attribute last_ping_resp.
#
# @return [Object] whatever is currently stored in @last_ping_resp
def last_ping_resp
  @last_ping_resp
end
#registered_callback ⇒ Object (readonly)
Returns the value of attribute registered_callback.
18 19 20 |
# File 'lib/paho_mqtt/handler.rb', line 18
#
# Returns the value of attribute registered_callback (read-only attribute).
#
# @return [Object] whatever is currently stored in @registered_callback
def registered_callback
  @registered_callback
end
Instance Method Details
#clean_session?(session_flag) ⇒ Boolean
106 107 108 109 110 |
# File 'lib/paho_mqtt/handler.rb', line 106
#
# Tells whether a clean session was requested (@clean_session is truthy) and
# the given session flag is falsy. Logs a debug message in that case when
# logging is enabled.
#
# The result is now a real boolean; previously the method returned nil or
# whatever the logger call returned, so it could not be used as a predicate.
#
# @param session_flag [Boolean] session flag to test against @clean_session
# @return [Boolean] true when @clean_session is set and session_flag is not
def clean_session?(session_flag)
  fresh_session = @clean_session && !session_flag
  if fresh_session
    PahoMqtt.logger.debug("No previous session found by server, starting a new one.") if PahoMqtt.logger?
  end
  fresh_session ? true : false
end
#clear_topic_callback(topic) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/paho_mqtt/handler.rb', line 74
#
# Removes every registered callback whose topic equals +topic+.
#
# @param topic [Object] topic whose callbacks are removed; must not be nil
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ArgumentError] when +topic+ is nil
def clear_topic_callback(topic)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be unregistered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError, "topic must not be nil"
  end
  @registered_callback.delete_if { |pair| pair.first == topic }
  MQTT_ERR_SUCCESS
end
#config_pubsub(publisher, subscriber) ⇒ Object
29 30 31 32 |
# File 'lib/paho_mqtt/handler.rb', line 29
#
# Stores the collaborators the packet handlers delegate to.
#
# @param publisher [Object] stored in @publisher (receives the do_pub* calls)
# @param subscriber [Object] stored in @subscriber (receives the
#   add_subscription / remove_subscription calls)
# @return [Object] the subscriber that was passed in
def config_pubsub(publisher, subscriber)
  @publisher = publisher
  @subscriber = subscriber
end
#handle_connack(packet) ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/paho_mqtt/handler.rb', line 83
#
# Processes a CONNACK packet. A return code of 0x00 is treated as an accepted
# connection; any other code is passed to #handle_connack_error. The
# on_connack callback, when set, is invoked with the packet in both cases
# (unless the error handler raised).
#
# The returned state now reflects the outcome: previously MQTT_CS_CONNECTED
# was returned even when #handle_connack_error reported a refused connection.
#
# @param packet [Object] packet responding to #return_code and #session_present
# @return [Object] MQTT_CS_CONNECTED when accepted, otherwise the value
#   returned by #handle_connack_error
def handle_connack(packet)
  if packet.return_code == 0x00
    PahoMqtt.logger.debug("Connack receive and connection accepted.") if PahoMqtt.logger?
    handle_connack_accepted(packet.session_present)
    state = MQTT_CS_CONNECTED
  else
    state = handle_connack_error(packet.return_code)
  end
  @on_connack.call(packet) unless @on_connack.nil?
  state
end
#handle_connack_accepted(session_flag) ⇒ Object
94 95 96 97 98 |
# File 'lib/paho_mqtt/handler.rb', line 94
#
# Runs the three session checks for an accepted connection, in order:
# #clean_session?, #new_session?, then #old_session?.
#
# @param session_flag [Boolean] value forwarded to each session check
# @return [Object] the result of #old_session?
def handle_connack_accepted(session_flag)
  clean_session?(session_flag)
  new_session?(session_flag)
  old_session?(session_flag)
end
#handle_connack_error(return_code) ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/paho_mqtt/handler.rb', line 176
#
# Handles a CONNACK return code other than 0x00.
#
# Fixes over the previous body, which could not run past its first branch:
# `has_key` and Integer#to_sym do not exist, `logger.warm` and
# `CONNACK_ERRO_MESSAGE` were misspelled, and `PahoMqtt.logger` was called
# with a message argument instead of a log-level method.
#
# NOTE(review): CONNACK_ERROR_MESSAGE is assumed to be keyed by the raw
# return code, as the original `[return_code]` lookup suggests -- confirm.
#
# @param return_code [Integer] return code carried by the CONNACK packet
# @return [Object] MQTT_CS_DISCONNECTED for a code listed in CONNACK_ERROR_MESSAGE
# @raise [LowVersionException] when the code is 0x01
# @raise [PacketException] when the code is not listed in CONNACK_ERROR_MESSAGE
def handle_connack_error(return_code)
  raise LowVersionException if return_code == 0x01

  if CONNACK_ERROR_MESSAGE.key?(return_code)
    PahoMqtt.logger.warn(CONNACK_ERROR_MESSAGE[return_code]) if PahoMqtt.logger?
    MQTT_CS_DISCONNECTED
  else
    PahoMqtt.logger.error("Unknown return code for CONNACK packet: #{return_code}") if PahoMqtt.logger?
    raise PacketException
  end
end
#handle_packet(packet) ⇒ Object
54 55 56 57 58 |
# File 'lib/paho_mqtt/handler.rb', line 54
#
# Dispatches a packet to the matching handle_<type> method, where <type> is
# the name returned by #packet_type.
#
# @param packet [Object] the packet to dispatch
# @return [Object] whatever the selected handle_<type> method returns
def handle_packet(packet)
  PahoMqtt.logger.info("New packet #{packet.class} recieved.") if PahoMqtt.logger?
  handler_name = "handle_#{packet_type(packet)}"
  send(handler_name, packet)
end
#handle_pingresp(_packet) ⇒ Object
118 119 120 |
# File 'lib/paho_mqtt/handler.rb', line 118
#
# Records the current time in @last_ping_resp.
#
# @param _packet [Object] ignored
# @return [Time] the timestamp that was stored
def handle_pingresp(_packet)
  @last_ping_resp = Time.now
end
#handle_puback(packet) ⇒ Object
148 149 150 151 152 153 |
# File 'lib/paho_mqtt/handler.rb', line 148
#
# Forwards the packet id to @publisher.do_puback and, when that returns
# MQTT_ERR_SUCCESS, invokes the on_puback callback (if one is set).
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_puback(packet)
  return unless @publisher.do_puback(packet.id) == MQTT_ERR_SUCCESS

  @on_puback.call(packet) unless @on_puback.nil?
end
#handle_pubcomp(packet) ⇒ Object
169 170 171 172 173 174 |
# File 'lib/paho_mqtt/handler.rb', line 169
#
# Forwards the packet id to @publisher.do_pubcomp and, when that returns
# MQTT_ERR_SUCCESS, invokes the on_pubcomp callback (if one is set).
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubcomp(packet)
  return unless @publisher.do_pubcomp(packet.id) == MQTT_ERR_SUCCESS

  @on_pubcomp.call(packet) unless @on_pubcomp.nil?
end
#handle_publish(packet) ⇒ Object
139 140 141 142 143 144 145 146 |
# File 'lib/paho_mqtt/handler.rb', line 139
#
# Handles an incoming PUBLISH packet. After @publisher.do_publish succeeds,
# the general on_message callback (if set) is invoked, followed by the
# callback registered for the packet's exact topic (if any).
#
# The `@on_message` references were missing from the extracted text and are
# restored here. The topic registry is now searched once instead of twice.
#
# @param packet [Object] packet responding to #id, #qos and #topic
# @return [Object, nil] result of the last callback invoked, or nil
def handle_publish(packet)
  id = packet.id
  qos = packet.qos
  if @publisher.do_publish(qos, id) == MQTT_ERR_SUCCESS
    @on_message.call(packet) unless @on_message.nil?
    # assoc returns the first [topic, callback] pair whose topic matches.
    entry = @registered_callback.assoc(packet.topic)
    entry.last.call(packet) unless entry.nil?
  end
end
#handle_pubrec(packet) ⇒ Object
155 156 157 158 159 160 |
# File 'lib/paho_mqtt/handler.rb', line 155
#
# Forwards the packet id to @publisher.do_pubrec and, when that returns
# MQTT_ERR_SUCCESS, invokes the on_pubrec callback (if one is set).
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubrec(packet)
  return unless @publisher.do_pubrec(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrec.call(packet) unless @on_pubrec.nil?
end
#handle_pubrel(packet) ⇒ Object
162 163 164 165 166 167 |
# File 'lib/paho_mqtt/handler.rb', line 162
#
# Forwards the packet id to @publisher.do_pubrel and, when that returns
# MQTT_ERR_SUCCESS, invokes the on_pubrel callback (if one is set).
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubrel(packet)
  return unless @publisher.do_pubrel(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrel.call(packet) unless @on_pubrel.nil?
end
#handle_suback(packet) ⇒ Object
122 123 124 125 126 127 128 129 |
# File 'lib/paho_mqtt/handler.rb', line 122
#
# Handles a SUBACK packet by passing its return codes and id to
# @subscriber.add_subscription together with a fresh, empty array. When that
# call returns MQTT_ERR_SUCCESS the on_suback callback (if set) receives the
# array.
#
# @param packet [Object] packet responding to #return_codes and #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_suback(packet)
  # Presumably filled in place by add_subscription -- confirm in the subscriber.
  topics = []
  return unless @subscriber.add_subscription(packet.return_codes, packet.id, topics) == MQTT_ERR_SUCCESS

  @on_suback.call(topics) unless @on_suback.nil?
end
#handle_unsuback(packet) ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/paho_mqtt/handler.rb', line 131
#
# Handles an UNSUBACK packet by passing its id to
# @subscriber.remove_subscription together with a fresh, empty array. When
# that call returns MQTT_ERR_SUCCESS the on_unsuback callback (if set)
# receives the array.
#
# @param packet [Object] packet responding to #id
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_unsuback(packet)
  # Presumably filled in place by remove_subscription -- confirm in the subscriber.
  topics = []
  return unless @subscriber.remove_subscription(packet.id, topics) == MQTT_ERR_SUCCESS

  @on_unsuback.call(topics) unless @on_unsuback.nil?
end
#new_session?(session_flag) ⇒ Boolean
100 101 102 103 104 |
# File 'lib/paho_mqtt/handler.rb', line 100
#
# Tells whether @clean_session is falsy and the given session flag is falsy
# too. Logs a debug message in that case when logging is enabled.
#
# The result is now a real boolean; previously the method returned nil or
# whatever the logger call returned, so it could not be used as a predicate.
#
# @param session_flag [Boolean] session flag to test against @clean_session
# @return [Boolean] true when neither @clean_session nor session_flag is set
def new_session?(session_flag)
  created = !@clean_session && !session_flag
  if created
    PahoMqtt.logger.debug("New session created for the client") if PahoMqtt.logger?
  end
  created
end
#old_session?(session_flag) ⇒ Boolean
112 113 114 115 116 |
# File 'lib/paho_mqtt/handler.rb', line 112
#
# Tells whether @clean_session is falsy and the given session flag is truthy.
# Logs a debug message in that case when logging is enabled.
#
# The result is now a real boolean; previously the method returned nil or
# whatever the logger call returned, so it could not be used as a predicate.
#
# @param session_flag [Boolean] session flag to test against @clean_session
# @return [Boolean] true when @clean_session is not set and session_flag is
def old_session?(session_flag)
  restored = !@clean_session && session_flag
  if restored
    PahoMqtt.logger.debug("Previous session restored by the server.") if PahoMqtt.logger?
  end
  restored ? true : false
end
#on_connack(&block) ⇒ Object
188 189 190 191 |
# File 'lib/paho_mqtt/handler.rb', line 188
#
# Reads the on_connack callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_connack
def on_connack(&block)
  @on_connack = block if block_given?
  @on_connack
end
#on_connack=(callback) ⇒ Object
228 229 230 |
# File 'lib/paho_mqtt/handler.rb', line 228
#
# Stores +callback+ as the on_connack callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_connack=(callback)
  @on_connack = callback if callback.is_a?(Proc)
end
#on_message(&block) ⇒ Object
223 224 225 226 |
# File 'lib/paho_mqtt/handler.rb', line 223
#
# Reads the on_message callback, replacing it first when a block is given.
#
# The method name and the `@on_message` references were missing from the
# extracted text; they are restored to match the sibling on_* accessors.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_message
def on_message(&block)
  @on_message = block if block_given?
  @on_message
end
#on_message=(callback) ⇒ Object
256 257 258 |
# File 'lib/paho_mqtt/handler.rb', line 256
#
# Stores +callback+ as the on_message callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# The method name and the `@on_message` reference were missing from the
# extracted text; they are restored to match the sibling on_*= writers.
#
# @param callback [Proc] the new callback
def on_message=(callback)
  @on_message = callback if callback.is_a?(Proc)
end
#on_puback(&block) ⇒ Object
203 204 205 206 |
# File 'lib/paho_mqtt/handler.rb', line 203
#
# Reads the on_puback callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_puback
def on_puback(&block)
  @on_puback = block if block_given?
  @on_puback
end
#on_puback=(callback) ⇒ Object
240 241 242 |
# File 'lib/paho_mqtt/handler.rb', line 240
#
# Stores +callback+ as the on_puback callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_puback=(callback)
  @on_puback = callback if callback.is_a?(Proc)
end
#on_pubcomp(&block) ⇒ Object
218 219 220 221 |
# File 'lib/paho_mqtt/handler.rb', line 218
#
# Reads the on_pubcomp callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_pubcomp
def on_pubcomp(&block)
  @on_pubcomp = block if block_given?
  @on_pubcomp
end
#on_pubcomp=(callback) ⇒ Object
252 253 254 |
# File 'lib/paho_mqtt/handler.rb', line 252
#
# Stores +callback+ as the on_pubcomp callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_pubcomp=(callback)
  @on_pubcomp = callback if callback.is_a?(Proc)
end
#on_pubrec(&block) ⇒ Object
208 209 210 211 |
# File 'lib/paho_mqtt/handler.rb', line 208
#
# Reads the on_pubrec callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_pubrec
def on_pubrec(&block)
  @on_pubrec = block if block_given?
  @on_pubrec
end
#on_pubrec=(callback) ⇒ Object
244 245 246 |
# File 'lib/paho_mqtt/handler.rb', line 244
#
# Stores +callback+ as the on_pubrec callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_pubrec=(callback)
  @on_pubrec = callback if callback.is_a?(Proc)
end
#on_pubrel(&block) ⇒ Object
213 214 215 216 |
# File 'lib/paho_mqtt/handler.rb', line 213
#
# Reads the on_pubrel callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_pubrel
def on_pubrel(&block)
  @on_pubrel = block if block_given?
  @on_pubrel
end
#on_pubrel=(callback) ⇒ Object
248 249 250 |
# File 'lib/paho_mqtt/handler.rb', line 248
#
# Stores +callback+ as the on_pubrel callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_pubrel=(callback)
  @on_pubrel = callback if callback.is_a?(Proc)
end
#on_suback(&block) ⇒ Object
193 194 195 196 |
# File 'lib/paho_mqtt/handler.rb', line 193
#
# Reads the on_suback callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_suback
def on_suback(&block)
  @on_suback = block if block_given?
  @on_suback
end
#on_suback=(callback) ⇒ Object
232 233 234 |
# File 'lib/paho_mqtt/handler.rb', line 232
#
# Stores +callback+ as the on_suback callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_suback=(callback)
  @on_suback = callback if callback.is_a?(Proc)
end
#on_unsuback(&block) ⇒ Object
198 199 200 201 |
# File 'lib/paho_mqtt/handler.rb', line 198
#
# Reads the on_unsuback callback, replacing it first when a block is given.
#
# @yield block stored as the new callback (optional)
# @return [Proc, nil] the callback currently stored in @on_unsuback
def on_unsuback(&block)
  @on_unsuback = block if block_given?
  @on_unsuback
end
#on_unsuback=(callback) ⇒ Object
236 237 238 |
# File 'lib/paho_mqtt/handler.rb', line 236
#
# Stores +callback+ as the on_unsuback callback. Anything that is not a Proc
# (including nil) is ignored and leaves the current callback in place.
#
# @param callback [Proc] the new callback
def on_unsuback=(callback)
  @on_unsuback = callback if callback.is_a?(Proc)
end
#packet_type(packet) ⇒ Object
260 261 262 263 264 265 266 267 268 269 |
# File 'lib/paho_mqtt/handler.rb', line 260
#
# Maps a packet to the lower-cased last segment of its class name (for
# example "puback"), which #handle_packet uses to pick a handler. Only
# classes found in PahoMqtt::PACKET_TYPES[3..13] are accepted.
#
# The stray `puts` that dumped the packet to stdout has been removed; the
# rejected packet is still reported through the logger.
#
# @param packet [Object] the packet to classify
# @return [String] lower-cased class name without its namespace
# @raise [PacketException] when the packet class is not in the accepted range
def packet_type(packet)
  type = packet.class
  if PahoMqtt::PACKET_TYPES[3..13].include?(type)
    type.to_s.split('::').last.downcase
  else
    PahoMqtt.logger.error("Received an unexpeceted packet: #{packet}") if PahoMqtt.logger?
    raise PacketException
  end
end
#receive_packet ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/paho_mqtt/handler.rb', line 38
#
# Waits up to SELECT_TIMEOUT for @socket to become readable, reads one packet
# from it and dispatches it. Does nothing when the socket is nil or closed,
# when the wait times out, or when no packet could be read.
#
# @return [Object, nil] the result of #handle_connack for a CONNACK packet,
#   the refreshed @last_ping_resp timestamp for any other packet, or nil
#   when nothing was processed
def receive_packet
  return if @socket.nil? || @socket.closed?
  return if IO.select([@socket], [], [], SELECT_TIMEOUT).nil?

  packet = PahoMqtt::Packet::Base.read(@socket)
  return if packet.nil?

  if packet.is_a?(PahoMqtt::Packet::Connack)
    # For CONNACK the timestamp is refreshed before the handler runs...
    @last_ping_resp = Time.now
    handle_connack(packet)
  else
    # ...for every other packet only after the handler returned.
    handle_packet(packet)
    @last_ping_resp = Time.now
  end
end
#register_topic_callback(topic, callback, &block) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/paho_mqtt/handler.rb', line 60
#
# Registers a callback for +topic+, replacing any callback already registered
# for it. A given block takes precedence over +callback+; a +callback+ that
# is not a Proc is ignored, which leaves the topic without a callback.
#
# @param topic [Object] topic the callback is bound to; must not be nil
# @param callback [Proc, nil] callback used when no block is given
# @yield block stored as the callback (optional)
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ArgumentError] when +topic+ is nil
def register_topic_callback(topic, callback, &block)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be registered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end
  clear_topic_callback(topic)
  if block_given?
    @registered_callback.push([topic, block])
  elsif callback.is_a?(Proc) # also false for nil, so no separate nil check
    @registered_callback.push([topic, callback])
  end
  MQTT_ERR_SUCCESS
end
#socket=(socket) ⇒ Object
34 35 36 |
# File 'lib/paho_mqtt/handler.rb', line 34
#
# Sets the socket that #receive_packet reads from.
#
# @param socket [Object] stored in @socket as given
def socket=(socket)
  @socket = socket
end