Class: PahoMqtt::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Client

Returns a new instance of Client.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/paho_mqtt/client.rb', line 50

# Builds a client in the disconnected state and applies its configuration.
#
# A trailing Hash in +args+ is taken as the attribute overrides. It is merged
# over CLIENT_ATTR_DEFAULTS and every resulting pair is assigned through the
# matching "<name>=" writer.
#
# @param args [Array] optional arguments; only a trailing Hash is consumed
def initialize(*args)
  @last_ping_resp = Time.now
  @last_packet_id = 0
  @ssl_context = nil
  @sender = nil
  @handler = Handler.new
  @connection_helper = nil
  @connection_state = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @mqtt_thread = nil
  @reconnect_thread = nil
  @id_mutex = Mutex.new

  overrides = args.last.is_a?(Hash) ? args.pop : {}
  CLIENT_ATTR_DEFAULTS.merge(overrides).each_pair do |name, value|
    send("#{name}=", value)
  end

  # An SSL client starts with a blank context.
  @ssl_context = OpenSSL::SSL::SSLContext.new if @ssl

  # Fall back to the protocol's default port when none was configured.
  @port = (@ssl ? DEFAULT_SSL_PORT : DEFAULT_PORT) if @port.nil?

  # A missing or empty client id is replaced by a generated one.
  @client_id = generate_client_id if @client_id.nil? || @client_id == ""
end

Instance Attribute Details

#ack_timeout ⇒ Object

Returns the value of attribute ack_timeout.



44
45
46
# File 'lib/paho_mqtt/client.rb', line 44

# Reader for the ack_timeout attribute.
#
# @return [Object] the current value of @ack_timeout
def ack_timeout
  @ack_timeout
end

#blocking ⇒ Object

Returns the value of attribute blocking.



30
31
32
# File 'lib/paho_mqtt/client.rb', line 30

# Reader for the blocking attribute.
#
# @return [Object] the current value of @blocking
def blocking
  @blocking
end

#clean_session ⇒ Object

Returns the value of attribute clean_session.



28
29
30
# File 'lib/paho_mqtt/client.rb', line 28

# Reader for the clean_session attribute.
#
# @return [Object] the current value of @clean_session
def clean_session
  @clean_session
end

#client_id ⇒ Object

Returns the value of attribute client_id.



31
32
33
# File 'lib/paho_mqtt/client.rb', line 31

# Reader for the client_id attribute.
#
# @return [Object] the current value of @client_id
def client_id
  @client_id
end

#connection_state ⇒ Object (readonly)

Read Only attribute



47
48
49
# File 'lib/paho_mqtt/client.rb', line 47

# Read-only reader for the connection state.
#
# @return [Object] the current value of @connection_state
def connection_state
  @connection_state
end

#host ⇒ Object

Connection related attributes:



25
26
27
# File 'lib/paho_mqtt/client.rb', line 25

# Reader for the host attribute.
#
# @return [Object] the current value of @host
def host
  @host
end

#keep_alive ⇒ Object

Timeout attributes:



43
44
45
# File 'lib/paho_mqtt/client.rb', line 43

# Reader for the keep_alive attribute.
#
# @return [Object] the current value of @keep_alive
def keep_alive
  @keep_alive
end

#mqtt_version ⇒ Object

Returns the value of attribute mqtt_version.



27
28
29
# File 'lib/paho_mqtt/client.rb', line 27

# Reader for the mqtt_version attribute.
#
# @return [Object] the current value of @mqtt_version
def mqtt_version
  @mqtt_version
end

#password ⇒ Object

Returns the value of attribute password.



33
34
35
# File 'lib/paho_mqtt/client.rb', line 33

# Reader for the password attribute.
#
# @return [Object] the current value of @password
def password
  @password
end

#persistent ⇒ Object

Returns the value of attribute persistent.



29
30
31
# File 'lib/paho_mqtt/client.rb', line 29

# Reader for the persistent attribute.
#
# @return [Object] the current value of @persistent
def persistent
  @persistent
end

#port ⇒ Object

Returns the value of attribute port.



26
27
28
# File 'lib/paho_mqtt/client.rb', line 26

# Reader for the port attribute.
#
# @return [Object] the current value of @port
def port
  @port
end

#ssl ⇒ Object

Returns the value of attribute ssl.



34
35
36
# File 'lib/paho_mqtt/client.rb', line 34

# Reader for the ssl attribute.
#
# @return [Object] the current value of @ssl
def ssl
  @ssl
end

#ssl_context ⇒ Object (readonly)

Returns the value of attribute ssl_context.



48
49
50
# File 'lib/paho_mqtt/client.rb', line 48

# Read-only reader for the SSL context.
#
# @return [Object] the current value of @ssl_context
def ssl_context
  @ssl_context
end

#username ⇒ Object

Returns the value of attribute username.



32
33
34
# File 'lib/paho_mqtt/client.rb', line 32

# Reader for the username attribute.
#
# @return [Object] the current value of @username
def username
  @username
end

#will_payload ⇒ Object

Returns the value of attribute will_payload.



38
39
40
# File 'lib/paho_mqtt/client.rb', line 38

# Reader for the will_payload attribute.
#
# @return [Object] the current value of @will_payload
def will_payload
  @will_payload
end

#will_qos ⇒ Object

Returns the value of attribute will_qos.



39
40
41
# File 'lib/paho_mqtt/client.rb', line 39

# Reader for the will_qos attribute.
#
# @return [Object] the current value of @will_qos
def will_qos
  @will_qos
end

#will_retain ⇒ Object

Returns the value of attribute will_retain.



40
41
42
# File 'lib/paho_mqtt/client.rb', line 40

# Reader for the will_retain attribute.
#
# @return [Object] the current value of @will_retain
def will_retain
  @will_retain
end

#will_topic ⇒ Object

Last will attributes:



37
38
39
# File 'lib/paho_mqtt/client.rb', line 37

# Reader for the will_topic attribute.
#
# @return [Object] the current value of @will_topic
def will_topic
  @will_topic
end

Instance Method Details

#add_topic_callback(topic, callback = nil, &block) ⇒ Object



258
259
260
# File 'lib/paho_mqtt/client.rb', line 258

# Forwards a per-topic callback registration to the handler.
#
# @param topic [Object] topic the callback is registered for
# @param callback [Object, nil] callback object, passed through unchanged
# @param block [Proc] optional block, passed through to the handler
# @return [Object] whatever @handler.register_topic_callback returns
def add_topic_callback(topic, callback=nil, &block)
  @handler.register_topic_callback(topic, callback, &block)
end

#config_ssl_context(cert_path, key_path, ca_path = nil) ⇒ Object



95
96
97
98
# File 'lib/paho_mqtt/client.rb', line 95

# Turns SSL on (if it is not already set to a truthy value) and installs the
# context built by SSLHelper from the given certificate material.
#
# @param cert_path [Object] passed to SSLHelper.config_ssl_context
# @param key_path [Object] passed to SSLHelper.config_ssl_context
# @param ca_path [Object, nil] passed to SSLHelper.config_ssl_context
# @return [Object] the context returned by SSLHelper, also stored in @ssl_context
def config_ssl_context(cert_path, key_path, ca_path=nil)
  @ssl = true unless @ssl
  @ssl_context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path)
end

#connect(host = @host, port = @port, keep_alive = @keep_alive, persistent = @persistent, blocking = @blocking) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/paho_mqtt/client.rb', line 100

# Stores the connection settings and performs the connect handshake.
#
# The state is set to MQTT_CS_NEW, any existing @mqtt_thread is killed, the
# connection is (re)initialised and a connect request is sent. The state then
# becomes whatever @connection_helper.do_connect returns. Once connected, the
# publisher/subscriber are built and, unless blocking, the background loop is
# started. A LowVersionException during the handshake triggers
# downgrade_version.
#
# @param host [Object] stored in @host
# @param port [#to_i] stored in @port after conversion with to_i
# @param keep_alive [Object] stored in @keep_alive
# @param persistent [Object] stored in @persistent
# @param blocking [Object] stored in @blocking; when truthy no daemon thread is started
# @return [Object, nil] nil when not connected or blocking; otherwise the result
#   of daemon_mode, or of downgrade_version after a LowVersionException
def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
  @persistent = persistent
  @blocking = blocking
  @host = host
  @port = port.to_i
  @keep_alive = keep_alive

  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end

  # Stop a loop thread left over from a previous connection.
  @mqtt_thread.kill if !@mqtt_thread.nil?

  init_connection
  @connection_helper.send_connect(session_params)

  begin
    @connection_state = @connection_helper.do_connect(reconnect?)
    return unless connected?

    build_pubsub
    daemon_mode unless @blocking
  rescue LowVersionException
    downgrade_version
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/paho_mqtt/client.rb', line 140

# Tells whether the client currently considers itself connected.
#
# @return [Boolean] true when @connection_state equals MQTT_CS_CONNECTED
def connected?
  @connection_state == MQTT_CS_CONNECTED
end

#daemon_mode ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/paho_mqtt/client.rb', line 123

# Starts the background thread that drives the client loop.
#
# The new thread first kills a still-alive @reconnect_thread, then calls
# mqtt_loop for as long as connected? holds. A SystemCallError raised by the
# loop triggers reconnect when @persistent is truthy; otherwise it is
# re-raised inside the thread.
#
# @return [Thread] the created thread, also stored in @mqtt_thread
def daemon_mode
  @mqtt_thread = Thread.new do
    @reconnect_thread.kill if !@reconnect_thread.nil? && @reconnect_thread.alive?
    begin
      mqtt_loop while connected?
    rescue SystemCallError
      raise unless @persistent
      reconnect
    end
  end
end

#disconnect(explicit = true) ⇒ Object



207
208
209
210
211
212
213
214
# File 'lib/paho_mqtt/client.rb', line 207

# Disconnects the client and marks it as disconnected.
#
# An explicit disconnect also resets the packet id counter. The actual
# teardown is delegated to @connection_helper.do_disconnect. That helper is
# nil until a connection has been set up, so the call is skipped in that case
# instead of failing with NoMethodError on nil.
#
# @param explicit [Boolean] true for a user-requested disconnect; forwarded
#   to the connection helper
# @return [Object] MQTT_ERR_SUCCESS
def disconnect(explicit=true)
  @last_packet_id = 0 if explicit
  unless @connection_helper.nil?
    @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread)
  end
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_DISCONNECT
  end
  MQTT_ERR_SUCCESS
end

#generate_client_id(prefix = 'paho_ruby', lenght = 16) ⇒ Object



90
91
92
93
# File 'lib/paho_mqtt/client.rb', line 90

# Generates a random client id and stores it in @client_id.
#
# The id is +prefix+ followed by +lenght+ characters drawn from A-Z, a-z and
# 0-9. The prefix is interpolated into a new string, so the caller's object
# is never modified and frozen prefixes are accepted.
#
# @param prefix [#to_s] text placed in front of the random part
# @param lenght [Integer] number of random characters to append (the
#   parameter keeps its historical spelling)
# @return [String] the generated client id
def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  suffix = Array.new(lenght) { charset.sample }.join
  @client_id = "#{prefix}#{suffix}"
end

#loop_misc ⇒ Object



181
182
183
184
185
186
187
# File 'lib/paho_mqtt/client.rb', line 181

# Performs the periodic checks of the client loop.
#
# Asks the connection helper to check the keep-alive. When it reports
# MQTT_CS_DISCONNECT and check_persistence is truthy, a reconnect is started.
# The publisher's and the subscriber's waiting checks are then run.
#
# @return [Object] whatever @subscriber.check_waiting_subscriber returns
def loop_misc
  keep_alive_state = @connection_helper.check_keep_alive(@persistent, @handler.last_ping_resp, @keep_alive)
  reconnect if keep_alive_state == MQTT_CS_DISCONNECT && check_persistence
  @publisher.check_waiting_publisher
  @subscriber.check_waiting_subscriber
end

#loop_read(max_packet = MAX_READ) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/paho_mqtt/client.rb', line 160

# Reads up to +max_packet+ packets through the handler.
#
# When a read fails with ReadingException the batch stops immediately. With
# check_persistence truthy a single reconnect is started; continuing to read
# would start one reconnect per remaining iteration. Without it the original
# exception is re-raised, keeping its message and backtrace.
#
# @param max_packet [Integer] maximum number of receive_packet calls
# @return [Integer] +max_packet+
# @raise [ReadingException] when reading fails and check_persistence is falsy
def loop_read(max_packet=MAX_READ)
  max_packet.times do
    begin
      @handler.receive_packet
    rescue ReadingException
      raise unless check_persistence
      reconnect
      # The connection is being re-established; stop reading for this pass.
      break
    end
  end
  max_packet
end

#loop_write(max_packet = MAX_WRITING) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
# File 'lib/paho_mqtt/client.rb', line 148

# Runs the sender's writing loop for up to +max_packet+ packets.
#
# On WritingException a reconnect is started when check_persistence is
# truthy. Otherwise the original exception is re-raised as-is, so its message
# and backtrace reach the caller.
#
# @param max_packet [Integer] forwarded to @sender.writing_loop
# @return [Object] the writing loop's result, or reconnect's result after a
#   recovered failure
# @raise [WritingException] when writing fails and check_persistence is falsy
def loop_write(max_packet=MAX_WRITING)
  @sender.writing_loop(max_packet)
rescue WritingException
  raise unless check_persistence
  reconnect
end

#mqtt_loop ⇒ Object



174
175
176
177
178
179
# File 'lib/paho_mqtt/client.rb', line 174

# Runs one pass of the client loop.
#
# Calls loop_read, loop_write and loop_misc in that order, each with its
# default arguments, then sleeps for LOOP_TEMPO before returning.
def mqtt_loop
  loop_read
  loop_write
  loop_misc
  sleep LOOP_TEMPO
end

#on_connack(&block) ⇒ Object



266
267
268
269
# File 'lib/paho_mqtt/client.rb', line 266

# Sets the handler's on_connack callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_connack value
def on_connack(&block)
  @handler.on_connack = block unless block.nil?
  @handler.on_connack
end

#on_connack=(callback) ⇒ Object



306
307
308
# File 'lib/paho_mqtt/client.rb', line 306

# Installs +callback+ as the handler's on_connack callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_connack=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_connack = callback
end

#on_message(&block) ⇒ Object



301
302
303
304
# File 'lib/paho_mqtt/client.rb', line 301

# Sets the handler's on_message callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_message value
def on_message(&block)
  @handler.on_message = block unless block.nil?
  @handler.on_message
end

#on_message=(callback) ⇒ Object



334
335
336
# File 'lib/paho_mqtt/client.rb', line 334

# Installs +callback+ as the handler's on_message callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_message=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_message = callback
end

#on_puback(&block) ⇒ Object



281
282
283
284
# File 'lib/paho_mqtt/client.rb', line 281

# Sets the handler's on_puback callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_puback value
def on_puback(&block)
  @handler.on_puback = block unless block.nil?
  @handler.on_puback
end

#on_puback=(callback) ⇒ Object



318
319
320
# File 'lib/paho_mqtt/client.rb', line 318

# Installs +callback+ as the handler's on_puback callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_puback=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_puback = callback
end

#on_pubcomp(&block) ⇒ Object



296
297
298
299
# File 'lib/paho_mqtt/client.rb', line 296

# Sets the handler's on_pubcomp callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_pubcomp value
def on_pubcomp(&block)
  @handler.on_pubcomp = block unless block.nil?
  @handler.on_pubcomp
end

#on_pubcomp=(callback) ⇒ Object



330
331
332
# File 'lib/paho_mqtt/client.rb', line 330

# Installs +callback+ as the handler's on_pubcomp callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_pubcomp = callback
end

#on_pubrec(&block) ⇒ Object



286
287
288
289
# File 'lib/paho_mqtt/client.rb', line 286

# Sets the handler's on_pubrec callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_pubrec value
def on_pubrec(&block)
  @handler.on_pubrec = block unless block.nil?
  @handler.on_pubrec
end

#on_pubrec=(callback) ⇒ Object



322
323
324
# File 'lib/paho_mqtt/client.rb', line 322

# Installs +callback+ as the handler's on_pubrec callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_pubrec = callback
end

#on_pubrel(&block) ⇒ Object



291
292
293
294
# File 'lib/paho_mqtt/client.rb', line 291

# Sets the handler's on_pubrel callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_pubrel value
def on_pubrel(&block)
  @handler.on_pubrel = block unless block.nil?
  @handler.on_pubrel
end

#on_pubrel=(callback) ⇒ Object



326
327
328
# File 'lib/paho_mqtt/client.rb', line 326

# Installs +callback+ as the handler's on_pubrel callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_pubrel = callback
end

#on_suback(&block) ⇒ Object



271
272
273
274
# File 'lib/paho_mqtt/client.rb', line 271

# Sets the handler's on_suback callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_suback value
def on_suback(&block)
  @handler.on_suback = block unless block.nil?
  @handler.on_suback
end

#on_suback=(callback) ⇒ Object



310
311
312
# File 'lib/paho_mqtt/client.rb', line 310

# Installs +callback+ as the handler's on_suback callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_suback=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_suback = callback
end

#on_unsuback(&block) ⇒ Object



276
277
278
279
# File 'lib/paho_mqtt/client.rb', line 276

# Sets the handler's on_unsuback callback when a block is given.
#
# @param block [Proc] optional callback to install
# @return [Object] the handler's current on_unsuback value
def on_unsuback(&block)
  @handler.on_unsuback = block unless block.nil?
  @handler.on_unsuback
end

#on_unsuback=(callback) ⇒ Object



314
315
316
# File 'lib/paho_mqtt/client.rb', line 314

# Installs +callback+ as the handler's on_unsuback callback.
#
# Anything that is not a Proc is ignored and leaves the handler untouched.
#
# @param callback [Proc] callback to install
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_unsuback = callback
end

#ping_host ⇒ Object



254
255
256
# File 'lib/paho_mqtt/client.rb', line 254

# Asks the sender to send a ping request.
#
# @return [Object] whatever @sender.send_pingreq returns
def ping_host
  @sender.send_pingreq
end

#publish(topic, payload = "", retain = false, qos = 0) ⇒ Object



216
217
218
219
220
221
222
223
224
# File 'lib/paho_mqtt/client.rb', line 216

# Publishes +payload+ on +topic+ through the publisher.
#
# The topic must be a non-empty String. An invalid topic is logged (when a
# logger is configured) and rejected with an ArgumentError that carries the
# same explanation, so callers without a logger still see why it failed.
#
# @param topic [String] non-empty topic name
# @param payload [Object] message payload, forwarded unchanged
# @param retain [Boolean] retain flag, forwarded unchanged
# @param qos [Integer] quality-of-service level, forwarded unchanged
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ArgumentError] when +topic+ is not a String or is empty
def publish(topic, payload="", retain=false, qos=0)
  unless topic.is_a?(String) && !topic.empty?
    message = "Publish topics is invalid, not a string or empty."
    PahoMqtt.logger.error(message) if PahoMqtt.logger?
    raise ArgumentError, message
  end
  id = next_packet_id
  @publisher.send_publish(topic, payload, retain, qos, id)
  MQTT_ERR_SUCCESS
end

#reconnect ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/paho_mqtt/client.rb', line 189

# Starts a background thread that tries to re-establish the connection.
#
# The thread calls connect up to RECONNECT_RETRY_TIME times, stopping at the
# first attempt after which connected? is true and sleeping
# RECONNECT_RETRY_TIME between failed attempts. If the client is still not
# connected afterwards, the failure is logged and disconnect(false) is called.
#
# @return [Thread] the created thread, also stored in @reconnect_thread
def reconnect
  @reconnect_thread = Thread.new do
    RECONNECT_RETRY_TIME.times do
      PahoMqtt.logger.debug("New reconnect atempt...") if PahoMqtt.logger?
      connect
      break if connected?
      sleep RECONNECT_RETRY_TIME
    end
    if !connected?
      PahoMqtt.logger.error("Reconnection atempt counter is over.(#{RECONNECT_RETRY_TIME} times)") if PahoMqtt.logger?
      disconnect(false)
    end
  end
end

#reconnect? ⇒ Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/paho_mqtt/client.rb', line 144

# Tells whether the calling code is running on the reconnect thread.
#
# @return [Boolean] true when Thread.current is @reconnect_thread
def reconnect?
  Thread.current == @reconnect_thread
end

#registered_callback ⇒ Object



338
339
340
# File 'lib/paho_mqtt/client.rb', line 338

# Exposes the handler's registered callbacks.
#
# @return [Object] whatever @handler.registered_callback returns
def registered_callback
  @handler.registered_callback
end

#remove_topic_callback(topic) ⇒ Object



262
263
264
# File 'lib/paho_mqtt/client.rb', line 262

# Asks the handler to clear the callback registered for +topic+.
#
# @param topic [Object] topic whose callback is cleared
# @return [Object] whatever @handler.clear_topic_callback returns
def remove_topic_callback(topic)
  @handler.clear_topic_callback(topic)
end

#subscribe(*topics) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/paho_mqtt/client.rb', line 226

# Sends a subscribe request for the given topics.
#
# When the subscriber does not report success, a reconnect is started if
# check_persistence is truthy. A ProtocolViolation is logged (when a logger
# is configured), the client is disconnected implicitly and the original
# exception is re-raised, so its message and backtrace are kept.
#
# @param topics [Array] topics collected from the argument list and passed to
#   the subscriber as one array
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ProtocolViolation] when the subscriber rejects the request
def subscribe(*topics)
  id = next_packet_id
  unless @subscriber.send_subscribe(topics, id) == PahoMqtt::MQTT_ERR_SUCCESS
    reconnect if check_persistence
  end
  MQTT_ERR_SUCCESS
rescue ProtocolViolation => e
  PahoMqtt.logger.error("Subscribe topics need one topic or a list of topics.") if PahoMqtt.logger?
  disconnect(false)
  raise e
end

#subscribed_topics ⇒ Object



342
343
344
# File 'lib/paho_mqtt/client.rb', line 342

# Exposes the subscriber's list of subscribed topics.
#
# @return [Object] whatever @subscriber.subscribed_topics returns
def subscribed_topics
  @subscriber.subscribed_topics
end

#unsubscribe(*topics) ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/paho_mqtt/client.rb', line 240

# Sends an unsubscribe request for the given topics.
#
# When the subscriber does not report success, a reconnect is started if
# check_persistence is truthy. A ProtocolViolation is logged (when a logger
# is configured), the client is disconnected implicitly and the original
# exception is re-raised, so its message and backtrace are kept.
#
# @param topics [Array] topics collected from the argument list and passed to
#   the subscriber as one array
# @return [Object] MQTT_ERR_SUCCESS
# @raise [ProtocolViolation] when the subscriber rejects the request
def unsubscribe(*topics)
  id = next_packet_id
  unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS
    reconnect if check_persistence
  end
  MQTT_ERR_SUCCESS
rescue ProtocolViolation => e
  PahoMqtt.logger.error("Unsubscribe need at least one topics.") if PahoMqtt.logger?
  disconnect(false)
  raise e
end