Class: PahoMqtt::Sender
- Inherits:
-
Object
- Object
- PahoMqtt::Sender
- Defined in:
- lib/paho_mqtt/sender.rb
Instance Attribute Summary collapse
-
#last_ping_req ⇒ Object
Returns the value of attribute last_ping_req.
Instance Method Summary collapse
- #append_to_writing(packet) ⇒ Object
- #check_ack_alive(queue, mutex) ⇒ Object
- #flush_waiting_packet(sending = true) ⇒ Object
-
#initialize(ack_timeout) ⇒ Sender
constructor
A new instance of Sender.
- #send_packet(packet) ⇒ Object
- #send_pingreq ⇒ Object
- #socket=(socket) ⇒ Object
- #writing_loop(max_packet) ⇒ Object
Constructor Details
#initialize(ack_timeout) ⇒ Sender
Returns a new instance of Sender.
20 21 22 23 24 25 26 |
# File 'lib/paho_mqtt/sender.rb', line 20 def initialize(ack_timeout) @socket = nil @writing_queue = [] @writing_mutex = Mutex.new @last_ping_req = -1 @ack_timeout = ack_timeout end |
Instance Attribute Details
#last_ping_req ⇒ Object
Returns the value of attribute last_ping_req.
18 19 20 |
# File 'lib/paho_mqtt/sender.rb', line 18 def last_ping_req @last_ping_req end |
Instance Method Details
#append_to_writing(packet) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/paho_mqtt/sender.rb', line 49 def append_to_writing(packet) begin if @writing_queue.length <= MAX_WRITING @writing_mutex.synchronize do @writing_queue.push(packet) end else PahoMqtt.logger.error('Writing queue is full slowing down') if PahoMqtt.logger? raise FullWritingException end rescue FullWritingException sleep SELECT_TIMEOUT retry end MQTT_ERR_SUCCESS end |
#check_ack_alive(queue, mutex) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/paho_mqtt/sender.rb', line 90 def check_ack_alive(queue, mutex) mutex.synchronize do now = Time.now queue.each do |pck| if now >= pck[:timestamp] + @ack_timeout pck[:packet].dup ||= true unless pck[:packet].class == PahoMqtt::Packet::Subscribe || pck[:packet].class == PahoMqtt::Packet::Unsubscribe append_to_writing(pck[:packet]) pck[:timestamp] = now end end end end |
#flush_waiting_packet(sending = true) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/paho_mqtt/sender.rb', line 78 def flush_waiting_packet(sending=true) if sending @writing_mutex.synchronize do @writing_queue.each do |m| send_packet(m) end end else @writing_queue = [] end end |
#send_packet(packet) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/paho_mqtt/sender.rb', line 32 def send_packet(packet) begin @socket.write(packet.to_s) unless @socket.nil? || @socket.closed? @last_ping_req = Time.now MQTT_ERR_SUCCESS end rescue StandardError raise WritingException rescue IO::WaitWritable IO.select(nil, [@socket], nil, SELECT_TIMEOUT) retry end |
#send_pingreq ⇒ Object
45 46 47 |
# File 'lib/paho_mqtt/sender.rb', line 45 def send_pingreq send_packet(PahoMqtt::Packet::Pingreq.new) end |
#socket=(socket) ⇒ Object
28 29 30 |
# File 'lib/paho_mqtt/sender.rb', line 28 def socket=(socket) @socket = socket end |
#writing_loop(max_packet) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/paho_mqtt/sender.rb', line 66 def writing_loop(max_packet) @writing_mutex.synchronize do cnt = 0 while !@writing_queue.empty? && cnt < max_packet do packet = @writing_queue.shift send_packet(packet) cnt += 1 end end MQTT_ERR_SUCCESS end |