Class: PahoMqtt::Sender

Inherits:
Object
  • Object
show all
Defined in:
lib/paho_mqtt/sender.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ack_timeout) ⇒ Sender

Returns a new instance of Sender.



20
21
22
23
24
25
26
# File 'lib/paho_mqtt/sender.rb', line 20

# Builds a sender that has no socket attached yet.
#
# @param ack_timeout [Numeric] value added to a packet's :timestamp by
#   #check_ack_alive to decide when that packet is queued again
def initialize(ack_timeout)
  # Outbound queue and the lock that guards it.
  @writing_queue = Array.new
  @writing_mutex = Mutex.new
  # No connection yet; a socket is supplied later through #socket=.
  @socket = nil
  # -1 is the initial marker; #send_packet replaces it with a Time.
  @last_ping_req = -1
  @ack_timeout = ack_timeout
end

Instance Attribute Details

#last_ping_req ⇒ Object

Returns the value of attribute last_ping_req.



18
19
20
# File 'lib/paho_mqtt/sender.rb', line 18

# Reader for @last_ping_req.
#
# The constructor initialises the value to -1 and #send_packet overwrites it
# with Time.now on every call, so the result is either -1 or a Time.
#
# @return [Integer, Time]
def last_ping_req
  @last_ping_req
end

Instance Method Details

#append_to_writing(packet) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/paho_mqtt/sender.rb', line 49

# Queues a packet for a later #writing_loop pass, blocking while the queue is
# over capacity.
#
# The capacity check and the push happen under the same lock so concurrent
# producers cannot both pass the check and overshoot the limit. The back-off
# sleep happens outside the lock so the writer can drain the queue meanwhile.
# As before, a packet is accepted while the queue length is <= MAX_WRITING.
#
# @param packet [Object] the packet to enqueue
# @return MQTT_ERR_SUCCESS once the packet has been queued
def append_to_writing(packet)
  loop do
    queued = @writing_mutex.synchronize do
      next false if @writing_queue.length > MAX_WRITING

      @writing_queue.push(packet)
      true
    end
    break if queued

    PahoMqtt.logger.error('Writing queue is full slowing down') if PahoMqtt.logger?
    # Give the writer a chance to drain the queue before checking again.
    sleep SELECT_TIMEOUT
  end
  MQTT_ERR_SUCCESS
end

#check_ack_alive(queue, mutex) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/paho_mqtt/sender.rb', line 90

# Re-queues every packet in +queue+ whose acknowledgement wait has expired.
#
# An entry is expired when the current time is at or past its :timestamp plus
# @ack_timeout. Expired packets are handed to #append_to_writing and their
# :timestamp is reset to the current time. The whole scan runs while holding
# +mutex+.
#
# @param queue [Array<Hash>] entries read through the :packet and :timestamp keys
# @param mutex [Mutex] lock guarding +queue+
# @return the result of mutex.synchronize (the value of queue.each)
def check_ack_alive(queue, mutex)
  mutex.synchronize do
    current_time = Time.now
    queue.each do |entry|
      next unless current_time >= entry[:timestamp] + @ack_timeout

      pending = entry[:packet]
      subscription = pending.class == PahoMqtt::Packet::Subscribe ||
                     pending.class == PahoMqtt::Packet::Unsubscribe
      # NOTE(review): if `dup` here is Object#dup it always returns a truthy
      # copy, so `||=` never assigns and this line has no effect. It looks
      # like the intent was to set a duplicate flag -- confirm against the
      # packet classes before changing it.
      pending.dup ||= true unless subscription
      append_to_writing(pending)
      entry[:timestamp] = current_time
    end
  end
end

#flush_waiting_packet(sending = true) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/paho_mqtt/sender.rb', line 78

# Empties the writing queue, optionally sending its contents first.
#
# When +sending+ is truthy each queued packet is written with #send_packet in
# queue order and removed once the write has returned, so packets that were
# flushed are not sent again by a later #writing_loop. If #send_packet raises,
# the failing packet and everything after it remain queued. When +sending+ is
# falsy the queued packets are dropped unsent. Both paths run under
# @writing_mutex.
#
# @param sending [Boolean] true to send queued packets, false to discard them
# @return [Array] the (now empty) writing queue
def flush_waiting_packet(sending=true)
  @writing_mutex.synchronize do
    if sending
      until @writing_queue.empty?
        # Dequeue only after the write returns so a failure loses nothing.
        send_packet(@writing_queue.first)
        @writing_queue.shift
      end
    else
      @writing_queue = []
    end
    @writing_queue
  end
end

#send_packet(packet) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/paho_mqtt/sender.rb', line 32

# Writes a packet to the socket and records the time of the attempt.
#
# Nothing is written when the socket is nil or closed, but @last_ping_req is
# still refreshed and MQTT_ERR_SUCCESS returned. If the write would block
# (IO::WaitWritable), waits up to SELECT_TIMEOUT for the socket to become
# writable and tries again.
#
# @param packet [#to_s] packet whose string form is written
# @return MQTT_ERR_SUCCESS
# @raise [WritingException] when the write fails with any other StandardError
def send_packet(packet)
  @socket.write(packet.to_s) unless @socket.nil? || @socket.closed?
  @last_ping_req = Time.now
  MQTT_ERR_SUCCESS
rescue IO::WaitWritable
  # Must come before StandardError: the concrete wait-writable errors are
  # StandardError subclasses and would otherwise never reach this branch.
  IO.select(nil, [@socket], nil, SELECT_TIMEOUT)
  retry
rescue StandardError
  raise WritingException
end

#send_pingreq ⇒ Object



45
46
47
# File 'lib/paho_mqtt/sender.rb', line 45

# Builds a new PahoMqtt::Packet::Pingreq and hands it straight to
# #send_packet, without going through the writing queue.
#
# @return whatever #send_packet returns
def send_pingreq
  send_packet(PahoMqtt::Packet::Pingreq.new)
end

#socket=(socket) ⇒ Object



28
29
30
# File 'lib/paho_mqtt/sender.rb', line 28

# Sets the socket that #send_packet writes to.
#
# @param socket [Object, nil] object used by #send_packet via #write and
#   #closed?; nil makes #send_packet skip the write
def socket=(socket)
  @socket = socket
end

#writing_loop(max_packet) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/paho_mqtt/sender.rb', line 66

# Sends up to +max_packet+ queued packets, oldest first.
#
# Packets are removed from the front of the writing queue and passed to
# #send_packet while holding @writing_mutex. The pass stops when the queue is
# empty or the limit has been reached.
#
# @param max_packet [Integer] upper bound on packets sent in this pass
# @return MQTT_ERR_SUCCESS
def writing_loop(max_packet)
  @writing_mutex.synchronize do
    sent = 0
    until @writing_queue.empty? || sent >= max_packet
      send_packet(@writing_queue.shift)
      sent += 1
    end
  end
  MQTT_ERR_SUCCESS
end