Class: PahoMqtt::Subscriber
- Inherits:
-
Object
- Object
- PahoMqtt::Subscriber
- Defined in:
- lib/paho_mqtt/subscriber.rb
Instance Attribute Summary collapse
-
#subscribed_topics ⇒ Object
readonly
Returns the value of attribute subscribed_topics.
Instance Method Summary collapse
- #add_subscription(max_qos, packet_id, adjust_qos) ⇒ Object
- #check_waiting_subscriber ⇒ Object
- #clear_queue ⇒ Object
- #config_subscription(new_id) ⇒ Object
-
#initialize(sender) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #remove_subscription(packet_id, to_unsub) ⇒ Object
- #send_subscribe(topics, new_id) ⇒ Object
- #send_unsubscribe(topics, new_id) ⇒ Object
- #sender=(sender) ⇒ Object
- #valid_topics?(topics) ⇒ Boolean
Constructor Details
#initialize(sender) ⇒ Subscriber
Returns a new instance of Subscriber.
20 21 22 23 24 25 26 27 28 |
# File 'lib/paho_mqtt/subscriber.rb', line 20

# Creates a subscriber with empty bookkeeping: no recorded subscriptions and
# no SUBSCRIBE/UNSUBSCRIBE packets waiting for an acknowledgement.
#
# @param sender [Object] collaborator stored in @sender; the other methods of
#   this class call `append_to_writing` and `check_ack_alive` on it
def initialize(sender)
  # Pending-acknowledgement queues, each guarded by its own mutex.
  @waiting_suback, @waiting_unsuback = [], []
  @suback_mutex = Mutex.new
  @unsuback_mutex = Mutex.new

  # Topics recorded as subscribed, guarded by @subscribed_mutex.
  @subscribed_topics = []
  @subscribed_mutex = Mutex.new

  @sender = sender
end
Instance Attribute Details
#subscribed_topics ⇒ Object (readonly)
Returns the value of attribute subscribed_topics.
18 19 20 |
# File 'lib/paho_mqtt/subscriber.rb', line 18

# Returns the value of attribute subscribed_topics.
#
# The returned object is the internal list itself, not a copy.
#
# @return [Array] the current contents of @subscribed_topics
def subscribed_topics
  @subscribed_topics
end
Instance Method Details
#add_subscription(max_qos, packet_id, adjust_qos) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/paho_mqtt/subscriber.rb', line 55

# Processes the acknowledgement of a SUBSCRIBE packet: removes the matching
# entry from the pending queue, applies the returned QoS codes to the topics
# of that packet and records the accepted topics as subscribed.
#
# Return codes are paired with topics by position. A code of 0, 1 or 2 is
# written into the topic pair as its QoS; a code of 128 drops the topic from
# the result; any other value (including a missing code) is an error.
#
# @param max_qos [Array<Integer>] return codes, one per topic of the original
#   SUBSCRIBE packet, in the same order (not modified)
# @param packet_id [Integer] identifier of the acknowledged packet
# @param adjust_qos [Object] ignored; kept so existing callers keep working
# @return [Array] the accepted topic pairs, with their QoS updated
# @raise [PacketException] when no single pending packet has +packet_id+, or
#   when a return code is not one of 0, 1, 2, 128
def add_subscription(max_qos, packet_id, adjust_qos)
  pending = nil
  @suback_mutex.synchronize do
    pending, @waiting_suback = @waiting_suback.partition { |pck| pck[:id] == packet_id }
  end

  unless pending.length == 1
    PahoMqtt.logger.error("The packet id is invalid, already used.") if PahoMqtt.logger?
    raise PacketException.new("Invalid suback packet id: #{packet_id}")
  end

  # Build a separate list instead of deleting from the array being iterated:
  # deleting in place skips the element after each removed one and leaves the
  # remaining codes misaligned with their topics.
  granted = []
  pending.first[:packet].topics.each_with_index do |topic, index|
    code = max_qos[index]
    if [0, 1, 2].include?(code)
      topic[1] = code
      granted << topic
    elsif code != 128
      PahoMqtt.logger.error("The QoS value is invalid in subscribe.") if PahoMqtt.logger?
      raise PacketException.new('Invalid suback QoS value')
    end
  end

  @subscribed_mutex.synchronize do
    @subscribed_topics.concat(granted)
  end
  granted
end
#check_waiting_subscriber ⇒ Object
141 142 143 144 |
# File 'lib/paho_mqtt/subscriber.rb', line 141

# Asks the sender to check both pending-acknowledgement queues, SUBACK first
# and UNSUBACK second, passing each queue together with the mutex guarding it.
#
# @return [Object] whatever the sender returns for the UNSUBACK queue check
def check_waiting_subscriber
  checker = @sender
  checker.check_ack_alive(@waiting_suback, @suback_mutex)
  checker.check_ack_alive(@waiting_unsuback, @unsuback_mutex)
end
#clear_queue ⇒ Object
146 147 148 |
# File 'lib/paho_mqtt/subscriber.rb', line 146

# Discards every SUBSCRIBE packet still waiting for its acknowledgement.
#
# The queue is replaced while holding @suback_mutex, like every other write
# to @waiting_suback in this class, so a concurrent acknowledgement cannot
# overwrite the cleared queue with a stale one.
#
# NOTE(review): only the SUBACK queue is cleared; @waiting_unsuback is left
# untouched, as before. Confirm that is intended.
#
# @return [Array] the new, empty queue
def clear_queue
  @suback_mutex.synchronize { @waiting_suback = [] }
end
#config_subscription(new_id) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/paho_mqtt/subscriber.rb', line 34

# Sends one SUBSCRIBE packet covering every topic currently recorded in
# @subscribed_topics and empties that list once the packet has been queued
# for acknowledgement. Does nothing when no topic is recorded.
#
# @param new_id [Integer] packet identifier for the SUBSCRIBE packet
# @return [Integer] MQTT_ERR_SUCCESS when the packet was handed to the sender
#   or there was nothing to send; MQTT_ERR_FAIL when the SUBACK queue is full
def config_subscription(new_id)
  topics = @subscribed_mutex.synchronize { @subscribed_topics }
  return MQTT_ERR_SUCCESS if topics.nil? || topics.empty?

  packet = PahoMqtt::Packet::Subscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @suback_mutex.synchronize do
    if @waiting_suback.length >= MAX_SUBACK
      PahoMqtt.logger.error('SUBACK queue is full, could not send subscribe') if PahoMqtt.logger?
      # MQTT_ERR_FAIL is the failure code send_unsubscribe and valid_topics?
      # use in this class. The recorded topics are left in place so that a
      # failed attempt does not lose them.
      return MQTT_ERR_FAIL
    end
    @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  # Only forget the topics once the packet is tracked; the packet keeps its
  # own reference to the list that was read above.
  @subscribed_mutex.synchronize do
    @subscribed_topics = []
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
#remove_subscription(packet_id, to_unsub) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/paho_mqtt/subscriber.rb', line 81

# Processes the acknowledgement of an UNSUBSCRIBE packet: removes the matching
# entry from the pending queue and deletes from @subscribed_topics every topic
# whose name matches one of that packet's filters (per PahoMqtt.match_filter).
#
# @param packet_id [Integer] identifier of the acknowledged packet
# @param to_unsub [Object] ignored; the filters come from the pending packet
# @return [Array] the topic filters of the acknowledged UNSUBSCRIBE packet
# @raise [PacketException] when no single pending packet has +packet_id+
def remove_subscription(packet_id, to_unsub)
  acked = nil
  @unsuback_mutex.synchronize do
    acked, @waiting_unsuback = @waiting_unsuback.partition { |entry| entry[:id] == packet_id }
  end

  if acked.length != 1
    PahoMqtt.logger.error("The packet id is invalid, already used.") if PahoMqtt.logger?
    raise PacketException.new("Invalid unsuback packet id: #{packet_id}")
  end

  filters = acked.first[:packet].topics
  @subscribed_mutex.synchronize do
    filters.each do |filter|
      @subscribed_topics.delete_if { |subscribed| PahoMqtt.match_filter(subscribed.first, filter) }
    end
  end
  filters
end
#send_subscribe(topics, new_id) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/paho_mqtt/subscriber.rb', line 101

# Builds a SUBSCRIBE packet for +topics+, registers it as waiting for its
# acknowledgement and hands it to the sender.
#
# The packet is registered before it is sent, so a full queue means the
# packet is not sent at all rather than sent without being tracked.
#
# @param topics [Array] topics to subscribe to, as accepted by valid_topics?
# @param new_id [Integer] packet identifier for the SUBSCRIBE packet
# @return [Integer] MQTT_ERR_SUCCESS when the packet was handed to the sender;
#   MQTT_ERR_FAIL when the SUBACK queue is full
# @raise [ProtocolViolation] when valid_topics? rejects +topics+
def send_subscribe(topics, new_id)
  raise ProtocolViolation if valid_topics?(topics) == MQTT_ERR_FAIL

  packet = PahoMqtt::Packet::Subscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @suback_mutex.synchronize do
    if @waiting_suback.length >= MAX_SUBACK
      PahoMqtt.logger.error('SUBACK queue is full, could not send subscribe') if PahoMqtt.logger?
      # MQTT_ERR_FAIL is the failure code send_unsubscribe and valid_topics?
      # use in this class.
      return MQTT_ERR_FAIL
    end
    @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
#send_unsubscribe(topics, new_id) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/paho_mqtt/subscriber.rb', line 121

# Builds an UNSUBSCRIBE packet for +topics+, registers it as waiting for its
# acknowledgement and hands it to the sender.
#
# The capacity check looks at the UNSUBACK queue, the one this method pushes
# to and whose mutex it holds. The packet is registered before it is sent, so
# a full queue means the packet is not sent at all.
#
# @param topics [Array] topic filters to unsubscribe from, as accepted by
#   valid_topics?
# @param new_id [Integer] packet identifier for the UNSUBSCRIBE packet
# @return [Integer] MQTT_ERR_SUCCESS when the packet was handed to the sender;
#   MQTT_ERR_FAIL when the UNSUBACK queue is full
# @raise [ProtocolViolation] when valid_topics? rejects +topics+
def send_unsubscribe(topics, new_id)
  raise ProtocolViolation if valid_topics?(topics) == MQTT_ERR_FAIL

  packet = PahoMqtt::Packet::Unsubscribe.new(
    :id     => new_id,
    :topics => topics
  )
  @unsuback_mutex.synchronize do
    if @waiting_unsuback.length >= MAX_UNSUBACK
      PahoMqtt.logger.error('UNSUBACK queue is full, could not send unbsubscribe') if PahoMqtt.logger?
      return MQTT_ERR_FAIL
    end
    @waiting_unsuback.push(:id => new_id, :packet => packet, :timestamp => Time.now)
  end
  @sender.append_to_writing(packet)
  MQTT_ERR_SUCCESS
end
#sender=(sender) ⇒ Object
30 31 32 |
# File 'lib/paho_mqtt/subscriber.rb', line 30

# Replaces the sender this subscriber hands packets to and asks to check the
# pending-acknowledgement queues.
#
# @param sender [Object] the new collaborator to store in @sender
# @return [Object] the assigned sender
def sender=(sender)
  @sender = sender
end
#valid_topics?(topics) ⇒ Boolean
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/paho_mqtt/subscriber.rb', line 150

# Checks that a topic list is usable in a SUBSCRIBE/UNSUBSCRIBE packet.
#
# Despite the predicate-style name, the result is a status code, not a
# boolean; callers compare it against MQTT_ERR_FAIL.
#
# A list is rejected when it is nil or empty, or when any entry has a nil or
# empty name. An entry is either a String (the name itself) or an Array whose
# first element is the name.
#
# @param topics [Array<String, Array>, nil] topics to validate
# @return [Integer] MQTT_ERR_SUCCESS when every topic is acceptable,
#   MQTT_ERR_FAIL otherwise
def valid_topics?(topics)
  # An empty list must fail here; falling through would report it as valid.
  return MQTT_ERR_FAIL if topics.nil? || topics.empty?

  topics.each do |topic|
    name = topic.is_a?(Array) ? topic.first : topic
    return MQTT_ERR_FAIL if name.nil? || name == ""
  end
  MQTT_ERR_SUCCESS
end