Class: PahoMqtt::Publisher
- Inherits:
- Object
- Inheritance hierarchy: Object → PahoMqtt::Publisher
- Defined in:
- lib/paho_mqtt/publisher.rb
Instance Method Summary
- #check_waiting_publisher ⇒ Object
- #config_all_message_queue ⇒ Object
- #config_message_queue(queue, mutex) ⇒ Object
- #do_puback(packet_id) ⇒ Object
- #do_pubcomp(packet_id) ⇒ Object
- #do_publish(qos, packet_id) ⇒ Object
- #do_pubrec(packet_id) ⇒ Object
- #do_pubrel(packet_id) ⇒ Object
- #flush_publisher ⇒ Object
- #initialize(sender) ⇒ Publisher (constructor): returns a new instance of Publisher.
- #send_puback(packet_id) ⇒ Object
- #send_pubcomp(packet_id) ⇒ Object
- #send_publish(topic, payload, retain, qos, new_id) ⇒ Object
- #send_pubrec(packet_id) ⇒ Object
- #send_pubrel(packet_id) ⇒ Object
- #sender=(sender) ⇒ Object
Constructor Details
#initialize(sender) ⇒ Publisher
Returns a new instance of Publisher.
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/paho_mqtt/publisher.rb', line 18
# Sets up a publisher with four empty pending-acknowledgement queues
# (PUBACK, PUBREC, PUBREL, PUBCOMP), a dedicated mutex for each queue,
# and the sender used to emit packets.
#
# @param sender [Object] stored in @sender; elsewhere in this class it
#   receives #append_to_writing and #check_ack_alive calls
def initialize(sender)
  # Each queue is listed next to the mutex that guards it.
  @waiting_puback  = []
  @puback_mutex    = Mutex.new

  @waiting_pubrec  = []
  @pubrec_mutex    = Mutex.new

  @waiting_pubrel  = []
  @pubrel_mutex    = Mutex.new

  @waiting_pubcomp = []
  @pubcomp_mutex   = Mutex.new

  @sender = sender
end
Instance Method Details
#check_waiting_publisher ⇒ Object
169 170 171 172 173 174 |
# File 'lib/paho_mqtt/publisher.rb', line 169
# Hands every pending-acknowledgement queue, together with its mutex, to
# the sender's #check_ack_alive, in the order PUBACK, PUBREC, PUBREL,
# PUBCOMP.
#
# @return [Object] whatever the final #check_ack_alive call (PUBCOMP) returns
def check_waiting_publisher
  guarded_queues = [
    [@waiting_puback,  @puback_mutex],
    [@waiting_pubrec,  @pubrec_mutex],
    [@waiting_pubrel,  @pubrel_mutex],
    [@waiting_pubcomp, @pubcomp_mutex]
  ]
  # map + last keeps the call order and yields the last call's result.
  guarded_queues.map { |queue, mutex| @sender.check_ack_alive(queue, mutex) }.last
end
#config_all_message_queue ⇒ Object
154 155 156 157 158 159 |
# File 'lib/paho_mqtt/publisher.rb', line 154
# Applies #config_message_queue to each of the four pending queues
# (PUBACK, PUBREC, PUBREL, PUBCOMP), passing the mutex that guards it.
#
# @return [Object] the result of the last #config_message_queue call
def config_all_message_queue
  config_message_queue(@waiting_puback, @puback_mutex)
  config_message_queue(@waiting_pubrec, @pubrec_mutex)
  config_message_queue(@waiting_pubrel, @pubrel_mutex)
  config_message_queue(@waiting_pubcomp, @pubcomp_mutex)
end
#config_message_queue(queue, mutex) ⇒ Object
161 162 163 164 165 166 167 |
# File 'lib/paho_mqtt/publisher.rb', line 161
# Resets the :timestamp of every entry in +queue+ to the current time,
# holding +mutex+ for the whole pass.
#
# @param queue [Array<Hash>] pending entries; each gets its :timestamp key overwritten
# @param mutex [Mutex] lock guarding +queue+
# @return [Array<Hash>] +queue+ itself (the value of the synchronized block)
def config_message_queue(queue, mutex)
  mutex.synchronize do
    queue.each do |pck|
      pck[:timestamp] = Time.now
    end
  end
end
#do_puback(packet_id) ⇒ Object
86 87 88 89 90 91 |
# File 'lib/paho_mqtt/publisher.rb', line 86
# Handles a received PUBACK: drops every entry whose :id matches
# +packet_id+ from the PUBACK waiting queue, under the queue's mutex.
#
# @param packet_id [Object] id compared with each entry's :id using ==
# @return [Object] MQTT_ERR_SUCCESS
def do_puback(packet_id)
  @puback_mutex.synchronize do
    @waiting_puback.reject! { |pending| pending[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end
#do_pubcomp(packet_id) ⇒ Object
147 148 149 150 151 152 |
# File 'lib/paho_mqtt/publisher.rb', line 147
# Handles a received PUBCOMP: drops every entry whose :id matches
# +packet_id+ from the PUBCOMP waiting queue, under the queue's mutex.
#
# @param packet_id [Object] id compared with each entry's :id using ==
# @return [Object] MQTT_ERR_SUCCESS
def do_pubcomp(packet_id)
  @pubcomp_mutex.synchronize do
    @waiting_pubcomp.reject! { |pending| pending[:id] == packet_id }
  end
  MQTT_ERR_SUCCESS
end
#do_publish(qos, packet_id) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/paho_mqtt/publisher.rb', line 64
# Reacts to an incoming PUBLISH according to its QoS level:
# QoS 0 needs no reply, QoS 1 is answered via #send_puback and QoS 2 via
# #send_pubrec. Any other value is logged (when a logger is configured)
# and rejected.
#
# NOTE(review): the return values of #send_puback / #send_pubrec are
# ignored here, so a full PUBREL queue still yields MQTT_ERR_SUCCESS.
#
# @param qos [Integer] QoS level of the received PUBLISH
# @param packet_id [Object] id of the received packet, forwarded to the reply
# @return [Object] MQTT_ERR_SUCCESS for QoS 0, 1 and 2
# @raise [PacketException] when +qos+ is not 0, 1 or 2
def do_publish(qos, packet_id)
  case qos
  when 0 then nil # nothing to acknowledge
  when 1 then send_puback(packet_id)
  when 2 then send_pubrec(packet_id)
  else
    PahoMqtt.logger.error("The packet QoS value is invalid in publish.") if PahoMqtt.logger?
    raise PacketException, 'Invalid publish QoS value'
  end
  MQTT_ERR_SUCCESS
end
#do_pubrec(packet_id) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/paho_mqtt/publisher.rb', line 108
# Handles a received PUBREC: drops every entry whose :id matches
# +packet_id+ from the PUBREC waiting queue (under its mutex), then
# answers with a PUBREL through #send_pubrel.
#
# NOTE(review): the result of #send_pubrel is not inspected.
#
# @param packet_id [Object] id compared with each entry's :id using ==
# @return [Object] MQTT_ERR_SUCCESS
def do_pubrec(packet_id)
  @pubrec_mutex.synchronize do
    @waiting_pubrec.reject! { |pending| pending[:id] == packet_id }
  end
  send_pubrel(packet_id)
  MQTT_ERR_SUCCESS
end
#do_pubrel(packet_id) ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/paho_mqtt/publisher.rb', line 131
# Handles a received PUBREL: drops every entry whose :id matches
# +packet_id+ from the PUBREL waiting queue (under its mutex), then
# answers with a PUBCOMP through #send_pubcomp.
#
# @param packet_id [Object] id compared with each entry's :id using ==
# @return [Object] MQTT_ERR_SUCCESS
def do_pubrel(packet_id)
  @pubrel_mutex.synchronize do
    @waiting_pubrel.reject! { |pending| pending[:id] == packet_id }
  end
  send_pubcomp(packet_id)
  MQTT_ERR_SUCCESS
end
#flush_publisher ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/paho_mqtt/publisher.rb', line 176
# Discards all pending acknowledgement entries by pointing each queue
# variable at a fresh empty array while holding that queue's mutex.
# The previous array objects are replaced, not cleared in place.
#
# @return [Array] the new, empty PUBCOMP queue (value of the last block)
def flush_publisher
  @puback_mutex.synchronize  { @waiting_puback  = [] }
  @pubrec_mutex.synchronize  { @waiting_pubrec  = [] }
  @pubrel_mutex.synchronize  { @waiting_pubrel  = [] }
  @pubcomp_mutex.synchronize { @waiting_pubcomp = [] }
end
#send_puback(packet_id) ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/paho_mqtt/publisher.rb', line 78
# Builds a PUBACK packet carrying +packet_id+ and queues it on the
# sender via #append_to_writing.
#
# @param packet_id [Object] id placed in the packet's :id field
# @return [Object] MQTT_ERR_SUCCESS
def send_puback(packet_id)
  puback = PahoMqtt::Packet::Puback.new(:id => packet_id)
  @sender.append_to_writing(puback)
  MQTT_ERR_SUCCESS
end
#send_pubcomp(packet_id) ⇒ Object
139 140 141 142 143 144 145 |
# File 'lib/paho_mqtt/publisher.rb', line 139
# Builds a PUBCOMP packet carrying +packet_id+ and queues it on the
# sender via #append_to_writing.
#
# @param packet_id [Object] id placed in the packet's :id field
# @return [Object] MQTT_ERR_SUCCESS
def send_pubcomp(packet_id)
  pubcomp = PahoMqtt::Packet::Pubcomp.new(:id => packet_id)
  @sender.append_to_writing(pubcomp)
  MQTT_ERR_SUCCESS
end
#send_publish(topic, payload, retain, qos, new_id) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/paho_mqtt/publisher.rb', line 34
# Builds a PUBLISH packet and queues it on the sender.
#
# For QoS 1 the packet is first recorded in the PUBACK waiting queue, for
# QoS 2 in the PUBREC waiting queue, each under its own mutex. If the
# relevant queue has already reached its limit (MAX_PUBACK / MAX_PUBREC),
# an error is logged when a logger is configured and nothing is sent.
# Any other QoS value skips the bookkeeping and the packet is sent directly.
#
# @param topic [Object] value for the packet's :topic field
# @param payload [Object] value for the packet's :payload field
# @param retain [Object] value for the packet's :retain field
# @param qos [Integer] value for the packet's :qos field; selects the waiting queue
# @param new_id [Object] packet id, also stored as the queue entry's :id
# @return [Object] MQTT_ERR_SUCCESS once queued for writing, MQTT_ERR_FAIL
#   when the waiting queue is full
def send_publish(topic, payload, retain, qos, new_id)
  packet = PahoMqtt::Packet::Publish.new(
    :id      => new_id,
    :topic   => topic,
    :payload => payload,
    :retain  => retain,
    :qos     => qos
  )

  case qos
  when 1
    @puback_mutex.synchronize do
      unless @waiting_puback.length < MAX_PUBACK
        PahoMqtt.logger.error('PUBACK queue is full, could not send with qos=1') if PahoMqtt.logger?
        # `return` leaves the method; synchronize still releases the lock.
        return MQTT_ERR_FAIL
      end
      @waiting_puback << { :id => new_id, :packet => packet, :timestamp => Time.now }
    end
  when 2
    @pubrec_mutex.synchronize do
      unless @waiting_pubrec.length < MAX_PUBREC
        PahoMqtt.logger.error('PUBREC queue is full, could not send with qos=2') if PahoMqtt.logger?
        return MQTT_ERR_FAIL
      end
      @waiting_pubrec << { :id => new_id, :packet => packet, :timestamp => Time.now }
    end
  end

  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
#send_pubrec(packet_id) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/paho_mqtt/publisher.rb', line 93
# Builds a PUBREC packet for +packet_id+, records it in the PUBREL
# waiting queue (under that queue's mutex) and queues it on the sender.
# If the PUBREL queue has already reached MAX_PUBREL, an error is logged
# when a logger is configured and nothing is sent.
#
# @param packet_id [Object] id placed in the packet and in the queue entry's :id
# @return [Object] MQTT_ERR_SUCCESS once queued for writing, MQTT_ERR_FAIL
#   when the PUBREL queue is full
def send_pubrec(packet_id)
  pubrec = PahoMqtt::Packet::Pubrec.new(:id => packet_id)

  @pubrel_mutex.synchronize do
    unless @waiting_pubrel.length < MAX_PUBREL
      PahoMqtt.logger.error('PUBREL queue is full, could not acknowledge qos=2') if PahoMqtt.logger?
      # `return` leaves the method; synchronize still releases the lock.
      return MQTT_ERR_FAIL
    end
    @waiting_pubrel << { :id => packet_id, :packet => pubrec, :timestamp => Time.now }
  end

  @sender.append_to_writing(pubrec)
  MQTT_ERR_SUCCESS
end
#send_pubrel(packet_id) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/paho_mqtt/publisher.rb', line 116
# Builds a PUBREL packet for +packet_id+, records it in the PUBCOMP
# waiting queue (under that queue's mutex) and queues it on the sender.
# If the PUBCOMP queue has already reached MAX_PUBCOMP, an error is
# logged when a logger is configured and nothing is sent.
#
# @param packet_id [Object] id placed in the packet and in the queue entry's :id
# @return [Object] MQTT_ERR_SUCCESS once queued for writing, MQTT_ERR_FAIL
#   when the PUBCOMP queue is full
def send_pubrel(packet_id)
  pubrel = PahoMqtt::Packet::Pubrel.new(:id => packet_id)

  @pubcomp_mutex.synchronize do
    unless @waiting_pubcomp.length < MAX_PUBCOMP
      PahoMqtt.logger.error('PUBCOMP queue is full, could not acknowledge qos=2') if PahoMqtt.logger?
      # `return` leaves the method; synchronize still releases the lock.
      return MQTT_ERR_FAIL
    end
    @waiting_pubcomp << { :id => packet_id, :packet => pubrel, :timestamp => Time.now }
  end

  @sender.append_to_writing(pubrel)
  MQTT_ERR_SUCCESS
end
#sender=(sender) ⇒ Object
30 31 32 |
# File 'lib/paho_mqtt/publisher.rb', line 30
# Replaces the sender this publisher uses to emit packets and to check
# pending acknowledgements.
#
# @param sender [Object] new value for @sender
# @return [Object] the assigned sender
def sender=(sender)
  @sender = sender
end