Class: PahoMqtt::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Client

Returns a new instance of Client.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/paho_mqtt/client.rb', line 52

def initialize(*args)
  @last_ping_resp         = Time.now
  @last_packet_id         = 0
  @ssl_context            = nil
  @sender                 = nil
  @handler                = Handler.new
  @connection_helper      = nil
  @connection_state       = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @mqtt_thread            = nil
  @reconnect_thread       = nil
  @id_mutex               = Mutex.new
  @reconnect_limit        = 3
  @reconnect_delay        = 5

  if args.last.is_a?(Hash)
    attr = args.pop
  else
    attr = {}
  end

  CLIENT_ATTR_DEFAULTS.merge(attr).each_pair do |k,v|
    self.send("#{k}=", v)
  end

  if @ssl
    @ssl_context = OpenSSL::SSL::SSLContext.new
  end

  if @port.nil?
    if @ssl
      @port = DEFAULT_SSL_PORT
    else
      @port = DEFAULT_PORT
    end
  end

  if  @client_id.nil? || @client_id == ""
    @client_id = generate_client_id
  end
end

Instance Attribute Details

#ack_timeoutObject

Returns the value of attribute ack_timeout.



46
47
48
# File 'lib/paho_mqtt/client.rb', line 46

def ack_timeout
  @ack_timeout
end

#blockingObject

Returns the value of attribute blocking.



32
33
34
# File 'lib/paho_mqtt/client.rb', line 32

def blocking
  @blocking
end

#clean_sessionObject

Returns the value of attribute clean_session.



28
29
30
# File 'lib/paho_mqtt/client.rb', line 28

def clean_session
  @clean_session
end

#client_idObject

Returns the value of attribute client_id.



33
34
35
# File 'lib/paho_mqtt/client.rb', line 33

def client_id
  @client_id
end

#connection_stateObject (readonly)

Read Only attribute



49
50
51
# File 'lib/paho_mqtt/client.rb', line 49

def connection_state
  @connection_state
end

#hostObject

Connection related attributes:



25
26
27
# File 'lib/paho_mqtt/client.rb', line 25

def host
  @host
end

#keep_aliveObject

Timeout attributes:



45
46
47
# File 'lib/paho_mqtt/client.rb', line 45

def keep_alive
  @keep_alive
end

#mqtt_versionObject

Returns the value of attribute mqtt_version.



27
28
29
# File 'lib/paho_mqtt/client.rb', line 27

def mqtt_version
  @mqtt_version
end

#passwordObject

Returns the value of attribute password.



35
36
37
# File 'lib/paho_mqtt/client.rb', line 35

def password
  @password
end

#persistentObject

Returns the value of attribute persistent.



29
30
31
# File 'lib/paho_mqtt/client.rb', line 29

def persistent
  @persistent
end

#portObject

Returns the value of attribute port.



26
27
28
# File 'lib/paho_mqtt/client.rb', line 26

def port
  @port
end

#reconnect_delayObject

Returns the value of attribute reconnect_delay.



31
32
33
# File 'lib/paho_mqtt/client.rb', line 31

def reconnect_delay
  @reconnect_delay
end

#reconnect_limitObject

Returns the value of attribute reconnect_limit.



30
31
32
# File 'lib/paho_mqtt/client.rb', line 30

def reconnect_limit
  @reconnect_limit
end

#sslObject

Returns the value of attribute ssl.



36
37
38
# File 'lib/paho_mqtt/client.rb', line 36

def ssl
  @ssl
end

#ssl_contextObject (readonly)

Returns the value of attribute ssl_context.



50
51
52
# File 'lib/paho_mqtt/client.rb', line 50

def ssl_context
  @ssl_context
end

#usernameObject

Returns the value of attribute username.



34
35
36
# File 'lib/paho_mqtt/client.rb', line 34

def username
  @username
end

#will_payloadObject

Returns the value of attribute will_payload.



40
41
42
# File 'lib/paho_mqtt/client.rb', line 40

def will_payload
  @will_payload
end

#will_qosObject

Returns the value of attribute will_qos.



41
42
43
# File 'lib/paho_mqtt/client.rb', line 41

def will_qos
  @will_qos
end

#will_retainObject

Returns the value of attribute will_retain.



42
43
44
# File 'lib/paho_mqtt/client.rb', line 42

def will_retain
  @will_retain
end

#will_topicObject

Last will attributes:



39
40
41
# File 'lib/paho_mqtt/client.rb', line 39

def will_topic
  @will_topic
end

Instance Method Details

#add_topic_callback(topic, callback = nil, &block) ⇒ Object



269
270
271
# File 'lib/paho_mqtt/client.rb', line 269

def add_topic_callback(topic, callback=nil, &block)
  @handler.register_topic_callback(topic, callback, &block)
end

#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object



99
100
101
102
# File 'lib/paho_mqtt/client.rb', line 99

def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl ||= true
  @ssl_context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path)
end

#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/paho_mqtt/client.rb', line 104

def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
  @persistent = persistent
  @blocking   = blocking
  @host       = host
  @port       = port.to_i
  @keep_alive = keep_alive
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end
  @mqtt_thread.kill unless @mqtt_thread.nil?

  init_connection unless reconnect?
  @connection_helper.send_connect(session_params)
  begin
    init_pubsub
    @connection_state = @connection_helper.do_connect(reconnect?)
    if connected?
      build_pubsub
      daemon_mode unless @blocking
    end
  rescue LowVersionException
    downgrade_version
  end
end

#connected?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/paho_mqtt/client.rb', line 146

def connected?
  @connection_state == MQTT_CS_CONNECTED
end

#daemon_modeObject



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/paho_mqtt/client.rb', line 129

def daemon_mode
  @mqtt_thread = Thread.new do
    @reconnect_thread.kill unless @reconnect_thread.nil? || !@reconnect_thread.alive?
    begin
      while connected? do
        mqtt_loop
      end
    rescue SystemCallError => e
      if @persistent
        reconnect
      else
        raise e
      end
    end
  end
end

#disconnect(explicit = true) ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
# File 'lib/paho_mqtt/client.rb', line 218

def disconnect(explicit=true)
  @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread)
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_DISCONNECT
  end
  if explicit && @clean_session
    @last_packet_id = 0
    @subscriber.clear_queue
  end
  MQTT_ERR_SUCCESS
end

#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object



94
95
96
97
# File 'lib/paho_mqtt/client.rb', line 94

def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  @client_id = prefix << Array.new(lenght) { charset.sample }.join
end

#loop_miscObject



189
190
191
192
193
194
195
196
# File 'lib/paho_mqtt/client.rb', line 189

def loop_misc
  if @connection_helper.check_keep_alive(@persistent, @handler.last_ping_resp, @keep_alive) == MQTT_CS_DISCONNECT
    reconnect if check_persistence
  end
  @publisher.check_waiting_publisher
  @subscriber.check_waiting_subscriber
  sleep SELECT_TIMEOUT
end

#loop_readObject



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/paho_mqtt/client.rb', line 166

def loop_read
  begin
    MAX_QUEUE.times do
      result = @handler.receive_packet
      break if result.nil?
    end
  rescue FullQueueException
    PahoMqtt.logger.warn("Early exit in reading loop. The maximum packets have been reach for #{packet.type_name}") if PahoMqtt.logger?
  rescue ReadingException
    if check_persistence
      reconnect
    else
      raise ReadingException
    end
  end
end

#loop_writeObject



154
155
156
157
158
159
160
161
162
163
164
# File 'lib/paho_mqtt/client.rb', line 154

def loop_write
  begin
    @sender.writing_loop
  rescue WritingException
    if check_persistence
      reconnect
    else
      raise WritingException
    end
  end
end

#mqtt_loopObject



183
184
185
186
187
# File 'lib/paho_mqtt/client.rb', line 183

def mqtt_loop
  loop_read
  loop_write
  loop_misc
end

#on_connack(&block) ⇒ Object



277
278
279
280
# File 'lib/paho_mqtt/client.rb', line 277

def on_connack(&block)
  @handler.on_connack = block if block_given?
  @handler.on_connack
end

#on_connack=(callback) ⇒ Object



317
318
319
# File 'lib/paho_mqtt/client.rb', line 317

def on_connack=(callback)
  @handler.on_connack = callback if callback.is_a?(Proc)
end

#on_message(&block) ⇒ Object



312
313
314
315
# File 'lib/paho_mqtt/client.rb', line 312

def on_message(&block)
  @handler.on_message = block if block_given?
  @handler.on_message
end

#on_message=(callback) ⇒ Object



345
346
347
# File 'lib/paho_mqtt/client.rb', line 345

def on_message=(callback)
  @handler.on_message = callback if callback.is_a?(Proc)
end

#on_puback(&block) ⇒ Object



292
293
294
295
# File 'lib/paho_mqtt/client.rb', line 292

def on_puback(&block)
  @handler.on_puback = block if block_given?
  @handler.on_puback
end

#on_puback=(callback) ⇒ Object



329
330
331
# File 'lib/paho_mqtt/client.rb', line 329

def on_puback=(callback)
  @handler.on_puback = callback if callback.is_a?(Proc)
end

#on_pubcomp(&block) ⇒ Object



307
308
309
310
# File 'lib/paho_mqtt/client.rb', line 307

def on_pubcomp(&block)
  @handler.on_pubcomp = block if block_given?
  @handler.on_pubcomp
end

#on_pubcomp=(callback) ⇒ Object



341
342
343
# File 'lib/paho_mqtt/client.rb', line 341

def on_pubcomp=(callback)
  @handler.on_pubcomp = callback if callback.is_a?(Proc)
end

#on_pubrec(&block) ⇒ Object



297
298
299
300
# File 'lib/paho_mqtt/client.rb', line 297

def on_pubrec(&block)
  @handler.on_pubrec = block if block_given?
  @handler.on_pubrec
end

#on_pubrec=(callback) ⇒ Object



333
334
335
# File 'lib/paho_mqtt/client.rb', line 333

def on_pubrec=(callback)
  @handler.on_pubrec = callback if callback.is_a?(Proc)
end

#on_pubrel(&block) ⇒ Object



302
303
304
305
# File 'lib/paho_mqtt/client.rb', line 302

def on_pubrel(&block)
  @handler.on_pubrel = block if block_given?
  @handler.on_pubrel
end

#on_pubrel=(callback) ⇒ Object



337
338
339
# File 'lib/paho_mqtt/client.rb', line 337

def on_pubrel=(callback)
  @handler.on_pubrel = callback if callback.is_a?(Proc)
end

#on_suback(&block) ⇒ Object



282
283
284
285
# File 'lib/paho_mqtt/client.rb', line 282

def on_suback(&block)
  @handler.on_suback = block if block_given?
  @handler.on_suback
end

#on_suback=(callback) ⇒ Object



321
322
323
# File 'lib/paho_mqtt/client.rb', line 321

def on_suback=(callback)
  @handler.on_suback = callback if callback.is_a?(Proc)
end

#on_unsuback(&block) ⇒ Object



287
288
289
290
# File 'lib/paho_mqtt/client.rb', line 287

def on_unsuback(&block)
  @handler.on_unsuback = block if block_given?
  @handler.on_unsuback
end

#on_unsuback=(callback) ⇒ Object



325
326
327
# File 'lib/paho_mqtt/client.rb', line 325

def on_unsuback=(callback)
  @handler.on_unsuback = callback if callback.is_a?(Proc)
end

#ping_hostObject



265
266
267
# File 'lib/paho_mqtt/client.rb', line 265

def ping_host
  @sender.send_pingreq
end

#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object



230
231
232
233
234
235
236
237
# File 'lib/paho_mqtt/client.rb', line 230

def publish(topic, payload="", retain=false, qos=0)
  if topic == "" || !topic.is_a?(String)
    PahoMqtt.logger.error("Publish topics is invalid, not a string or empty.") if PahoMqtt.logger?
    raise ArgumentError
  end
  id = next_packet_id
  @publisher.send_publish(topic, payload, retain, qos, id)
end

#reconnectObject



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/paho_mqtt/client.rb', line 198

def reconnect
  @reconnect_thread = Thread.new do
    counter = 0
    while (@reconnect_limit >= counter || @reconnect_limit == -1) do
      counter += 1
      PahoMqtt.logger.debug("New reconnect attempt...") if PahoMqtt.logger?
      connect
      if connected?
        break
      else
        sleep @reconnect_delay
      end
    end
    unless connected?
      PahoMqtt.logger.error("Reconnection attempt counter is over. (#{@reconnect_limit} times)") if PahoMqtt.logger?
      disconnect(false)
    end
  end
end

#reconnect?Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/paho_mqtt/client.rb', line 150

def reconnect?
  Thread.current == @reconnect_thread
end

#registered_callbackObject



349
350
351
# File 'lib/paho_mqtt/client.rb', line 349

def registered_callback
  @handler.registered_callback
end

#remove_topic_callback(topic) ⇒ Object



273
274
275
# File 'lib/paho_mqtt/client.rb', line 273

def remove_topic_callback(topic)
  @handler.clear_topic_callback(topic)
end

#subscribe(*topics) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/paho_mqtt/client.rb', line 239

def subscribe(*topics)
  begin
    id = next_packet_id
    unless @subscriber.send_subscribe(topics, id) == PahoMqtt::MQTT_ERR_SUCCESS
      reconnect if check_persistence
    end
    MQTT_ERR_SUCCESS
  rescue ProtocolViolation
    PahoMqtt.logger.error("Subscribe topics need one topic or a list of topics.") if PahoMqtt.logger?
    raise ProtocolViolation
  end
end

#subscribed_topicsObject



353
354
355
# File 'lib/paho_mqtt/client.rb', line 353

def subscribed_topics
  @subscriber.subscribed_topics
end

#unsubscribe(*topics) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/paho_mqtt/client.rb', line 252

def unsubscribe(*topics)
  begin
    id = next_packet_id
    unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS
      reconnect if check_persistence
    end
    MQTT_ERR_SUCCESS
  rescue ProtocolViolation
    PahoMqtt.logger.error("Unsubscribe need at least one topic.") if PahoMqtt.logger?
    raise ProtocolViolation
  end
end