Class: Mqtt3

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby-mqtt3.rb

Constant Summary collapse

# Human-readable packet type names, indexed by the 4-bit packet type code.
MQTT_PACKET_TYPES =
[
'INVALID', #0
'CONNECT', #1
'CONNACK', #2
'PUBLISH', #3
'PUBACK', #4
'PUBREC', #5
'PUBREL', #6
'PUBCOMP', #7
'SUBSCRIBE', #8
'SUBACK', #9
'UNSUBSCRIBE', #10
'UNSUBACK', #11
'PINGREQ', #12
'PINGRESP', #13
'DISCONNECT', #14
'RESERVED' ].freeze #15
# Packet type codes; each value is the index of its name in MQTT_PACKET_TYPES.
CONNECT =
1
CONNACK =
2
PUBLISH =
3
PUBACK =
4
PUBREC =
5
PUBREL =
6
PUBCOMP =
7
SUBSCRIBE =
8
SUBACK =
9
UNSUBSCRIBE =
10
UNSUBACK =
11
PINGREQ =
12
PINGRESP =
13
DISCONNECT =
14

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: 'localhost', port: 1883, reconnect: true, keepalive_sec: 30, client_id: nil, clean_session: true, will_topic: nil, will_payload: nil, will_qos: 0, will_retain: false, username: nil, password: nil, persistence_filename: nil, persistence_mode: :save_manual, ssl: nil, ssl_cert: nil, ssl_cert_file: nil, ssl_key: nil, ssl_key_file: nil, ssl_ca_file: nil, ssl_passphrase: nil) ⇒ Mqtt3

Returns a new instance of Mqtt3.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/ruby-mqtt3.rb', line 72

# Stores the connection settings and resets the in-memory protocol state.
# Nothing is sent here; the only side effect beyond assigning instance
# variables is the call to init_ssl when +ssl+ is truthy.
#
# When +client_id+ is nil an id is generated: the first 11 characters of the
# running script's basename, a dash, and 8 random alphanumeric characters.
def initialize(host: 'localhost',
               port: 1883,
               reconnect: true,
               keepalive_sec: 30,
               client_id: nil,
               clean_session: true,
               will_topic: nil,
               will_payload: nil,
               will_qos: 0,
               will_retain: false,
               username: nil,
               password: nil,
               persistence_filename: nil,
               persistence_mode: :save_manual, # or :save_everytime
               ssl: nil,
               ssl_cert: nil,
               ssl_cert_file: nil,
               ssl_key: nil,
               ssl_key_file: nil,
               ssl_ca_file: nil,
               ssl_passphrase: nil)
  # connection parameters
  @host, @port = host, port
  @reconnect = reconnect
  @keepalive_sec = keepalive_sec
  @client_id =
    if client_id.nil?
      alphabet = [*'A'..'Z', *'a'..'z', *'0'..'9']
      random_suffix = Array.new(8) { alphabet.sample }.join
      File.basename($0)[0..10] + '-' + random_suffix
    else
      client_id
    end
  @clean_session = clean_session

  # last will and credentials
  @will_topic, @will_payload = will_topic, will_payload
  @will_qos, @will_retain = will_qos, will_retain
  @username, @password = username, password

  # persistence of in-flight messages
  @persistence_filename = persistence_filename
  @persistence_mode = persistence_mode

  # TLS settings; the context itself is built by init_ssl
  @ssl = ssl
  @ssl_cert, @ssl_cert_file = ssl_cert, ssl_cert_file
  @ssl_key, @ssl_key_file = ssl_key, ssl_key_file
  @ssl_ca_file = ssl_ca_file
  @ssl_passphrase = ssl_passphrase

  init_ssl if @ssl

  # runtime state
  @socket = nil
  @packet_id = 0
  @outgoing_qos1_store = {}
  @outgoing_qos2_store = {}
  @incoming_qos1_store = {}
  @incoming_qos2_store = {}
  @state = :disconnected
end

Instance Attribute Details

#clean_sessionObject

Returns the value of attribute clean_session.



15
16
17
# File 'lib/ruby-mqtt3.rb', line 15

# Reader for @clean_session (assigned from the constructor's clean_session: argument).
def clean_session = @clean_session

#client_idObject

Returns the value of attribute client_id.



14
15
16
# File 'lib/ruby-mqtt3.rb', line 14

# Reader for @client_id (the given or auto-generated client identifier).
def client_id = @client_id

#debug(x) ⇒ Object

Returns the value of attribute debug.



7
8
9
# File 'lib/ruby-mqtt3.rb', line 7

# Reader for @debug.
def debug = @debug

#hostObject

Returns the value of attribute host (one of the connection params).



10
11
12
# File 'lib/ruby-mqtt3.rb', line 10

# Reader for @host (the host name passed to TCPSocket.new in tcp_connect).
def host = @host

#ipObject

Returns the value of attribute ip.



11
12
13
# File 'lib/ruby-mqtt3.rb', line 11

# Reader for @ip. NOTE(review): nothing in the code shown here assigns @ip.
def ip = @ip

#keepalive_secObject

Returns the value of attribute keepalive_sec.



13
14
15
# File 'lib/ruby-mqtt3.rb', line 13

# Reader for @keepalive_sec (keep-alive interval sent in CONNECT and used by the ping fiber).
def keepalive_sec = @keepalive_sec

#last_packet_sent_atObject (readonly)

Returns the value of attribute last_packet_sent_at (part of the internal state).



34
35
36
# File 'lib/ruby-mqtt3.rb', line 34

# Reader for @last_packet_sent_at (set to Time.now by send_packet after each write).
def last_packet_sent_at = @last_packet_sent_at

#packet_idObject (readonly)

Returns the value of attribute packet_id.



35
36
37
# File 'lib/ruby-mqtt3.rb', line 35

# Reader for @packet_id (the most recently allocated packet identifier).
def packet_id = @packet_id

#passwordObject

Returns the value of attribute password.



21
22
23
# File 'lib/ruby-mqtt3.rb', line 21

# Reader for @password (sent in the CONNECT payload when not nil).
def password = @password

#persistence_filenameObject

Returns the value of attribute persistence_filename.



22
23
24
# File 'lib/ruby-mqtt3.rb', line 22

# Reader for @persistence_filename (file used by save/run to persist in-flight messages).
def persistence_filename = @persistence_filename

#persistence_modeObject

Returns the value of attribute persistence_mode.



23
24
25
# File 'lib/ruby-mqtt3.rb', line 23

# Reader for @persistence_mode (:save_manual or :save_everytime).
def persistence_mode = @persistence_mode

#reconnectObject

Returns the value of attribute reconnect.



12
13
14
# File 'lib/ruby-mqtt3.rb', line 12

# Reader for @reconnect (the flag that keeps the connect loop in run going).
def reconnect = @reconnect

#sslObject

Returns the value of attribute ssl.



25
26
27
# File 'lib/ruby-mqtt3.rb', line 25

# Reader for @ssl (true, or a value handed to SSLContext#ssl_version= by init_ssl).
def ssl = @ssl

#ssl_ca_fileObject

Returns the value of attribute ssl_ca_file.



30
31
32
# File 'lib/ruby-mqtt3.rb', line 30

# Reader for @ssl_ca_file (CA bundle path assigned to the SSL context by init_ssl).
def ssl_ca_file = @ssl_ca_file

#ssl_certObject

Returns the value of attribute ssl_cert.



26
27
28
# File 'lib/ruby-mqtt3.rb', line 26

# Reader for @ssl_cert (client certificate data; init_ssl fills it from ssl_cert_file when set).
def ssl_cert = @ssl_cert

#ssl_cert_fileObject

Returns the value of attribute ssl_cert_file.



27
28
29
# File 'lib/ruby-mqtt3.rb', line 27

# Reader for @ssl_cert_file (path read by init_ssl to load the client certificate).
def ssl_cert_file = @ssl_cert_file

#ssl_contextObject (readonly)

Returns the value of attribute ssl_context.



37
38
39
# File 'lib/ruby-mqtt3.rb', line 37

# Reader for @ssl_context (the OpenSSL::SSL::SSLContext built by init_ssl).
def ssl_context = @ssl_context

#ssl_keyObject

Returns the value of attribute ssl_key.



28
29
30
# File 'lib/ruby-mqtt3.rb', line 28

# Reader for @ssl_key (private key data; init_ssl fills it from ssl_key_file when set).
def ssl_key = @ssl_key

#ssl_key_fileObject

Returns the value of attribute ssl_key_file.



29
30
31
# File 'lib/ruby-mqtt3.rb', line 29

# Reader for @ssl_key_file (path read by init_ssl to load the private key).
def ssl_key_file = @ssl_key_file

#ssl_passphraseObject

Returns the value of attribute ssl_passphrase.



31
32
33
# File 'lib/ruby-mqtt3.rb', line 31

# Reader for @ssl_passphrase (passphrase used by init_ssl when loading the private key).
def ssl_passphrase = @ssl_passphrase

#stateObject (readonly)

Returns the value of attribute state.



36
37
38
# File 'lib/ruby-mqtt3.rb', line 36

# Reader for @state (:disconnected, :tcp_connected or :mqtt_connected in the code shown).
def state = @state

#usernameObject

Returns the value of attribute username.



20
21
22
# File 'lib/ruby-mqtt3.rb', line 20

# Reader for @username (sent in the CONNECT payload when not nil).
def username = @username

#will_payloadObject

Returns the value of attribute will_payload.



17
18
19
# File 'lib/ruby-mqtt3.rb', line 17

# Reader for @will_payload (last-will message body sent in CONNECT when a will topic is set).
def will_payload = @will_payload

#will_qosObject

Returns the value of attribute will_qos.



18
19
20
# File 'lib/ruby-mqtt3.rb', line 18

# Reader for @will_qos (QoS level encoded into the CONNECT flags for the last will).
def will_qos = @will_qos

#will_retainObject

Returns the value of attribute will_retain.



19
20
21
# File 'lib/ruby-mqtt3.rb', line 19

# Reader for @will_retain (retain flag encoded into the CONNECT flags for the last will).
def will_retain = @will_retain

#will_topicObject

Returns the value of attribute will_topic.



16
17
18
# File 'lib/ruby-mqtt3.rb', line 16

# Reader for @will_topic (last-will topic; nil means no will is sent in CONNECT).
def will_topic = @will_topic

Instance Method Details

#connectObject



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/ruby-mqtt3.rb', line 149

# Builds and sends the MQTT CONNECT packet (protocol name 'MQTT', level 4).
#
# The Will QoS and Will Retain bits are only set when a will topic is
# configured: MQTT 3.1.1 requires both to be 0 when the Will Flag is 0, so a
# will_qos / will_retain given without a will_topic must not leak into the
# flags byte.
#
# NOTE(review): a password without a username still sets the password flag;
# MQTT 3.1.1 forbids that combination - confirm whether it should be rejected.
#
# @return [Boolean] whatever send_packet returns
def connect
  body = encode_string('MQTT')
  body += encode_bytes(0x04)

  has_will = !@will_topic.nil?

  flags = 0
  flags |= 0x02 if @clean_session
  if has_will
    flags |= 0x04
    flags |= ((@will_qos & 0x03) << 3)
    flags |= 0x20 if @will_retain
  end
  flags |= 0x40 unless @password.nil?
  flags |= 0x80 unless @username.nil?
  body += encode_bytes(flags)

  body += encode_short(@keepalive_sec)
  body += encode_string(@client_id)
  if has_will
    body += encode_string(@will_topic)
    body += encode_string(@will_payload)
  end
  body += encode_string(@username) unless @username.nil?
  body += encode_string(@password) unless @password.nil?

  packet = encode_bytes(CONNECT << 4)
  packet += encode_length(body.bytesize)
  packet += body

  send_packet(packet)
end

#decode_short(bytes) ⇒ Object



299
300
301
# File 'lib/ruby-mqtt3.rb', line 299

# Decodes the first two bytes of +bytes+ as a big-endian unsigned 16-bit
# integer. Returns nil when fewer than two bytes are available.
def decode_short(bytes)
  bytes.unpack1('n')
end

#disconnectObject



141
142
143
# File 'lib/ruby-mqtt3.rb', line 141

# Sends the two-byte DISCONNECT packet (0xe0, remaining length 0) and
# returns send_packet's result.
def disconnect
  send_packet("\xe0\x00".b) # DISCONNECT
end

#encode_bits(bits) ⇒ Object



265
266
267
# File 'lib/ruby-mqtt3.rb', line 265

# Packs an array of truthy/falsy values into bytes, least significant bit
# first (String#pack 'b*'); the last byte is zero-padded.
def encode_bits(bits)
  bit_string = bits.each_with_object(+'') { |bit, acc| acc << (bit ? '1' : '0') }
  [bit_string].pack('b*')
end

#encode_bytes(*bytes) ⇒ Object



261
262
263
# File 'lib/ruby-mqtt3.rb', line 261

# Packs the given integers into a binary string, one unsigned byte each.
def encode_bytes(*bytes) = bytes.pack('C*')

#encode_length(body_length) ⇒ Object



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/ruby-mqtt3.rb', line 282

# Encodes +body_length+ as an MQTT "remaining length": 7 bits per byte,
# least significant group first, with the top bit of a byte set when more
# bytes follow (1 to 4 bytes in total).
#
# Negative lengths are rejected; previously e.g. -1 was silently encoded as
# 0x7f. The result is always a binary (ASCII-8BIT) string.
#
# @param body_length [Integer] number of bytes following the fixed header
# @return [String] binary string of 1..4 bytes
# @raise [RuntimeError] if body_length is negative or above 268_435_455
def encode_length(body_length)
  if body_length > 268_435_455
    raise 'Error serialising packet: body is more than 256MB'
  end
  raise 'Error serialising packet: body length is negative' if body_length < 0

  digits = []
  loop do
    body_length, digit = body_length.divmod(128)
    # if there are more digits to encode, set the top bit of this digit
    digit |= 0x80 if body_length > 0
    digits << digit
    break if body_length <= 0
  end
  digits.pack('C*')
end

#encode_short(val) ⇒ Object



269
270
271
272
# File 'lib/ruby-mqtt3.rb', line 269

# Encodes +val+ as a big-endian unsigned 16-bit integer.
#
# Negative values are rejected; previously they wrapped silently
# (-1 became 0xffff).
#
# @param val [Numeric] value in 0..0xffff (converted with to_i)
# @return [String] two-byte binary string
# @raise [RuntimeError] if val is outside 0..0xffff
def encode_short(val)
  raise 'Value too big for short' if val > 0xffff
  raise 'Value too small for short' if val < 0

  [val.to_i].pack('n')
end

#encode_string(str) ⇒ Object



274
275
276
277
278
279
280
# File 'lib/ruby-mqtt3.rb', line 274

# Encodes +str+ as an MQTT string: a two-byte length prefix (from
# encode_short) followed by the UTF-8 bytes. +str+ is converted with to_s,
# so nil becomes the empty string.
def encode_string(str)
  # Work on a binary copy so the bytes can be joined with the rest of the packet.
  utf8_bytes = str.to_s.encode('UTF-8').b
  encode_short(utf8_bytes.bytesize) + utf8_bytes
end

#handle_packet(type, flags, length, data) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'lib/ruby-mqtt3.rb', line 342

# Dispatches one control packet received from the broker.
#
# Fix over the previous version: the PUBCOMP warning for an entry in the
# wrong state now names the state actually checked (PUBREL), not PUBLISH.
#
# @param type [Integer] packet type code (CONNACK, PUBLISH, ...)
# @param flags [Integer] low nibble of the fixed header; only read for PUBLISH
# @param length [Integer] remaining length; only used in the debug line
# @param data [String] the packet bytes following the fixed header
def handle_packet(type,flags,length,data)
  debug "+++ #{MQTT_PACKET_TYPES[type]} flags: #{flags}  length: #{length}  data: #{data.unpack('H*').first}"
  case type
  when CONNACK
    # byte 0: session-present flag, byte 1: return code (0 = accepted)
    return_code = data[1].ord
    if return_code == 0
      @state = :mqtt_connected
      session_present = (data[0].ord == 1)
      @on_connect_block.call(session_present) unless @on_connect_block.nil?

      #sending QoS 1 and Qos2 messages
      @outgoing_qos1_store.each do |packet_id,m|
        debug "resending QoS 1 packet #{packet_id} #{m[0]} #{m[1]}"
        publish_dup(m[0],m[1],m[2],m[3],true,packet_id)
      end
      @outgoing_qos2_store.each do |packet_id,m|
        # m[4] records how far the QoS 2 handshake got: PUBLISH or PUBREL
        state = m[4]
        if state == PUBLISH
          debug "resending QoS 2 packet PUBLISH #{packet_id} #{m[0]} #{m[1]}"
          publish_dup(m[0],m[1],m[2],m[3],true,packet_id)
        elsif state == PUBREL
          debug "resending QoS 2 packet PUBREL #{packet_id} #{m[0]} #{m[1]}"
          pubrel(packet_id)
        end
      end
    else
      @on_mqtt_connect_error_block.call(return_code) unless @on_mqtt_connect_error_block.nil?
    end

  when PUBLISH
    # layout: topic length (2), topic, [packet id (2) when QoS > 0], payload
    qos = (flags & 6) >> 1
    topic_length = decode_short(data[0..1])
    topic = data[2..topic_length+1]

    if (qos > 0)
      packet_id = decode_short(data[topic_length+2..topic_length+3])
      message_starts_at = 4
    else
      packet_id = nil
      message_starts_at = 2
    end
    message = data[topic_length+message_starts_at..-1]

    if qos == 0
      @on_message_block.call(topic, message, qos, packet_id) unless @on_message_block.nil?
    elsif qos == 1
      # deliver only if this packet id is not already recorded as received
      if @incoming_qos1_store[packet_id].nil?
        @incoming_qos1_store[packet_id] = true
        save_everytime
        @on_message_block.call(topic, message, qos, packet_id) unless @on_message_block.nil?
      end

      # the id is forgotten only once the PUBACK was actually written
      sent = puback(packet_id)
      if sent
        @incoming_qos1_store.delete packet_id
        save_everytime
      end

    elsif qos == 2
      pubrec(packet_id)
      # the id stays in the store until PUBREL arrives, so a re-sent PUBLISH
      # is acknowledged again but not delivered twice
      if @incoming_qos2_store[packet_id].nil?
        @incoming_qos2_store[packet_id] = true
        save_everytime
        @on_message_block.call(topic, message, qos, packet_id) unless @on_message_block.nil?
      end
    end

  when PUBACK
    packet_id = decode_short(data)
    if @outgoing_qos1_store.has_key?(packet_id)
      @outgoing_qos1_store.delete(packet_id)
      save_everytime()
      @on_publish_finished_block.call(packet_id) unless @on_publish_finished_block.nil?
    else
      debug "WARNING: PUBACK #{packet_id} not found"
    end

  when PUBREC
    packet_id = decode_short(data)
    pending = @outgoing_qos2_store[packet_id]
    unless pending.nil?
      if pending[4] == PUBLISH
        @outgoing_qos2_store[packet_id][4] = PUBREL
        save_everytime()
      else
        debug "WARNING: PUBREC #{packet_id} not in PUBLISH state"
      end
    else
      debug "WARNING: PUBREC #{packet_id} not found"
    end
    # PUBREL is sent in every case, including unknown packet ids
    pubrel(packet_id)

  when PUBREL
    packet_id = decode_short(data)
    sent = pubcomp(packet_id)
    if sent
      @incoming_qos2_store.delete packet_id
      save_everytime
    end

  when PUBCOMP
    packet_id = decode_short(data)
    pending = @outgoing_qos2_store[packet_id]
    unless pending.nil?
      if pending[4] == PUBREL
        @outgoing_qos2_store.delete(packet_id)
        @on_publish_finished_block.call(packet_id) unless @on_publish_finished_block.nil?
        save_everytime()
      else
        debug "WARNING: PUBCOMP #{packet_id} not in PUBREL state"
      end
    else
      debug "WARNING: PUBCOMP #{packet_id} not found"
    end

  when SUBACK
    # for each topic
    #@on_subscribe_block.call(topic_name, packet_id, ret)

  when PINGREQ
    pingresp

  when PINGRESP
    # nothing to do
  else
    debug "WARNING: packet type: #{type} is not handled"
  end
end

#init_sslObject



499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
# File 'lib/ruby-mqtt3.rb', line 499

# Builds @ssl_context from the ssl_* settings.
#
# Certificate and key are read from ssl_cert_file / ssl_key_file when those
# are set, otherwise the in-memory ssl_cert / ssl_key values are used.
# The private key is parsed with OpenSSL::PKey.read so that any key type
# OpenSSL understands (RSA, EC, ...) is accepted; the previous
# OpenSSL::PKey::RSA.new call rejected non-RSA keys.
def init_ssl
  @ssl_cert = File.read(@ssl_cert_file) if @ssl_cert_file
  @ssl_key = File.read(@ssl_key_file) if @ssl_key_file

  @ssl_context = OpenSSL::SSL::SSLContext.new

  # ssl: true keeps the library default; any other value selects the version
  @ssl_context.ssl_version = @ssl unless @ssl.is_a?(TrueClass)

  @ssl_context.cert = OpenSSL::X509::Certificate.new(@ssl_cert) if @ssl_cert
  @ssl_context.key = OpenSSL::PKey.read(@ssl_key, @ssl_passphrase) if @ssl_key
  @ssl_context.ca_file = @ssl_ca_file if @ssl_ca_file

  @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
end

#invalidObject



145
146
147
# File 'lib/ruby-mqtt3.rb', line 145

# Sends the two bytes 0xff 0x00 (packet type 15, which MQTT_PACKET_TYPES
# labels 'RESERVED') and returns send_packet's result.
def invalid
  send_packet("\xff\x00".b)
end

#next_packet_idObject



255
256
257
258
259
# File 'lib/ruby-mqtt3.rb', line 255

# Advances and returns the packet identifier counter.
#
# Identifiers cycle through 1..0xffff. The counter wraps to 1 rather than 0
# because MQTT requires a non-zero packet identifier in SUBSCRIBE and in
# PUBLISH with QoS > 0; the previous wrap to 0 produced an invalid id.
#
# @return [Integer] the new packet identifier
def next_packet_id
  @packet_id = @packet_id >= 0xffff ? 1 : @packet_id + 1
end

#on_connect(&block) ⇒ Object



314
315
316
# File 'lib/ruby-mqtt3.rb', line 314

# Registers the block that handle_packet calls with +session_present+
# (true/false) after a CONNACK with return code 0.
def on_connect(&block) = (@on_connect_block = block)

#on_disconnect(&block) ⇒ Object



326
327
328
# File 'lib/ruby-mqtt3.rb', line 326

# Registers the block that run calls with one argument once the read loop
# of a connection has ended.
def on_disconnect(&block) = (@on_disconnect_block = block)

#on_message(&block) ⇒ Object



338
339
340
# File 'lib/ruby-mqtt3.rb', line 338

# Registers the block that handle_packet calls with
# (topic, message, qos, packet_id) for each delivered PUBLISH;
# packet_id is nil for QoS 0.
def on_message(&block) = (@on_message_block = block)

#on_mqtt_connect_error(&block) ⇒ Object



322
323
324
# File 'lib/ruby-mqtt3.rb', line 322

# Registers the block that handle_packet calls with the CONNACK return code
# when that code is not 0.
def on_mqtt_connect_error(&block) = (@on_mqtt_connect_error_block = block)

#on_publish_finished(&block) ⇒ Object



334
335
336
# File 'lib/ruby-mqtt3.rb', line 334

# Registers the block that handle_packet calls with the packet id once an
# outgoing QoS 1 message is PUBACKed or a QoS 2 message is PUBCOMPed.
def on_publish_finished(&block) = (@on_publish_finished_block = block)

#on_subscribe(&block) ⇒ Object



330
331
332
# File 'lib/ruby-mqtt3.rb', line 330

# Stores a block in @on_subscribe_block.
# NOTE(review): the SUBACK branch of handle_packet only has a commented-out
# call, so the stored block is not invoked by the code shown here.
def on_subscribe(&block) = (@on_subscribe_block = block)

#on_tcp_connect_error(&block) ⇒ Object



318
319
320
# File 'lib/ruby-mqtt3.rb', line 318

# Registers the block that run calls with two arguments (the second being
# the retry counter) when tcp_connect returns an Exception.
def on_tcp_connect_error(&block) = (@on_tcp_connect_error_block = block)

#pingreqObject



133
134
135
# File 'lib/ruby-mqtt3.rb', line 133

# Sends the two-byte PINGREQ packet (0xc0, remaining length 0) and returns
# send_packet's result.
def pingreq
  send_packet("\xc0\x00".b) # PINGREQ
end

#pingrespObject



137
138
139
# File 'lib/ruby-mqtt3.rb', line 137

# Sends the two-byte PINGRESP packet (0xd0, remaining length 0) and returns
# send_packet's result.
def pingresp
  send_packet("\xd0\x00".b) # PINGRESP
end

#puback(packet_id) ⇒ Object



231
232
233
234
235
# File 'lib/ruby-mqtt3.rb', line 231

# Sends a PUBACK for +packet_id+ (acknowledges an incoming QoS 1 PUBLISH).
#
# The first byte is 0x40: packet type 4 with flag bits 0000, as MQTT 3.1.1
# requires for PUBACK. The previous 0x42 set a reserved flag bit.
#
# @param packet_id [Integer] identifier of the PUBLISH being acknowledged
# @return [Boolean] send_packet's result (false when nothing was written)
def puback(packet_id)
  packet = "\x40\x02".b # PUBACK, remaining length 2
  packet += encode_short(packet_id)
  send_packet(packet)
end

#pubcomp(packet_id) ⇒ Object



249
250
251
252
253
# File 'lib/ruby-mqtt3.rb', line 249

# Sends a PUBCOMP for +packet_id+ (final step of an incoming QoS 2 exchange).
#
# The first byte is 0x70: packet type 7 with flag bits 0000, as MQTT 3.1.1
# requires for PUBCOMP. The previous 0x72 set a reserved flag bit.
#
# @param packet_id [Integer] identifier of the PUBREL being answered
# @return [Boolean] send_packet's result (false when nothing was written)
def pubcomp(packet_id)
  packet = "\x70\x02".b # PUBCOMP, remaining length 2
  packet += encode_short(packet_id)
  send_packet(packet)
end

#publish(topic, message, qos = 0, retain = false) ⇒ Object



193
194
195
# File 'lib/ruby-mqtt3.rb', line 193

# Publishes a new message: delegates to publish_dup, leaving its dup and
# packet_id arguments at their defaults (false and nil), so a fresh packet
# id is allocated. Returns publish_dup's result.
def publish(topic, message, qos = 0, retain = false)
  publish_dup(topic, message, qos, retain)
end

#publish_dup(topic, message, qos = 0, retain = false, dup = false, packet_id = nil) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/ruby-mqtt3.rb', line 197

# Builds and sends a PUBLISH packet.
#
# With packet_id nil this is a first transmission: a new id is allocated and,
# for QoS 1/2, the message is recorded in the matching outgoing store.
# With a packet_id it is a retransmission and the stores are left alone.
#
# Fixes over the previous version:
# * the payload is appended as binary (message.to_s.b); appending a UTF-8
#   payload with non-ASCII characters to the binary header raised
#   Encoding::CompatibilityError whenever the header held a byte >= 0x80;
# * save_everytime is only called when a store was actually modified.
#
# @param topic [#to_s] topic name, must not be nil or empty
# @param message [#to_s] payload
# @param qos [Integer] 0, 1 or 2
# @param retain [Boolean] sets the RETAIN flag
# @param dup [Boolean] sets the DUP flag
# @param packet_id [Integer, nil] id to reuse on retransmission
# @return [Integer] the packet id used (allocated even for QoS 0)
# @raise [RuntimeError] on an invalid topic or QoS
def publish_dup(topic,message,qos = 0,retain = false, dup = false, packet_id = nil)
  raise 'Invalid topic name' if topic.nil? || topic.to_s.empty?
  raise 'Invalid QoS' if qos < 0 || qos > 2

  # first publish
  if packet_id.nil?
    packet_id = next_packet_id

    if qos == 1
      @outgoing_qos1_store[packet_id] = [topic,message,qos,retain]
      save_everytime
    elsif qos == 2
      # the 5th element tracks the handshake state (PUBLISH, later PUBREL)
      @outgoing_qos2_store[packet_id] = [topic,message,qos,retain,PUBLISH]
      save_everytime
    end
  end

  flags = 0
  flags |= 0x01 if retain
  flags |= qos << 1
  flags |= 0x08 if dup

  body = encode_string(topic)
  body += encode_short(packet_id) if qos > 0
  body += message.to_s.b

  packet = encode_bytes((PUBLISH << 4) | flags)
  packet += encode_length(body.bytesize)
  packet += body

  send_packet(packet)
  packet_id
end

#pubrec(packet_id) ⇒ Object



237
238
239
240
241
# File 'lib/ruby-mqtt3.rb', line 237

# Sends a PUBREC for +packet_id+ (first acknowledgement of an incoming
# QoS 2 PUBLISH).
#
# The first byte is 0x50: packet type 5 with flag bits 0000, as MQTT 3.1.1
# requires for PUBREC. The previous 0x52 set a reserved flag bit.
#
# @param packet_id [Integer] identifier of the PUBLISH being acknowledged
# @return [Boolean] send_packet's result (false when nothing was written)
def pubrec(packet_id)
  packet = "\x50\x02".b # PUBREC, remaining length 2
  packet += encode_short(packet_id)
  send_packet(packet)
end

#pubrel(packet_id) ⇒ Object



243
244
245
246
247
# File 'lib/ruby-mqtt3.rb', line 243

# Sends a PUBREL for +packet_id+: header byte 0x62 (packet type 6, flag
# bits 0010), remaining length 2, then the packet id. Returns
# send_packet's result.
def pubrel(packet_id)
  send_packet("\x62\x02".b + encode_short(packet_id)) # PUBREL
end

#read_bytes(count) ⇒ Object



485
486
487
488
489
490
491
492
493
494
495
496
497
# File 'lib/ruby-mqtt3.rb', line 485

# Reads exactly +count+ bytes from @socket, looping over short reads.
# Raises Mqtt3NormalExitException as soon as a read returns nil or an empty
# string. Returns '' without touching the socket when +count+ is 0.
def read_bytes(count)
  received = ''
  until received.length == count
    #TODO rescue
    piece = @socket.read(count - received.length)
    raise Mqtt3NormalExitException if piece.nil? || piece == ''

    received += piece
  end
  received
end

#read_from_socket_loopObject



640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
# File 'lib/ruby-mqtt3.rb', line 640

# Reads packets until read_bytes raises: parses the fixed header (type in
# the high nibble, flags in the low nibble), decodes the variable-length
# "remaining length" field (at most 4 bytes), reads that many bytes and
# passes everything to handle_packet.
def read_from_socket_loop
  loop do
    header = read_bytes(1).ord
    type = header >> 4
    flags = header & 0x0f

    # Remaining length: 7 bits per byte, least significant group first;
    # a set top bit means another length byte follows.
    length = 0
    shift = 0
    4.times do
      digit = read_bytes(1).ord
      length |= (digit & 0x7F) << shift
      shift += 7
      break if (digit & 0x80).zero?
    end

    data = read_bytes(length)
    handle_packet(type, flags, length, data)
  end
end

#runObject



523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
# File 'lib/ruby-mqtt3.rb', line 523

# Loads or clears the persisted message stores, then schedules the main
# fiber, which connects, reads packets, and reconnects with exponential
# backoff (1, 2, 4 ... capped at 300 seconds) while @reconnect is true.
#
# Fixes over the previous version:
# * the TCP error callback receives the exception returned by tcp_connect
#   (it used to reference an undefined local);
# * the error that ended the read loop is captured and passed to the
#   on_disconnect block (it used to receive nil in every case) and is logged
#   instead of being swallowed silently;
# * the socket is closed after each connection ends.
#
# Requires a Fiber scheduler to be installed, since it uses Fiber.schedule.
def run
  #persistence
  if @persistence_filename
    if @clean_session
      if File.exist?(@persistence_filename)
        debug "removing file " + @persistence_filename
        File.delete(@persistence_filename)
      end
    elsif File.exist?(@persistence_filename)
      # NOTE(review): Marshal.load must only ever see files written by #save;
      # loading a file from an untrusted source can execute arbitrary code.
      @outgoing_qos1_store, @outgoing_qos2_store, @incoming_qos1_store, @incoming_qos2_store = Marshal.load(File.read(@persistence_filename))
      debug "loading state from #{@persistence_filename}  out_QoS1:#{@outgoing_qos1_store.inspect}  out_QoS2:#{@outgoing_qos2_store.inspect}  in_QoS1: #{@incoming_qos1_store.inspect}  in_QoS2: #{@incoming_qos2_store.inspect}"
    end
  end

  Fiber.schedule do
    @fiber_main = Fiber.current
    #debug 'entering main fiber' + @fiber_main.inspect
    counter = 0
    while @reconnect
      ret = tcp_connect
      if ret.is_a?(Exception)
        @on_tcp_connect_error_block.call(ret, counter) unless @on_tcp_connect_error_block.nil?
      else
        @socket = ret
        @state = :tcp_connected
        counter = 0
        debug 'TCP connected'
        connect

        @fiber_ping = run_fiber_ping

        error = nil
        begin
          read_from_socket_loop
        rescue Mqtt3NormalExitException
          # raised by #stop or by read_bytes when the stream ends
          @reconnect = false
        rescue => error
          debug "connection lost: #{error.class}: #{error.message}"
        end

        @fiber_ping.raise(Mqtt3NormalExitException)

        # flip the state first so send_packet refuses to write to the socket
        @state = :disconnected
        begin
          @socket.close
        rescue StandardError => close_error
          debug "error while closing socket: #{close_error.message}"
        end
        @on_disconnect_block.call(error) unless @on_disconnect_block.nil?
      end

      if @reconnect
        if @on_reconnect_block
          @on_reconnect_block.call(counter)
          counter += 1
        else
          sleep counter if counter > 0

          if counter == 0
            counter = 1
          else
            counter *= 2
            counter = 300 if counter > 300
          end
        end
      end
    end
    #debug 'exiting main fiber' + @fiber_main.inspect
  end
end

#run_fiber_pingObject



596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
# File 'lib/ruby-mqtt3.rb', line 596

# Schedules and returns the keep-alive fiber.
#
# While no packet has been sent yet or the MQTT session is not established
# it just sleeps for @keepalive_sec. Otherwise it sleeps until
# @keepalive_sec has passed since the last sent packet and then sends a
# PINGREQ. The fiber ends when Mqtt3NormalExitException is raised in it.
def run_fiber_ping
  Fiber.schedule do
    #debug 'entering ping fiber' + Fiber.current.inspect
    loop do
      if @last_packet_sent_at.nil? || @state != :mqtt_connected
        #debug "sleeping for #{@keepalive_sec} sec"
        sleep @keepalive_sec
        next
      end

      #only send when needed (store time, and adjust sleep with it)
      until (remaining = @last_packet_sent_at + @keepalive_sec - Time.now) < 0
        #debug "sleeping for #{remaining} sec"
        sleep remaining
      end
      pingreq
    end
  rescue Mqtt3NormalExitException
    # regular shutdown of the ping fiber
    #debug 'exiting ping fiber' + @fiber_ping.inspect
  end
end

#saveObject



476
477
478
479
480
481
482
483
# File 'lib/ruby-mqtt3.rb', line 476

# Writes the four message stores (outgoing/incoming QoS 1 and 2) to
# @persistence_filename as one Marshal dump. Does nothing when no
# persistence file is configured.
#
# The file is opened in binary mode ('wb'): Marshal output is binary data
# and the previous text mode ("w+") can alter it on platforms that
# translate line endings.
def save
  return unless @persistence_filename

  File.open(@persistence_filename, 'wb') do |f|
    debug "saving state to " + @persistence_filename
    f.write Marshal.dump([@outgoing_qos1_store,@outgoing_qos2_store,@incoming_qos1_store,@incoming_qos2_store])
  end
end

#save_everytimeObject



470
471
472
473
474
# File 'lib/ruby-mqtt3.rb', line 470

# Calls save, but only when a persistence file is configured and the
# persistence mode is :save_everytime; otherwise returns nil.
def save_everytime
  save if @persistence_filename && @persistence_mode == :save_everytime
end

#send_packet(p) ⇒ Object



303
304
305
306
307
308
309
310
311
312
# File 'lib/ruby-mqtt3.rb', line 303

# Writes the raw packet +p+ to the socket if the connection state allows it.
#
# Nothing is sent while disconnected, and before the MQTT session is up
# (state :tcp_connected) only a CONNECT packet goes through. On success the
# send time is stored in @last_packet_sent_at.
#
# Returns true when the packet was written, false when it was refused.
def send_packet(p)
  return false if state == :disconnected

  header = p[0].ord
  type = header >> 4
  return false if state == :tcp_connected && type != CONNECT

  debug "--- #{MQTT_PACKET_TYPES[type]} flags: #{header & 0x0f}  #{p.unpack1('H*')}"

  @socket.write(p)
  @last_packet_sent_at = Time.now
  true
end

#stopObject



591
592
593
594
# File 'lib/ruby-mqtt3.rb', line 591

# Stops the client by raising Mqtt3NormalExitException inside the main
# fiber started by run.
#
# It is a no-op (returns nil) when run was never called or the main fiber
# has already finished; previously those cases failed with NoMethodError
# and FiberError respectively.
def stop
  #puts "sending raise to #{@fiber_main}"
  return unless @fiber_main&.alive?

  @fiber_main.raise(Mqtt3NormalExitException)
end

#subscribe(topic_list) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/ruby-mqtt3.rb', line 178

# Sends one SUBSCRIBE packet for all entries of +topic_list+.
#
# Each entry is indexed with [0] for the topic filter and [1] for the
# requested QoS byte, e.g. [['a/b', 1], ['c/#', 0]]. A new packet id is
# allocated for the request. Returns send_packet's result.
def subscribe(topic_list)
  payload = encode_short(next_packet_id)
  topic_list.each do |entry|
    payload += encode_string(entry[0]) + encode_bytes(entry[1])
  end

  # SUBSCRIBE carries the fixed flag bits 0010
  header = encode_bytes((SUBSCRIBE << 4) | 0x02)
  send_packet(header + encode_length(payload.length) + payload)
end

#tcp_connectObject



620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
# File 'lib/ruby-mqtt3.rb', line 620

# Opens the TCP connection to @host:@port (1 second connect timeout) and,
# when @ssl is set, performs the TLS handshake on top of it.
#
# Errors are returned, not raised: the caller checks whether the result is
# an Exception.
#
# Fix over the previous version: when the TLS setup fails after the TCP
# connection was opened, the TCP socket is closed instead of being leaked.
#
# @return [TCPSocket, OpenSSL::SSL::SSLSocket, Exception]
def tcp_connect
  tcp_socket = TCPSocket.new(@host, @port, connect_timeout: 1)
  return tcp_socket unless @ssl

  socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
  socket.sync_close = true
  # Set hostname on secure socket for Server Name Indication (SNI)
  #TODO ??? socket.hostname = @host if socket.respond_to?(:hostname=)
  socket.connect
  socket
rescue => e
  # tcp_socket is nil when TCPSocket.new itself failed
  begin
    tcp_socket&.close
  rescue StandardError
    # nothing more to do for a socket that cannot be closed
  end
  e
end