Class: PahoMqtt::Publisher
- Inherits:
-
Object
- Object
- PahoMqtt::Publisher
- Defined in:
- lib/paho_mqtt/publisher.rb
Instance Method Summary collapse
- #check_waiting_publisher ⇒ Object
- #config_all_message_queue ⇒ Object
- #config_message_queue(queue, mutex, max_packet) ⇒ Object
- #do_puback(packet_id) ⇒ Object
- #do_pubcomp(packet_id) ⇒ Object
- #do_publish(qos, packet_id) ⇒ Object
- #do_pubrec(packet_id) ⇒ Object
- #do_pubrel(packet_id) ⇒ Object
- #flush_publisher ⇒ Object
-
#initialize(sender) ⇒ Publisher
constructor
A new instance of Publisher.
- #send_puback(packet_id) ⇒ Object
- #send_pubcomp(packet_id) ⇒ Object
- #send_publish(topic, payload, retain, qos, new_id) ⇒ Object
- #send_pubrec(packet_id) ⇒ Object
- #send_pubrel(packet_id) ⇒ Object
- #sender=(sender) ⇒ Object
Constructor Details
#initialize(sender) ⇒ Publisher
Returns a new instance of Publisher.
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/paho_mqtt/publisher.rb', line 18 def initialize(sender) @waiting_puback = [] @waiting_pubrec = [] @waiting_pubrel = [] @waiting_pubcomp = [] @puback_mutex = Mutex.new @pubrec_mutex = Mutex.new @pubrel_mutex = Mutex.new @pubcomp_mutex = Mutex.new @sender = sender end |
Instance Method Details
#check_waiting_publisher ⇒ Object
158 159 160 161 162 163 |
# File 'lib/paho_mqtt/publisher.rb', line 158 def check_waiting_publisher @sender.check_ack_alive(@waiting_puback, @puback_mutex, MAX_PUBACK) @sender.check_ack_alive(@waiting_pubrec, @pubrec_mutex, MAX_PUBREC) @sender.check_ack_alive(@waiting_pubrel, @pubrel_mutex, MAX_PUBREL) @sender.check_ack_alive(@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP) end |
#config_all_message_queue ⇒ Object
138 139 140 141 142 143 |
# File 'lib/paho_mqtt/publisher.rb', line 138 def (@waiting_puback, @puback_mutex, MAX_PUBACK) (@waiting_pubrec, @pubrec_mutex, MAX_PUBREC) (@waiting_pubrel, @pubrel_mutex, MAX_PUBREL) (@waiting_pubcomp, @pubcomp_mutex, MAX_PUBCOMP) end |
#config_message_queue(queue, mutex, max_packet) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/paho_mqtt/publisher.rb', line 145 def (queue, mutex, max_packet) mutex.synchronize { cnt = 0 queue.each do |pck| pck[:packet].dup ||= true if cnt <= max_packet @sender.append_to_writing(pck[:packet]) cnt += 1 end end } end |
#do_puback(packet_id) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/paho_mqtt/publisher.rb', line 78 def do_puback(packet_id) @puback_mutex.synchronize{ @waiting_puback.delete_if { |pck| pck[:id] == packet_id } } MQTT_ERR_SUCCESS end |
#do_pubcomp(packet_id) ⇒ Object
131 132 133 134 135 136 |
# File 'lib/paho_mqtt/publisher.rb', line 131 def do_pubcomp(packet_id) @pubcomp_mutex.synchronize { @waiting_pubcomp.delete_if { |pck| pck[:id] == packet_id } } MQTT_ERR_SUCCESS end |
#do_publish(qos, packet_id) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/paho_mqtt/publisher.rb', line 56 def do_publish(qos, packet_id) case qos when 0 when 1 send_puback(packet_id) when 2 send_pubrec(packet_id) else @logger.error("The packet qos value is invalid in publish.") if logger? raise PacketException end MQTT_ERR_SUCCESS end |
#do_pubrec(packet_id) ⇒ Object
96 97 98 99 100 101 102 |
# File 'lib/paho_mqtt/publisher.rb', line 96 def do_pubrec(packet_id) @pubrec_mutex.synchronize { @waiting_pubrec.delete_if { |pck| pck[:id] == packet_id } } send_pubrel(packet_id) MQTT_ERR_SUCCESS end |
#do_pubrel(packet_id) ⇒ Object
115 116 117 118 119 120 121 |
# File 'lib/paho_mqtt/publisher.rb', line 115 def do_pubrel(packet_id) @pubrel_mutex.synchronize { @waiting_pubrel.delete_if { |pck| pck[:id] == packet_id } } send_pubcomp(packet_id) MQTT_ERR_SUCCESS end |
#flush_publisher ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/paho_mqtt/publisher.rb', line 165 def flush_publisher @puback_mutex.synchronize { @waiting_puback = [] } @pubrec_mutex.synchronize { @waiting_pubrec = [] } @pubrel_mutex.synchronize { @waiting_pubrel = [] } @pubcomp_mutex.synchronize { @waiting_pubcomp = [] } end |
#send_puback(packet_id) ⇒ Object
70 71 72 73 74 75 76 |
# File 'lib/paho_mqtt/publisher.rb', line 70 def send_puback(packet_id) packet = PahoMqtt::Packet::Puback.new( :id => packet_id ) @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end |
#send_pubcomp(packet_id) ⇒ Object
123 124 125 126 127 128 129 |
# File 'lib/paho_mqtt/publisher.rb', line 123 def send_pubcomp(packet_id) packet = PahoMqtt::Packet::Pubcomp.new( :id => packet_id ) @sender.append_to_writing(packet) MQTT_ERR_SUCCESS end |
#send_publish(topic, payload, retain, qos, new_id) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/paho_mqtt/publisher.rb', line 34 def send_publish(topic, payload, retain, qos, new_id) packet = PahoMqtt::Packet::Publish.new( :id => new_id, :topic => topic, :payload => payload, :retain => retain, :qos => qos ) @sender.append_to_writing(packet) case qos when 1 @puback_mutex.synchronize{ @waiting_puback.push({:id => new_id, :packet => packet, :timestamp => Time.now}) } when 2 @pubrec_mutex.synchronize{ @waiting_pubrec.push({:id => new_id, :packet => packet, :timestamp => Time.now}) } end MQTT_ERR_SUCCESS end |
#send_pubrec(packet_id) ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/paho_mqtt/publisher.rb', line 85 def send_pubrec(packet_id) packet = PahoMqtt::Packet::Pubrec.new( :id => packet_id ) @sender.append_to_writing(packet) @pubrel_mutex.synchronize{ @waiting_pubrel.push({:id => packet_id , :packet => packet, :timestamp => Time.now}) } MQTT_ERR_SUCCESS end |
#send_pubrel(packet_id) ⇒ Object
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/paho_mqtt/publisher.rb', line 104 def send_pubrel(packet_id) packet = PahoMqtt::Packet::Pubrel.new( :id => packet_id ) @sender.append_to_writing(packet) @pubcomp_mutex.synchronize{ @waiting_pubcomp.push({:id => packet_id, :packet => packet, :timestamp => Time.now}) } MQTT_ERR_SUCCESS end |
#sender=(sender) ⇒ Object
30 31 32 |
# File 'lib/paho_mqtt/publisher.rb', line 30 def sender=(sender) @sender = sender end |