Class: PahoMqtt::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/paho.mqtt/paho_client.rb

Constant Summary collapse

ATTR_DEFAULTS =
{
  :logger => nil,
  :host => "",
  :port => nil,
  :mqtt_version => '3.1.1',
  :clean_session => true,
  :persistent => false,
  :client_id => nil,
  :username => nil,
  :password => nil,
  :ssl => false,
  :will_topic => nil,
  :will_payload => nil,
  :will_qos => 0,
  :will_retain => false,
  :keep_alive => 60,
  :ack_timeout => 5,
  :on_connack => nil,
  :on_suback => nil,
  :on_unsuback => nil,
  :on_puback => nil,
  :on_pubrel => nil,
  :on_pubrec => nil,
  :on_pubcomp => nil,
  :on_message => nil,
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Client

Returns a new instance of Client.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/paho.mqtt/paho_client.rb', line 73

def initialize(*args)
  if args.last.is_a?(Hash)
    attr = args.pop
  else
    attr = {}
  end

  ATTR_DEFAULTS.merge(attr).each_pair do |k,v|
    self.send("#{k}=", v)
  end

  if @port.nil?
    @port = @ssl ? PahoMqtt::DEFAULT_SSL_PORT : PahoMqtt::DEFAULT_PORT
  end

  if  @client_id.nil? || @client_id == ""
    @client_id = generate_client_id
  end

  @last_ping_req = Time.now
  @last_ping_resp = Time.now
  @last_packet_id = 0
  @socket = nil
  @ssl_context = nil
  @writing_mutex = Mutex.new
  @writing_queue = []
  @connection_state = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @subscribed_mutex = Mutex.new
  @subscribed_topics = []
  @registered_callback = []
  @waiting_suback = []
  @suback_mutex = Mutex.new
  @waiting_unsuback = []
  @unsuback_mutex = Mutex.new
  @mqtt_thread = nil
  @reconnect_thread = nil

  @puback_mutex = Mutex.new
  @pubrec_mutex = Mutex.new
  @pubrel_mutex = Mutex.new
  @pubcomp_mutex = Mutex.new
  @waiting_puback = []
  @waiting_pubrec = []
  @waiting_pubrel = []
  @waiting_pubcomp = []
end

Instance Attribute Details

#ack_timeoutObject

Returns the value of attribute ack_timeout.



28
29
30
# File 'lib/paho.mqtt/paho_client.rb', line 28

def ack_timeout
  @ack_timeout
end

#clean_sessionObject

Returns the value of attribute clean_session.



14
15
16
# File 'lib/paho.mqtt/paho_client.rb', line 14

def clean_session
  @clean_session
end

#client_idObject

Returns the value of attribute client_id.



15
16
17
# File 'lib/paho.mqtt/paho_client.rb', line 15

def client_id
  @client_id
end

#connection_stateObject (readonly)

Returns the value of attribute connection_state.



44
45
46
# File 'lib/paho.mqtt/paho_client.rb', line 44

def connection_state
  @connection_state
end

#hostObject

Connection related attributes:



11
12
13
# File 'lib/paho.mqtt/paho_client.rb', line 11

def host
  @host
end

#keep_aliveObject

Setting attributes:



27
28
29
# File 'lib/paho.mqtt/paho_client.rb', line 27

def keep_alive
  @keep_alive
end

#loggerObject

Log file



8
9
10
# File 'lib/paho.mqtt/paho_client.rb', line 8

def logger
  @logger
end

#mqtt_versionObject

Returns the value of attribute mqtt_version.



13
14
15
# File 'lib/paho.mqtt/paho_client.rb', line 13

def mqtt_version
  @mqtt_version
end

#on_connack(&block) ⇒ Object

Returns the value of attribute on_connack.



33
34
35
# File 'lib/paho.mqtt/paho_client.rb', line 33

def on_connack
  @on_connack
end

#on_message(&block) ⇒ Object

Callback attributes



32
33
34
# File 'lib/paho.mqtt/paho_client.rb', line 32

def on_message
  @on_message
end

#on_puback(&block) ⇒ Object

Returns the value of attribute on_puback.



36
37
38
# File 'lib/paho.mqtt/paho_client.rb', line 36

def on_puback
  @on_puback
end

#on_pubcomp(&block) ⇒ Object

Returns the value of attribute on_pubcomp.



39
40
41
# File 'lib/paho.mqtt/paho_client.rb', line 39

def on_pubcomp
  @on_pubcomp
end

#on_pubrec(&block) ⇒ Object

Returns the value of attribute on_pubrec.



38
39
40
# File 'lib/paho.mqtt/paho_client.rb', line 38

def on_pubrec
  @on_pubrec
end

#on_pubrel(&block) ⇒ Object

Returns the value of attribute on_pubrel.



37
38
39
# File 'lib/paho.mqtt/paho_client.rb', line 37

def on_pubrel
  @on_pubrel
end

#on_suback(&block) ⇒ Object

Returns the value of attribute on_suback.



34
35
36
# File 'lib/paho.mqtt/paho_client.rb', line 34

def on_suback
  @on_suback
end

#on_unsuback(&block) ⇒ Object

Returns the value of attribute on_unsuback.



35
36
37
# File 'lib/paho.mqtt/paho_client.rb', line 35

def on_unsuback
  @on_unsuback
end

#passwordObject

Returns the value of attribute password.



17
18
19
# File 'lib/paho.mqtt/paho_client.rb', line 17

def password
  @password
end

#persistentObject

Returns the value of attribute persistent.



29
30
31
# File 'lib/paho.mqtt/paho_client.rb', line 29

def persistent
  @persistent
end

#portObject

Returns the value of attribute port.



12
13
14
# File 'lib/paho.mqtt/paho_client.rb', line 12

def port
  @port
end

#registered_callbackObject (readonly)

Read Only attribute



42
43
44
# File 'lib/paho.mqtt/paho_client.rb', line 42

def registered_callback
  @registered_callback
end

#sslObject

Returns the value of attribute ssl.



18
19
20
# File 'lib/paho.mqtt/paho_client.rb', line 18

def ssl
  @ssl
end

#subscribed_topicsObject (readonly)

Returns the value of attribute subscribed_topics.



43
44
45
# File 'lib/paho.mqtt/paho_client.rb', line 43

def subscribed_topics
  @subscribed_topics
end

#usernameObject

Returns the value of attribute username.



16
17
18
# File 'lib/paho.mqtt/paho_client.rb', line 16

def username
  @username
end

#will_payloadObject

Returns the value of attribute will_payload.



22
23
24
# File 'lib/paho.mqtt/paho_client.rb', line 22

def will_payload
  @will_payload
end

#will_qosObject

Returns the value of attribute will_qos.



23
24
25
# File 'lib/paho.mqtt/paho_client.rb', line 23

def will_qos
  @will_qos
end

#will_retainObject

Returns the value of attribute will_retain.



24
25
26
# File 'lib/paho.mqtt/paho_client.rb', line 24

def will_retain
  @will_retain
end

#will_topicObject

Last will attributes:



21
22
23
# File 'lib/paho.mqtt/paho_client.rb', line 21

def will_topic
  @will_topic
end

Instance Method Details

#add_topic_callback(topic, callback = nil, &block) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/paho.mqtt/paho_client.rb', line 304

def add_topic_callback(topic, callback=nil, &block)
  if topic.nil?
    @logger.error("The topics where the callback is trying to be registered have been found nil.") if @logger.is_a?(Logger)
    raise ParameterException
  end
  remove_topic_callback(topic)

  if block_given?
    @registered_callback.push([topic, block])
  elsif !(callback.nil?) && callback.class == Proc
    @registered_callback.push([topic, callback])
  end
  MQTT_ERR_SUCCESS
end

#cert=(cert_path) ⇒ Object



138
139
140
# File 'lib/paho.mqtt/paho_client.rb', line 138

def cert=(cert_path)
  ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(cert_path))
end

#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/paho.mqtt/paho_client.rb', line 126

def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl ||= true
  @ssl_context = ssl_context
  self.cert = cert_path
  self.key = key_path
  self.root_ca = ca_path
end

#config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object



153
154
155
156
157
158
# File 'lib/paho.mqtt/paho_client.rb', line 153

def config_will(topic, payload="", retain=false, qos=0)
  @will_topic = topic
  @will_payload = payload
  @will_retain = retain
  @will_qos = qos
end

#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent) ⇒ Object



160
161
162
163
164
165
166
# File 'lib/paho.mqtt/paho_client.rb', line 160

def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent)
  @persistent = persistent
  @connection_state_mutex.synchronize {
    @connection_state = MQTT_CS_NEW
  }
  connect_async(host, port, keep_alive)
end

#connect_async(host, port = 1883, keep_alive) ⇒ Object



168
169
170
171
172
173
174
175
176
177
# File 'lib/paho.mqtt/paho_client.rb', line 168

def connect_async(host, port=1883, keep_alive)
  @host = host
  @port = port.to_i
  @keep_alive = keep_alive

  @connection_state_mutex.synchronize {
    @connection_state = MQTT_CS_CONNECT_ASYNC
  }
  setup_connection
end

#disconnect(explicit = true) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/paho.mqtt/paho_client.rb', line 233

def disconnect(explicit=true)
  @logger.debug("Disconnecting from #{@host}") if @logger.is_a?(Logger)

  if explicit
    send_disconnect
    @mqtt_thread.kill if @mqtt_thread && @mqtt_thread.alive?
    @mqtt_thread.kill if @mqtt_thread.alive?
    @last_packet_id = 0

    @writing_mutex.synchronize {
      @writing_queue = []
    }

    @puback_mutex.synchronize {
      @waiting_puback = []
    }

    @pubrec_mutex.synchronize {
      @waiting_pubrec = []
    }

    @pubrel_mutex.synchronize {
      @waiting_pubrel = []
    }

    @pubcomp_mutex.synchronize {
      @waiting_pubcomp = []
    }
  end

  @socket.close unless @socket.nil?
  @socket = nil

  @connection_state_mutex.synchronize {
    @connection_state = MQTT_CS_DISCONNECT
  }
  MQTT_ERR_SUCCESS
end

#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object



121
122
123
124
# File 'lib/paho.mqtt/paho_client.rb', line 121

def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  @client_id = prefix << Array.new(lenght) { charset.sample }.join
end

#key=(key_path, passphrase = nil) ⇒ Object



142
143
144
# File 'lib/paho.mqtt/paho_client.rb', line 142

def key=(key_path, passphrase=nil)
  ssl_context.key = OpenSSL::PKey::RSA.new(File.read(key_path), passphrase)
end

#loop_miscObject



203
204
205
206
207
208
209
210
211
# File 'lib/paho.mqtt/paho_client.rb', line 203

def loop_misc
  check_keep_alive
  check_ack_alive(@waiting_puback, @puback_mutex, MAX_PUBACK)
  check_ack_alive(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC)
  check_ack_alive(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL)
  check_ack_alive(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP)
  check_ack_alive(@waiting_suback, @suback_mutex, @waiting_suback.length)
  check_ack_alive(@waiting_unsuback, @unsuback_mutex, @waiting_unsuback.length)
end

#loop_read(max_packet = 5) ⇒ Object



190
191
192
193
194
# File 'lib/paho.mqtt/paho_client.rb', line 190

def loop_read(max_packet=5)
  max_packet.times do
    receive_packet
  end
end

#loop_write(max_packet = MAX_WRITING) ⇒ Object



180
181
182
183
184
185
186
187
188
# File 'lib/paho.mqtt/paho_client.rb', line 180

def loop_write(max_packet=MAX_WRITING)
  @writing_mutex.synchronize {
    cnt = 0
    while !@writing_queue.empty? && cnt < max_packet do
      send_packet(@writing_queue.shift)
      cnt += 1
    end
  }
end

#mqtt_loopObject



196
197
198
199
200
201
# File 'lib/paho.mqtt/paho_client.rb', line 196

def mqtt_loop
  loop_read
  loop_write
  loop_misc
  sleep LOOP_TEMPO
end

#ping_hostObject



300
301
302
# File 'lib/paho.mqtt/paho_client.rb', line 300

def ping_host
  send_pingreq
end

#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object



272
273
274
275
276
277
278
# File 'lib/paho.mqtt/paho_client.rb', line 272

def publish(topic, payload="", retain=false, qos=0)
  if topic == "" || !topic.is_a?(String)
    @logger.error("Publish topics is invalid, not a string or empty.") if @logger.is_a?(Logger)
    raise ParameterException
  end
  send_publish(topic, payload, retain, qos)
end

#reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/paho.mqtt/paho_client.rb', line 214

def reconnect(retry_time=RECONNECT_RETRY_TIME, retry_tempo=RECONNECT_RETRY_TEMPO)
  @reconnect_thread = Thread.new do
    retry_time.times do
      @logger.debug("New reconnect atempt...") if @logger.is_a?(Logger)
      setup_connection
      if @connection_state == MQTT_CS_CONNECTED
        break
      else
        sleep retry_tempo
      end
    end
    if @connection_state != MQTT_CS_CONNECTED
      @logger.error("Reconnection atempt counter is over.(#{RECONNECT_RETRY_TIME} times)") if @logger.is_a?(Logger)
      disconnect(false)
      exit
    end
  end
end

#remove_topic_callback(topic) ⇒ Object



319
320
321
322
323
324
325
326
# File 'lib/paho.mqtt/paho_client.rb', line 319

def remove_topic_callback(topic)
  if topic.nil?
    @logger.error("The topics where the callback is trying to be unregistered have been found nil.") if @logger.is_a?(Logger)
    raise ParameterException
  end
  @registered_callback.delete_if {|pair| pair.first == topic}
  MQTT_ERR_SUCCESS
end

#root_ca=(ca_path) ⇒ Object



146
147
148
149
150
151
# File 'lib/paho.mqtt/paho_client.rb', line 146

def root_ca=(ca_path)
  ssl_context.ca_file = ca_path
  unless @ca_path.nil?
    ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  end
end

#ssl_contextObject



134
135
136
# File 'lib/paho.mqtt/paho_client.rb', line 134

def ssl_context
  @ssl_context ||= OpenSSL::SSL::SSLContext.new
end

#subscribe(*topics) ⇒ Object



280
281
282
283
284
285
286
287
288
# File 'lib/paho.mqtt/paho_client.rb', line 280

def subscribe(*topics)
  unless topics.length == 0
    send_subscribe(topics)
  else
    @logger.error("Subscribe topics need one topic or a list of topics.") if @logger.is_a?(Logger)
    disconnect(false)
    raise ProtocolViolation
  end
end

#unsubscribe(*topics) ⇒ Object



290
291
292
293
294
295
296
297
298
# File 'lib/paho.mqtt/paho_client.rb', line 290

def unsubscribe(*topics)
  unless topics.length == 0
    send_unsubscribe(topics)
  else
    @logger.error("Unsubscribe need at least one topics.") if @logger.is_a?(Logger)
    disconnect(false)
    raise ProtocolViolation
  end
end