Class: PahoMqtt::Handler

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/handler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Handler

Returns a new instance of Handler.



22
23
24
25
26
27
# File 'lib/paho_mqtt/handler.rb', line 22

# Sets up a handler with no registered topic callbacks, no publisher or
# subscriber attached yet, and -1 as the "no ping response seen" marker.
def initialize
  @last_ping_resp = -1
  @registered_callback = []
  @publisher = @subscriber = nil
end

Instance Attribute Details

#clean_session ⇒ Object

Returns the value of attribute clean_session.



20
21
22
# File 'lib/paho_mqtt/handler.rb', line 20

# Reader for the @clean_session instance variable.
#
# @return [Object] the current value of @clean_session (nil until assigned)
def clean_session
  @clean_session
end

#last_ping_resp ⇒ Object

Returns the value of attribute last_ping_resp.



19
20
21
# File 'lib/paho_mqtt/handler.rb', line 19

# Reader for the @last_ping_resp instance variable.
#
# @return [Object] the current value of @last_ping_resp
def last_ping_resp
  @last_ping_resp
end

#registered_callback ⇒ Object (readonly)

Returns the value of attribute registered_callback.



18
19
20
# File 'lib/paho_mqtt/handler.rb', line 18

# Reader for the @registered_callback instance variable.
#
# @return [Object] the current value of @registered_callback
def registered_callback
  @registered_callback
end

Instance Method Details

#check_callback(packet) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
# File 'lib/paho_mqtt/handler.rb', line 273

# Runs every registered callback whose topic filter matches the packet's topic.
#
# All matching callbacks are collected before any of them is invoked, and they
# are invoked in registration order with the packet as the only argument.
#
# @param packet [#topic] the received packet
# @return [Array, nil] the callbacks that were invoked, or nil when none matched
def check_callback(packet)
  matching = @registered_callback
             .select { |entry| PahoMqtt.match_filter(packet.topic, entry.first) }
             .map(&:last)
  return if matching.empty?

  matching.each { |handler| handler.call(packet) }
end

#clean_session?(session_flag) ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
109
110
# File 'lib/paho_mqtt/handler.rb', line 106

# Emits a debug log line when this handler's @clean_session is truthy and the
# given session flag is falsy. Despite the predicate-style name, the
# return value is not a meaningful boolean.
#
# @param session_flag [Object] session flag passed in by the caller
# @return [Object, nil] whatever the logger call returns, or nil when nothing is logged
def clean_session?(session_flag)
  return if !@clean_session || session_flag

  PahoMqtt.logger.debug("No previous session found by server, starting a new one.") if PahoMqtt.logger?
end

#clear_topic_callback(topic) ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/paho_mqtt/handler.rb', line 74

# Removes every registered callback whose topic equals +topic+.
#
# @param topic [Object] the topic whose callbacks should be dropped
# @return MQTT_ERR_SUCCESS, whether or not anything was removed
# @raise [ArgumentError] when +topic+ is nil
def clear_topic_callback(topic)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be unregistered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end

  @registered_callback.reject! { |entry| entry.first == topic }
  MQTT_ERR_SUCCESS
end

#config_pubsub(publisher, subscriber) ⇒ Object



29
30
31
32
# File 'lib/paho_mqtt/handler.rb', line 29

# Attaches the publisher and subscriber objects this handler delegates to.
#
# @param publisher [Object] stored in @publisher
# @param subscriber [Object] stored in @subscriber
# @return [Object] the subscriber that was passed in
def config_pubsub(publisher, subscriber)
  @publisher, @subscriber = publisher, subscriber
  subscriber
end

#handle_connack(packet) ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/paho_mqtt/handler.rb', line 83

# Processes a CONNACK packet: a return code of 0x00 goes through the
# accepted-connection path, any other code through the error path. The
# on_connack callback (if set) is then invoked with the packet.
#
# NOTE(review): the value produced by handle_connack_error is discarded, so
# MQTT_CS_CONNECTED is returned even for a refused connection unless that
# method raises -- confirm this is what callers expect.
#
# @param packet [#return_code, #session_present] the CONNACK packet
# @return MQTT_CS_CONNECTED
def handle_connack(packet)
  if packet.return_code != 0x00
    handle_connack_error(packet.return_code)
  else
    PahoMqtt.logger.debug("Connack receive and connection accepted.") if PahoMqtt.logger?
    handle_connack_accepted(packet.session_present)
  end
  @on_connack&.call(packet)
  MQTT_CS_CONNECTED
end

#handle_connack_accepted(session_flag) ⇒ Object



94
95
96
97
98
# File 'lib/paho_mqtt/handler.rb', line 94

# Runs the three session checks (clean_session?, new_session?, old_session?)
# in that order, passing each the same session flag.
#
# @param session_flag [Object] session flag forwarded to each check
# @return [Object] the result of the final check (old_session?)
def handle_connack_accepted(session_flag)
  %i[clean_session? new_session? old_session?]
    .map { |check| send(check, session_flag) }
    .last
end

#handle_connack_error(return_code) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
# File 'lib/paho_mqtt/handler.rb', line 178

# Handles a non-zero CONNACK return code.
#
# - 0x01 raises LowVersionException.
# - A code listed in CONNACK_ERROR_MESSAGE is logged as a warning and
#   reported as a disconnected state.
# - Any other code is logged as an error and raises PacketException.
#
# The lookup uses the return code itself as the hash key, matching the way
# the message is fetched from CONNACK_ERROR_MESSAGE.
#
# @param return_code [Integer] the CONNACK return code
# @return MQTT_CS_DISCONNECTED for a known refusal code
# @raise [LowVersionException] when the return code is 0x01
# @raise [PacketException] when the return code is unknown
def handle_connack_error(return_code)
  if return_code == 0x01
    raise LowVersionException
  elsif CONNACK_ERROR_MESSAGE.key?(return_code)
    PahoMqtt.logger.warn(CONNACK_ERROR_MESSAGE[return_code]) if PahoMqtt.logger?
    MQTT_CS_DISCONNECTED
  else
    PahoMqtt.logger.error("Unknown return code for CONNACK packet: #{return_code}") if PahoMqtt.logger?
    raise PacketException
  end
end

#handle_packet(packet) ⇒ Object



54
55
56
57
58
# File 'lib/paho_mqtt/handler.rb', line 54

# Logs the arrival of a packet and forwards it to the handler method named
# after its type, i.e. "handle_" followed by the result of packet_type.
#
# @param packet [Object] the received packet
# @return [Object] whatever the type-specific handler returns
def handle_packet(packet)
  PahoMqtt.logger.info("New packet #{packet.class} recieved.") if PahoMqtt.logger?
  send("handle_#{packet_type(packet)}", packet)
end

#handle_pingresp(_packet) ⇒ Object



118
119
120
# File 'lib/paho_mqtt/handler.rb', line 118

# Records the current time as the moment of the last ping response.
#
# @param _packet [Object] the PINGRESP packet (unused)
# @return [Time] the timestamp stored in @last_ping_resp
def handle_pingresp(_packet)
  @last_ping_resp = Time.now
end

#handle_puback(packet) ⇒ Object



150
151
152
153
154
155
# File 'lib/paho_mqtt/handler.rb', line 150

# Passes a PUBACK's packet id to the publisher and, when the publisher
# reports MQTT_ERR_SUCCESS, invokes the on_puback callback if one is set.
#
# @param packet [#id] the PUBACK packet
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_puback(packet)
  return unless @publisher.do_puback(packet.id) == MQTT_ERR_SUCCESS

  @on_puback&.call(packet)
end

#handle_pubcomp(packet) ⇒ Object



171
172
173
174
175
176
# File 'lib/paho_mqtt/handler.rb', line 171

# Passes a PUBCOMP's packet id to the publisher and, when the publisher
# reports MQTT_ERR_SUCCESS, invokes the on_pubcomp callback if one is set.
#
# @param packet [#id] the PUBCOMP packet
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubcomp(packet)
  return unless @publisher.do_pubcomp(packet.id) == MQTT_ERR_SUCCESS

  @on_pubcomp&.call(packet)
end

#handle_publish(packet) ⇒ Object



141
142
143
144
145
146
147
148
# File 'lib/paho_mqtt/handler.rb', line 141

# Passes an incoming PUBLISH's qos and id to the publisher. When the
# publisher reports MQTT_ERR_SUCCESS, the on_message callback (if set) is
# invoked first, followed by the per-topic callbacks via check_callback.
#
# @param packet [#id, #qos] the PUBLISH packet
# @return [Object, nil] check_callback's result, or nil when the publisher
#   did not report success
def handle_publish(packet)
  return unless @publisher.do_publish(packet.qos, packet.id) == MQTT_ERR_SUCCESS

  @on_message&.call(packet)
  check_callback(packet)
end

#handle_pubrec(packet) ⇒ Object



157
158
159
160
161
162
# File 'lib/paho_mqtt/handler.rb', line 157

# Passes a PUBREC's packet id to the publisher and, when the publisher
# reports MQTT_ERR_SUCCESS, invokes the on_pubrec callback if one is set.
#
# @param packet [#id] the PUBREC packet
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubrec(packet)
  return unless @publisher.do_pubrec(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrec&.call(packet)
end

#handle_pubrel(packet) ⇒ Object



164
165
166
167
168
169
# File 'lib/paho_mqtt/handler.rb', line 164

# Passes a PUBREL's packet id to the publisher and, when the publisher
# reports MQTT_ERR_SUCCESS, invokes the on_pubrel callback if one is set.
#
# @param packet [#id] the PUBREL packet
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_pubrel(packet)
  return unless @publisher.do_pubrel(packet.id) == MQTT_ERR_SUCCESS

  @on_pubrel&.call(packet)
end

#handle_suback(packet) ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/paho_mqtt/handler.rb', line 122

# Hands a SUBACK's return codes and packet id to the subscriber, together
# with an empty array, and invokes the on_suback callback (if set) with the
# topics the subscriber returns, provided that list is not empty.
#
# @param packet [#return_codes, #id] the SUBACK packet
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_suback(packet)
  granted = @subscriber.add_subscription(packet.return_codes, packet.id, [])
  return if granted.empty?

  @on_suback&.call(granted)
end

#handle_unsuback(packet) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/paho_mqtt/handler.rb', line 132

# Hands an UNSUBACK's packet id to the subscriber, together with an empty
# array, and invokes the on_unsuback callback (if set) with the topics the
# subscriber returns, provided that list is not empty.
#
# @param packet [#id] the UNSUBACK packet
# @return [Object, nil] the callback's result, or nil when nothing was called
def handle_unsuback(packet)
  removed = @subscriber.remove_subscription(packet.id, [])
  return if removed.empty?

  @on_unsuback&.call(removed)
end

#new_session?(session_flag) ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
103
104
# File 'lib/paho_mqtt/handler.rb', line 100

# Emits a debug log line when both @clean_session and the given session flag
# are falsy. Despite the predicate-style name, the return value is not
# a meaningful boolean.
#
# @param session_flag [Object] session flag passed in by the caller
# @return [Object, nil] whatever the logger call returns, or nil when nothing is logged
def new_session?(session_flag)
  return if @clean_session || session_flag

  PahoMqtt.logger.debug("New session created for the client") if PahoMqtt.logger?
end

#old_session?(session_flag) ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
115
116
# File 'lib/paho_mqtt/handler.rb', line 112

# Emits a debug log line when @clean_session is falsy and the given session
# flag is truthy. Despite the predicate-style name, the return value is
# not a meaningful boolean.
#
# @param session_flag [Object] session flag passed in by the caller
# @return [Object, nil] whatever the logger call returns, or nil when nothing is logged
def old_session?(session_flag)
  return if @clean_session || !session_flag

  PahoMqtt.logger.debug("Previous session restored by the server.") if PahoMqtt.logger?
end

#on_connack(&block) ⇒ Object



190
191
192
193
# File 'lib/paho_mqtt/handler.rb', line 190

# Registers the given block as the CONNACK callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_connack
# @return [Proc, nil] the callback now stored in @on_connack
def on_connack(&block)
  return @on_connack unless block

  @on_connack = block
end

#on_connack=(callback) ⇒ Object



230
231
232
# File 'lib/paho_mqtt/handler.rb', line 230

# Stores +callback+ as the CONNACK callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_connack
def on_connack=(callback)
  return unless callback.is_a?(Proc)

  @on_connack = callback
end

#on_message(&block) ⇒ Object



225
226
227
228
# File 'lib/paho_mqtt/handler.rb', line 225

# Registers the given block as the message callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_message
# @return [Proc, nil] the callback now stored in @on_message
def on_message(&block)
  return @on_message unless block

  @on_message = block
end

#on_message=(callback) ⇒ Object



258
259
260
# File 'lib/paho_mqtt/handler.rb', line 258

# Stores +callback+ as the message callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_message
def on_message=(callback)
  return unless callback.is_a?(Proc)

  @on_message = callback
end

#on_puback(&block) ⇒ Object



205
206
207
208
# File 'lib/paho_mqtt/handler.rb', line 205

# Registers the given block as the PUBACK callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_puback
# @return [Proc, nil] the callback now stored in @on_puback
def on_puback(&block)
  return @on_puback unless block

  @on_puback = block
end

#on_puback=(callback) ⇒ Object



242
243
244
# File 'lib/paho_mqtt/handler.rb', line 242

# Stores +callback+ as the PUBACK callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_puback
def on_puback=(callback)
  return unless callback.is_a?(Proc)

  @on_puback = callback
end

#on_pubcomp(&block) ⇒ Object



220
221
222
223
# File 'lib/paho_mqtt/handler.rb', line 220

# Registers the given block as the PUBCOMP callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_pubcomp
# @return [Proc, nil] the callback now stored in @on_pubcomp
def on_pubcomp(&block)
  return @on_pubcomp unless block

  @on_pubcomp = block
end

#on_pubcomp=(callback) ⇒ Object



254
255
256
# File 'lib/paho_mqtt/handler.rb', line 254

# Stores +callback+ as the PUBCOMP callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_pubcomp
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)

  @on_pubcomp = callback
end

#on_pubrec(&block) ⇒ Object



210
211
212
213
# File 'lib/paho_mqtt/handler.rb', line 210

# Registers the given block as the PUBREC callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_pubrec
# @return [Proc, nil] the callback now stored in @on_pubrec
def on_pubrec(&block)
  return @on_pubrec unless block

  @on_pubrec = block
end

#on_pubrec=(callback) ⇒ Object



246
247
248
# File 'lib/paho_mqtt/handler.rb', line 246

# Stores +callback+ as the PUBREC callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_pubrec
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)

  @on_pubrec = callback
end

#on_pubrel(&block) ⇒ Object



215
216
217
218
# File 'lib/paho_mqtt/handler.rb', line 215

# Registers the given block as the PUBREL callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_pubrel
# @return [Proc, nil] the callback now stored in @on_pubrel
def on_pubrel(&block)
  return @on_pubrel unless block

  @on_pubrel = block
end

#on_pubrel=(callback) ⇒ Object



250
251
252
# File 'lib/paho_mqtt/handler.rb', line 250

# Stores +callback+ as the PUBREL callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_pubrel
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)

  @on_pubrel = callback
end

#on_suback(&block) ⇒ Object



195
196
197
198
# File 'lib/paho_mqtt/handler.rb', line 195

# Registers the given block as the SUBACK callback, or, when called without
# a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_suback
# @return [Proc, nil] the callback now stored in @on_suback
def on_suback(&block)
  return @on_suback unless block

  @on_suback = block
end

#on_suback=(callback) ⇒ Object



234
235
236
# File 'lib/paho_mqtt/handler.rb', line 234

# Stores +callback+ as the SUBACK callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_suback
def on_suback=(callback)
  return unless callback.is_a?(Proc)

  @on_suback = callback
end

#on_unsuback(&block) ⇒ Object



200
201
202
203
# File 'lib/paho_mqtt/handler.rb', line 200

# Registers the given block as the UNSUBACK callback, or, when called
# without a block, simply reports the callback currently stored.
#
# @yield the callback to store in @on_unsuback
# @return [Proc, nil] the callback now stored in @on_unsuback
def on_unsuback(&block)
  return @on_unsuback unless block

  @on_unsuback = block
end

#on_unsuback=(callback) ⇒ Object



238
239
240
# File 'lib/paho_mqtt/handler.rb', line 238

# Stores +callback+ as the UNSUBACK callback when it is a Proc; any other
# value is ignored and the previous callback is kept.
#
# @param callback [Proc] the callback to store in @on_unsuback
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)

  @on_unsuback = callback
end

#packet_type(packet) ⇒ Object



262
263
264
265
266
267
268
269
270
271
# File 'lib/paho_mqtt/handler.rb', line 262

# Maps a packet to the lowercase short name of its class (the last "::"
# segment), which handle_packet uses to pick the handler method.
#
# Only classes found in PahoMqtt::PACKET_TYPES[3..13] are accepted. Anything
# else is reported through the logger (when enabled) and rejected; nothing is
# written to stdout.
#
# @param packet [Object] the received packet
# @return [String] the downcased, un-namespaced class name of the packet
# @raise [PacketException] when the packet's class is not in the accepted range
def packet_type(packet)
  type = packet.class
  unless PahoMqtt::PACKET_TYPES[3..13].include?(type)
    PahoMqtt.logger.error("Received an unexpeceted packet: #{packet}") if PahoMqtt.logger?
    raise PacketException
  end

  type.to_s.split('::').last.downcase
end

#receive_packet ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/paho_mqtt/handler.rb', line 38

# Waits up to SELECT_TIMEOUT for the socket to become readable, reads one
# packet from it and dispatches it.
#
# Nothing happens when the socket is nil or closed, when the select call
# returns nil, or when no packet could be read. A CONNACK goes to
# handle_connack; every other packet goes to handle_packet. In both cases
# @last_ping_resp is refreshed with the current time (before dispatch for a
# CONNACK, after dispatch otherwise).
#
# @return [Object, nil] handle_connack's result for a CONNACK, the new
#   timestamp for any other packet, or nil when nothing was processed
def receive_packet
  return if @socket.nil? || @socket.closed?
  return if IO.select([@socket], [], [], SELECT_TIMEOUT).nil?

  packet = PahoMqtt::Packet::Base.read(@socket)
  return if packet.nil?

  if packet.is_a?(PahoMqtt::Packet::Connack)
    @last_ping_resp = Time.now
    handle_connack(packet)
  else
    handle_packet(packet)
    @last_ping_resp = Time.now
  end
end

#register_topic_callback(topic, callback, &block) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/paho_mqtt/handler.rb', line 60

# Registers a callback for +topic+, replacing any callback previously
# registered for that same topic. A given block takes precedence over
# +callback+; +callback+ is only used when it is a Proc. When neither is
# usable, the previous registration is still cleared and nothing is added.
#
# @param topic [Object] the topic the callback is attached to
# @param callback [Proc, nil] callback used when no block is given
# @yield optional block used as the callback
# @return MQTT_ERR_SUCCESS
# @raise [ArgumentError] when +topic+ is nil
def register_topic_callback(topic, callback, &block)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be registered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end

  clear_topic_callback(topic)
  handler = block || (callback if callback.is_a?(Proc))
  @registered_callback << [topic, handler] if handler
  MQTT_ERR_SUCCESS
end

#socket=(socket) ⇒ Object



34
35
36
# File 'lib/paho_mqtt/handler.rb', line 34

# Sets the socket that receive_packet polls and reads packets from.
#
# @param socket [Object] stored in @socket
def socket=(socket)
  @socket = socket
end