Class: PahoMqtt::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/client.rb

Constant Summary collapse

# Default value for every configurable client attribute. #initialize merges
# the caller's option hash over these defaults and assigns each pair through
# the matching "<name>=" writer.
# Frozen so the shared defaults cannot be altered in place; Hash#merge still
# returns a fresh, unfrozen hash for each client.
ATTR_DEFAULTS =
{
  :logger => nil,
  :host => "",
  :port => nil,
  :mqtt_version => '3.1.1',
  :clean_session => true,
  :persistent => false,
  :blocking => false,
  :client_id => nil,
  :username => nil,
  :password => nil,
  :ssl => false,
  :will_topic => nil,
  :will_payload => nil,
  :will_qos => 0,
  :will_retain => false,
  :keep_alive => 60,
  :ack_timeout => 5,
  :on_connack => nil,
  :on_suback => nil,
  :on_unsuback => nil,
  :on_puback => nil,
  :on_pubrel => nil,
  :on_pubrec => nil,
  :on_pubcomp => nil,
  :on_message => nil,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Client

Returns a new instance of Client.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/paho_mqtt/client.rb', line 75

# Builds a client from an optional trailing options hash.
#
# Each key of ATTR_DEFAULTS, overridden by the caller's options, is assigned
# through its "<key>=" writer, so an option without a matching writer raises
# NoMethodError. A nil port falls back to the SSL or plain default port
# depending on @ssl, and a nil or empty client id is replaced by a generated
# one. All internal queues, mutexes and thread handles start out empty.
#
# @param args [Array] only a trailing Hash of options is consumed
def initialize(*args)
  attr = args.last.is_a?(Hash) ? args.pop : {}

  ATTR_DEFAULTS.merge(attr).each_pair do |k, v|
    send("#{k}=", v)
  end

  if @port.nil?
    @port = @ssl ? PahoMqtt::DEFAULT_SSL_PORT : PahoMqtt::DEFAULT_PORT
  end

  @client_id = generate_client_id if @client_id.nil? || @client_id == ""

  @last_ping_req = Time.now
  @last_ping_resp = Time.now
  @last_packet_id = 0
  @socket = nil
  @ssl_context = nil
  @writing_mutex = Mutex.new
  @writing_queue = []
  @connection_state = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @subscribed_mutex = Mutex.new
  @subscribed_topics = []
  @registered_callback = []
  @waiting_suback = []
  @suback_mutex = Mutex.new
  @waiting_unsuback = []
  @unsuback_mutex = Mutex.new
  @mqtt_thread = nil
  @reconnect_thread = nil

  @puback_mutex = Mutex.new
  @pubrec_mutex = Mutex.new
  @pubrel_mutex = Mutex.new
  @pubcomp_mutex = Mutex.new
  @waiting_puback = []
  @waiting_pubrec = []
  @waiting_pubrel = []
  @waiting_pubcomp = []
end

Instance Attribute Details

#ack_timeout ⇒ Object

Returns the value of attribute ack_timeout.



30
31
32
# File 'lib/paho_mqtt/client.rb', line 30

# Reader for @ack_timeout. ATTR_DEFAULTS supplies 5 when no :ack_timeout
# option is given.
# NOTE(review): the unit is not visible here (presumably seconds) — confirm.
#
# @return [Object] the stored acknowledgement timeout
def ack_timeout
  @ack_timeout
end

#blocking ⇒ Object

Returns the value of attribute blocking.



16
17
18
# File 'lib/paho_mqtt/client.rb', line 16

# Reader for @blocking (false by default in ATTR_DEFAULTS). #connect
# overwrites it from its `blocking` argument and only starts the background
# loop thread when it is falsy.
#
# @return [Object] the stored blocking flag
def blocking
  @blocking
end

#clean_session ⇒ Object

Returns the value of attribute clean_session.



14
15
16
# File 'lib/paho_mqtt/client.rb', line 14

# Reader for @clean_session (true by default in ATTR_DEFAULTS).
#
# @return [Object] the stored clean-session flag
def clean_session
  @clean_session
end

#client_id ⇒ Object

Returns the value of attribute client_id.



17
18
19
# File 'lib/paho_mqtt/client.rb', line 17

# Reader for @client_id. #initialize replaces a nil or empty id with the
# result of generate_client_id.
#
# @return [Object] the stored client identifier
def client_id
  @client_id
end

#connection_state ⇒ Object (readonly)

Returns the value of attribute connection_state.



45
46
47
# File 'lib/paho_mqtt/client.rb', line 45

# Reader for @connection_state. #initialize and #disconnect set it to
# MQTT_CS_DISCONNECT and #connect sets it to MQTT_CS_NEW; this reader does
# not take @connection_state_mutex.
#
# @return [Object] the current connection-state constant
def connection_state
  @connection_state
end

#host ⇒ Object

Connection related attributes:



11
12
13
# File 'lib/paho_mqtt/client.rb', line 11

# Reader for @host ("" by default in ATTR_DEFAULTS). #connect overwrites it
# with its `host` argument.
#
# @return [Object] the stored host
def host
  @host
end

#keep_alive ⇒ Object

Timeout attributes:



29
30
31
# File 'lib/paho_mqtt/client.rb', line 29

# Reader for @keep_alive (60 by default in ATTR_DEFAULTS). #connect
# overwrites it with its `keep_alive` argument.
# NOTE(review): the unit is not visible here (presumably seconds) — confirm.
#
# @return [Object] the stored keep-alive value
def keep_alive
  @keep_alive
end

#logger ⇒ Object

Log file



8
9
10
# File 'lib/paho_mqtt/client.rb', line 8

# Reader for @logger (nil by default in ATTR_DEFAULTS). The client's methods
# only write log lines when this value is_a?(Logger).
#
# @return [Object] the stored logger, or nil
def logger
  @logger
end

#mqtt_version ⇒ Object

Returns the value of attribute mqtt_version.



13
14
15
# File 'lib/paho_mqtt/client.rb', line 13

# Reader for @mqtt_version ('3.1.1' by default in ATTR_DEFAULTS).
#
# @return [Object] the stored protocol version
def mqtt_version
  @mqtt_version
end

#on_connack(&block) ⇒ Object

Returns the value of attribute on_connack.



34
35
36
# File 'lib/paho_mqtt/client.rb', line 34

# Reader for @on_connack (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for CONNACK packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_connack
  @on_connack
end

#on_message(&block) ⇒ Object

Callback attributes



33
34
35
# File 'lib/paho_mqtt/client.rb', line 33

# Reader for @on_message (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for incoming messages; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_message
  @on_message
end

#on_puback(&block) ⇒ Object

Returns the value of attribute on_puback.



37
38
39
# File 'lib/paho_mqtt/client.rb', line 37

# Reader for @on_puback (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for PUBACK packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_puback
  @on_puback
end

#on_pubcomp(&block) ⇒ Object

Returns the value of attribute on_pubcomp.



40
41
42
# File 'lib/paho_mqtt/client.rb', line 40

# Reader for @on_pubcomp (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for PUBCOMP packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_pubcomp
  @on_pubcomp
end

#on_pubrec(&block) ⇒ Object

Returns the value of attribute on_pubrec.



39
40
41
# File 'lib/paho_mqtt/client.rb', line 39

# Reader for @on_pubrec (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for PUBREC packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_pubrec
  @on_pubrec
end

#on_pubrel(&block) ⇒ Object

Returns the value of attribute on_pubrel.



38
39
40
# File 'lib/paho_mqtt/client.rb', line 38

# Reader for @on_pubrel (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for PUBREL packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_pubrel
  @on_pubrel
end

#on_suback(&block) ⇒ Object

Returns the value of attribute on_suback.



35
36
37
# File 'lib/paho_mqtt/client.rb', line 35

# Reader for @on_suback (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for SUBACK packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_suback
  @on_suback
end

#on_unsuback(&block) ⇒ Object

Returns the value of attribute on_unsuback.



36
37
38
# File 'lib/paho_mqtt/client.rb', line 36

# Reader for @on_unsuback (nil by default in ATTR_DEFAULTS).
# NOTE(review): presumably the handler run for UNSUBACK packets; where it is
# invoked is not visible here — confirm.
#
# @return [Object] the stored callback, or nil
def on_unsuback
  @on_unsuback
end

#password ⇒ Object

Returns the value of attribute password.



19
20
21
# File 'lib/paho_mqtt/client.rb', line 19

# Reader for @password (nil by default in ATTR_DEFAULTS).
#
# @return [Object] the stored password, or nil
def password
  @password
end

#persistent ⇒ Object

Returns the value of attribute persistent.



15
16
17
# File 'lib/paho_mqtt/client.rb', line 15

# Reader for @persistent (false by default in ATTR_DEFAULTS). #connect
# overwrites it with its `persistent` argument.
#
# @return [Object] the stored persistence flag
def persistent
  @persistent
end

#port ⇒ Object

Returns the value of attribute port.



12
13
14
# File 'lib/paho_mqtt/client.rb', line 12

# Reader for @port. When no port is configured, #initialize picks
# PahoMqtt::DEFAULT_SSL_PORT or PahoMqtt::DEFAULT_PORT depending on @ssl;
# #connect stores its `port` argument converted with to_i.
#
# @return [Object] the stored port
def port
  @port
end

#registered_callback ⇒ Object (readonly)

Read Only attribute



43
44
45
# File 'lib/paho_mqtt/client.rb', line 43

# Reader for @registered_callback, the list that #add_topic_callback fills
# with [topic, callable] pairs. The internal array itself is returned, not a
# copy.
#
# @return [Array] registered [topic, callable] pairs
def registered_callback
  @registered_callback
end

#ssl ⇒ Object

Returns the value of attribute ssl.



20
21
22
# File 'lib/paho_mqtt/client.rb', line 20

# Reader for @ssl (false by default in ATTR_DEFAULTS). #initialize uses it to
# choose the default port, and #config_ssl_context sets it to true when it is
# falsy.
#
# @return [Object] the stored SSL flag
def ssl
  @ssl
end

#subscribed_topics ⇒ Object (readonly)

Returns the value of attribute subscribed_topics.



44
45
46
# File 'lib/paho_mqtt/client.rb', line 44

# Reader for @subscribed_topics, which #initialize creates as an empty array.
# The internal array itself is returned; how it is filled is not visible
# here.
#
# @return [Array] the stored subscription list
def subscribed_topics
  @subscribed_topics
end

#username ⇒ Object

Returns the value of attribute username.



18
19
20
# File 'lib/paho_mqtt/client.rb', line 18

# Reader for @username (nil by default in ATTR_DEFAULTS).
#
# @return [Object] the stored user name, or nil
def username
  @username
end

#will_payload ⇒ Object

Returns the value of attribute will_payload.



24
25
26
# File 'lib/paho_mqtt/client.rb', line 24

# Reader for @will_payload (nil by default in ATTR_DEFAULTS; also set by
# #config_will).
#
# @return [Object] the stored last-will payload, or nil
def will_payload
  @will_payload
end

#will_qos ⇒ Object

Returns the value of attribute will_qos.



25
26
27
# File 'lib/paho_mqtt/client.rb', line 25

# Reader for @will_qos (0 by default in ATTR_DEFAULTS; also set by
# #config_will).
#
# @return [Object] the stored last-will QoS
def will_qos
  @will_qos
end

#will_retain ⇒ Object

Returns the value of attribute will_retain.



26
27
28
# File 'lib/paho_mqtt/client.rb', line 26

# Reader for @will_retain (false by default in ATTR_DEFAULTS; also set by
# #config_will).
#
# @return [Object] the stored last-will retain flag
def will_retain
  @will_retain
end

#will_topic ⇒ Object

Last will attributes:



23
24
25
# File 'lib/paho_mqtt/client.rb', line 23

# Reader for @will_topic (nil by default in ATTR_DEFAULTS; also set by
# #config_will).
#
# @return [Object] the stored last-will topic, or nil
def will_topic
  @will_topic
end

Instance Method Details

#add_topic_callback(topic, callback = nil, &block) ⇒ Object



301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/paho_mqtt/client.rb', line 301

# Registers the handler to use for +topic+, replacing any handler already
# registered for exactly that topic.
#
# A block takes precedence over +callback+. A +callback+ that is not a Proc
# (or Proc subclass) is ignored, which leaves the topic without a handler
# because the previous one has already been removed.
#
# @param topic [Object] topic the handler is bound to; must not be nil
# @param callback [Proc, nil] handler used when no block is given
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ParameterException] when +topic+ is nil
def add_topic_callback(topic, callback=nil, &block)
  if topic.nil?
    @logger.error("The topics where the callback is trying to be registered have been found nil.") if @logger.is_a?(Logger)
    raise ParameterException
  end
  remove_topic_callback(topic)

  handler = block_given? ? block : callback
  # is_a? (rather than comparing classes) also accepts Proc subclasses.
  @registered_callback.push([topic, handler]) if handler.is_a?(Proc)
  MQTT_ERR_SUCCESS
end

#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object



137
138
139
140
141
142
143
# File 'lib/paho_mqtt/client.rb', line 137

# Turns SSL on (unless @ssl is already truthy), makes sure an SSL context
# exists, and hands the certificate, key and CA paths to the cert=, key= and
# root_ca= writers, in that order.
#
# @param cert_path [Object] value passed to cert=
# @param key_path [Object] value passed to key=
# @param ca_path [Object, nil] value passed to root_ca=
# @return [Object] +ca_path+ (the value of the final assignment)
def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl = true unless @ssl
  @ssl_context = ssl_context
  self.cert, self.key = cert_path, key_path
  self.root_ca = ca_path
end

#config_will(topic, payload = "", retain = false, qos = 0) ⇒ Object



145
146
147
148
149
150
# File 'lib/paho_mqtt/client.rb', line 145

# Stores the last-will settings on the client.
#
# @param topic [Object] stored in @will_topic
# @param payload [Object] stored in @will_payload
# @param retain [Object] stored in @will_retain
# @param qos [Object] stored in @will_qos
# @return [Object] +qos+ (the value of the final assignment)
def config_will(topic, payload="", retain=false, qos=0)
  @will_topic, @will_payload, @will_retain = topic, payload, retain
  @will_qos = qos
end

#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/paho_mqtt/client.rb', line 152

# Records the connection parameters, marks the client as MQTT_CS_NEW under
# the state mutex, runs setup_connection and, unless blocking mode is on,
# starts the background loop through connect_async.
#
# @param host [Object] stored in @host
# @param port [#to_i] stored in @port after to_i
# @param keep_alive [Object] stored in @keep_alive
# @param persistent [Object] stored in @persistent
# @param blocking [Object] stored in @blocking
# @return [Object, nil] the result of connect_async, or nil in blocking mode
def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
  @persistent, @blocking, @host = persistent, blocking, host
  @port = port.to_i
  @keep_alive = keep_alive
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end
  setup_connection
  return if @blocking
  # The raw arguments (not the stored, converted values) are forwarded.
  connect_async(host, port, keep_alive)
end

#connect_async(host, port = 1883, keep_alive) ⇒ Object



165
166
167
168
169
170
171
172
# File 'lib/paho_mqtt/client.rb', line 165

# Starts the background thread that stops a still-running reconnect thread
# and then keeps calling mqtt_loop for as long as connected? is true.
# The three parameters are accepted but not used by this method.
#
# @param host [Object] unused
# @param port [Object] unused
# @param keep_alive [Object] unused
# @return [Thread] the new thread, also stored in @mqtt_thread
def connect_async(host, port=1883, keep_alive)
  @mqtt_thread = Thread.new do
    @reconnect_thread.kill if !@reconnect_thread.nil? && @reconnect_thread.alive?
    mqtt_loop while connected?
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


174
175
176
# File 'lib/paho_mqtt/client.rb', line 174

# Tells whether the stored connection state equals MQTT_CS_CONNECTED.
# The state is read without taking @connection_state_mutex.
#
# @return [Boolean]
def connected?
  @connection_state == MQTT_CS_CONNECTED
end

#disconnect(explicit = true) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/paho_mqtt/client.rb', line 230

# Closes the connection and marks the client as MQTT_CS_DISCONNECT.
#
# An explicit disconnect additionally sends a disconnect packet, stops the
# background loop thread if one is running, resets the packet id counter and
# empties the write queue and the publish acknowledgement queues. A
# non-explicit disconnect leaves those queues untouched.
#
# @param explicit [Boolean] whether the session state is reset as well
# @return [Object] MQTT_ERR_SUCCESS
def disconnect(explicit=true)
  @logger.debug("Disconnecting from #{@host}") if @logger.is_a?(Logger)

  if explicit
    send_disconnect
    # @mqtt_thread is nil until connect_async has run (e.g. in blocking
    # mode), so it must be nil-checked before it is asked whether it is alive.
    @mqtt_thread.kill if @mqtt_thread && @mqtt_thread.alive?
    @last_packet_id = 0

    @writing_mutex.synchronize {
      @writing_queue = []
    }

    @puback_mutex.synchronize {
      @waiting_puback = []
    }

    @pubrec_mutex.synchronize {
      @waiting_pubrec = []
    }

    @pubrel_mutex.synchronize {
      @waiting_pubrel = []
    }

    @pubcomp_mutex.synchronize {
      @waiting_pubcomp = []
    }
  end

  @socket.close unless @socket.nil? || @socket.closed?
  @socket = nil

  @connection_state_mutex.synchronize {
    @connection_state = MQTT_CS_DISCONNECT
  }
  MQTT_ERR_SUCCESS
end

#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object



128
129
130
131
# File 'lib/paho_mqtt/client.rb', line 128

# Builds a client id made of +prefix+ followed by +lenght+ random characters
# drawn from A-Z, a-z and 0-9, stores it in @client_id and returns it.
#
# The id is built as a new string, so the caller's +prefix+ is never mutated
# and a frozen prefix is accepted. The misspelt +lenght+ parameter name is
# kept as it appears in the published signature.
#
# @param prefix [#to_s] leading part of the id
# @param lenght [Integer] number of random characters to append
# @return [String] the generated client id
def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  suffix = Array.new(lenght) { charset.sample }.join
  @client_id = "#{prefix}#{suffix}"
end

#loop_misc ⇒ Object



201
202
203
204
205
206
207
208
209
# File 'lib/paho_mqtt/client.rb', line 201

# Runs the periodic housekeeping: the keep-alive check first, then
# check_ack_alive for each acknowledgement queue with its mutex and a limit.
# The publish queues use the MAX_PUB* constants as their limit, while the
# subscribe and unsubscribe queues pass their own current length.
# NOTE(review): what check_ack_alive does with the limit is not visible
# here — confirm against its definition.
#
# @return [Object] the result of the last check_ack_alive call
def loop_misc
  check_keep_alive
  check_ack_alive(@waiting_puback, @puback_mutex, MAX_PUBACK)
  check_ack_alive(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC)
  check_ack_alive(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL)
  check_ack_alive(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP)
  check_ack_alive(@waiting_suback, @suback_mutex, @waiting_suback.length)
  check_ack_alive(@waiting_unsuback, @unsuback_mutex, @waiting_unsuback.length)
end

#loop_read(max_packet = MAX_READ) ⇒ Object



188
189
190
191
192
# File 'lib/paho_mqtt/client.rb', line 188

# Calls receive_packet +max_packet+ times in a row.
#
# @param max_packet [Integer] number of receive attempts
# @return [Integer] +max_packet+ (the receiver of Integer#times)
def loop_read(max_packet=MAX_READ)
  max_packet.times { receive_packet }
end

#loop_write(max_packet = MAX_WRITING) ⇒ Object



178
179
180
181
182
183
184
185
186
# File 'lib/paho_mqtt/client.rb', line 178

# Sends queued packets in FIFO order while holding @writing_mutex, stopping
# once the queue is empty or +max_packet+ packets have been sent.
#
# @param max_packet [Integer] upper bound on packets sent by this call
# @return [nil]
def loop_write(max_packet=MAX_WRITING)
  @writing_mutex.synchronize do
    sent = 0
    until @writing_queue.empty? || sent >= max_packet
      send_packet(@writing_queue.shift)
      sent += 1
    end
  end
end

#mqtt_loop ⇒ Object



194
195
196
197
198
199
# File 'lib/paho_mqtt/client.rb', line 194

# One iteration of the client loop: read, write, housekeeping, then a pause
# of LOOP_TEMPO before returning.
#
# @return [Integer] the value returned by sleep
def mqtt_loop
  loop_read
  loop_write
  loop_misc
  sleep LOOP_TEMPO
end

#ping_host ⇒ Object



297
298
299
# File 'lib/paho_mqtt/client.rb', line 297

# Public wrapper that delegates to send_pingreq.
#
# @return [Object] whatever send_pingreq returns
def ping_host
  send_pingreq
end

#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object



269
270
271
272
273
274
275
# File 'lib/paho_mqtt/client.rb', line 269

# Publishes +payload+ on +topic+ through send_publish after checking that the
# topic is a non-empty String.
#
# @param topic [String] destination topic
# @param payload [Object] forwarded to send_publish
# @param retain [Object] forwarded to send_publish
# @param qos [Object] forwarded to send_publish
# @return [Object] whatever send_publish returns
# @raise [ParameterException] when +topic+ is empty or not a String
def publish(topic, payload="", retain=false, qos=0)
  topic_invalid = topic == "" || !topic.is_a?(String)
  return send_publish(topic, payload, retain, qos) unless topic_invalid

  @logger.error("Publish topics is invalid, not a string or empty.") if @logger.is_a?(Logger)
  raise ParameterException
end

#reconnect(retry_time = RECONNECT_RETRY_TIME, retry_tempo = RECONNECT_RETRY_TEMPO) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/paho_mqtt/client.rb', line 211

# Starts a background thread that calls connect up to +retry_time+ times,
# pausing +retry_tempo+ between failed attempts. If the client is still not
# connected afterwards, the thread logs an error, performs a non-explicit
# disconnect and calls exit.
#
# @param retry_time [Integer] maximum number of connection attempts
# @param retry_tempo [Numeric] pause handed to sleep between attempts
# @return [Thread] the reconnect thread, also stored in @reconnect_thread
def reconnect(retry_time=RECONNECT_RETRY_TIME, retry_tempo=RECONNECT_RETRY_TEMPO)
  @reconnect_thread = Thread.new do
    retry_time.times do
      @logger.debug("New reconnect atempt...") if @logger.is_a?(Logger)
      connect
      break if connected?
      sleep retry_tempo
    end
    unless connected?
      # Report the number of attempts actually made, not the default constant.
      @logger.error("Reconnection atempt counter is over.(#{retry_time} times)") if @logger.is_a?(Logger)
      disconnect(false)
      exit
    end
  end
end

#remove_topic_callback(topic) ⇒ Object



316
317
318
319
320
321
322
323
# File 'lib/paho_mqtt/client.rb', line 316

# Drops every registered [topic, callable] pair whose topic equals +topic+.
#
# @param topic [Object] topic whose handlers are removed; must not be nil
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ParameterException] when +topic+ is nil
def remove_topic_callback(topic)
  if topic.nil?
    @logger.error("The topics where the callback is trying to be unregistered have been found nil.") if @logger.is_a?(Logger)
    raise ParameterException
  end

  @registered_callback.reject! { |entry| entry.first == topic }
  MQTT_ERR_SUCCESS
end

#ssl_context ⇒ Object



133
134
135
# File 'lib/paho_mqtt/client.rb', line 133

# Returns the client's SSL context, creating an OpenSSL::SSL::SSLContext on
# first use and caching it in @ssl_context.
#
# @return [OpenSSL::SSL::SSLContext]
def ssl_context
  @ssl_context ||= OpenSSL::SSL::SSLContext.new
end

#subscribe(*topics) ⇒ Object



277
278
279
280
281
282
283
284
285
# File 'lib/paho_mqtt/client.rb', line 277

# Sends a subscribe request for +topics+ once valid_topics? accepts them.
# When validation returns MQTT_ERR_FAIL the client logs an error, performs a
# non-explicit disconnect and raises.
#
# @param topics [Array] topics collected from the argument list
# @return [Object] whatever send_subscribe returns
# @raise [ProtocolViolation] when the topics fail validation
def subscribe(*topics)
  if valid_topics?(topics) == MQTT_ERR_FAIL
    @logger.error("Subscribe topics need one topic or a list of topics.") if @logger.is_a?(Logger)
    disconnect(false)
    raise ProtocolViolation
  end

  send_subscribe(topics)
end

#unsubscribe(*topics) ⇒ Object



287
288
289
290
291
292
293
294
295
# File 'lib/paho_mqtt/client.rb', line 287

# Sends an unsubscribe request for +topics+ once valid_topics? accepts them.
# When validation returns MQTT_ERR_FAIL the client logs an error, performs a
# non-explicit disconnect and raises.
#
# @param topics [Array] topics collected from the argument list
# @return [Object] whatever send_unsubscribe returns
# @raise [ProtocolViolation] when the topics fail validation
def unsubscribe(*topics)
  if valid_topics?(topics) == MQTT_ERR_FAIL
    @logger.error("Unsubscribe need at least one topics.") if @logger.is_a?(Logger)
    disconnect(false)
    raise ProtocolViolation
  end

  send_unsubscribe(topics)
end