Class: MQTT::SubHandler
- Inherits:
-
Object
- Object
- MQTT::SubHandler
- Defined in:
- lib/mqtt/sub_handler.rb
Custom subscription handling collapse
-
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
-
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription.
Subscribing collapse
-
#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription
(also: #subscribeTo)
Attach a callback to a MQTT Topic or wildcard.
-
#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription
(also: #on_change)
Track data changes for a topic in the background.
-
#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
Synchronously wait for data.
Class Method Summary collapse
-
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
Match a topic string to a topic pattern.
-
.getTopicSplit(topicName) ⇒ Array<String>
Split a Topic into a Topic-Array.
Instance Method Summary collapse
-
#initialize(mqttClient) ⇒ SubHandler
constructor
Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.
-
#lockAndListen ⇒ Object
Pause the main thread and wait for messages.
-
#publish_to(topic, data, qos: 1, retain: false) ⇒ Object
(also: #publishTo)
Publish a message to topic.
Constructor Details
#initialize(mqttClient) ⇒ SubHandler
Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/mqtt/sub_handler.rb', line 300 def initialize(mqttClient) @callbackList = Array.new(); if mqttClient.is_a? String then @mqtt = MQTT::Client.new(mqttClient); else @mqtt = mqttClient; end @conChangeMutex = Mutex.new(); @connected = false; @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8); @publishQueue = Array.new(); @subscribeQueue = Array.new(); @subscribedTopics = Hash.new(); @trackerHash = Hash.new(); @listenerThread = Thread.new do if @mqtt.clean_session @mqttWasStartedClean = true; begin @mqtt.connect(); @mqtt.disconnect(); rescue MQTT::Exception sleep 1; retry rescue SocketError, SystemCallError sleep 5 retry end @mqtt.clean_session=false; end mqtt_resub_thread end @listenerThread.abort_on_exception = true; at_exit { flush_pubqueue @listenerThread.kill(); if(@mqttWasStartedClean) then print "Logging out of mqtt server... " begin Timeout::timeout(10) { begin @mqtt.clean_session = true; @mqtt.disconnect(); @mqtt.connect(); rescue MQTT::Exception, SocketError, SystemCallError sleep 1 retry; end } rescue Timeout::Error puts "Timed out, aborting!"; else puts "Done." end end } begin Timeout::timeout(10) { until(@connected) do sleep 0.1; end } rescue Timeout::Error end end |
Class Method Details
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
This function is mainly used for background processing.
Match a topic string to a topic pattern
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/mqtt/sub_handler.rb', line 35 def self.getTopicMatch(receivedTopicString, topicPattern) receivedTopicList = getTopicSplit receivedTopicString; outputTopicList = Array.new(); return nil unless receivedTopicList.length >= topicPattern.length; topicPattern.each_index do |i| if(topicPattern[i] == "+") then outputTopicList << receivedTopicList[i]; elsif(topicPattern[i] == "#") then outputTopicList.concat receivedTopicList[i..-1]; return outputTopicList; elsif topicPattern[i] != receivedTopicList[i]; return nil; end end return outputTopicList if topicPattern.length == receivedTopicList.length; return nil; end |
.getTopicSplit(topicName) ⇒ Array<String>
This function is mainly used for background processing.
Split a Topic into a Topic-Array
22 23 24 |
# File 'lib/mqtt/sub_handler.rb', line 22 def self.getTopicSplit(topicName) return topicName.scan(/[^\/]+/); end |
Instance Method Details
#lockAndListen ⇒ Object
Pause the main thread and wait for messages. This is mainly useful when the code has set everything up, but doesn’t just want to end. “INT” is trapped, ensuring a smooth exit on Ctrl-C
263 264 265 266 267 268 269 270 |
# File 'lib/mqtt/sub_handler.rb', line 263 def lockAndListen() Signal.trap("INT") { exit 0 } puts "Main thread paused." Thread.stop(); end |
#publish_to(topic, data, qos: 1, retain: false) ⇒ Object Also known as: publishTo
Publish a message to topic.
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/mqtt/sub_handler.rb', line 206 def publish_to(topic, data, qos: 1, retain: false) raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/ begin @conChangeMutex.lock if not @connected then @publishQueue << {topic: topic, data: data, qos: qos, retain: retain} unless qos == 0 @conChangeMutex.unlock else @conChangeMutex.unlock @mqtt.publish(topic, data, retain); end rescue MQTT::Exception, SocketError, SystemCallError sleep 0.05; retry end end |
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
110 111 112 113 114 115 116 |
# File 'lib/mqtt/sub_handler.rb', line 110 def register_subscription(subObject) raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription return if @callbackList.include? subObject; @callbackList << subObject; raw_subscribe_to(subObject.topic, qos: subObject.qos); end |
#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription Also known as: subscribeTo
Attach a callback to a MQTT Topic or wildcard. The callback will be saved, and asynchronously executed whenever a message from a matching topic (including wildcards) is received.
191 192 193 194 195 196 |
# File 'lib/mqtt/sub_handler.rb', line 191 def subscribe_to(topic, qos: 1, &callback) subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback); register_subscription(subObject); return subObject; end |
#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription Also known as: on_change
Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.
166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/mqtt/sub_handler.rb', line 166 def track(topic, qos: 1, &callback) unless(@trackerHash.has_key? topic) subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos); register_subscription(subObject); @trackerHash[topic] = subObject; end @trackerHash[topic].attach(callback) if(callback) return @trackerHash[topic]; end |
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.
100 101 102 103 104 105 |
# File 'lib/mqtt/sub_handler.rb', line 100 def unregister_subscription(subObject) raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription return unless @callbackList.include? subObject; @callbackList.delete(subObject); end |
#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
Synchronously wait for data. It waits for a message on ‘topic`, optionally letting a block check the data for validity, and optionally aborting after a timeout
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/mqtt/sub_handler.rb', line 130 def wait_for(topic, qos: 1, timeout: nil) unless block_given? then raise ArgumentError, "A block for data-processing needs to be passed!" end subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos); register_subscription(subObject); begin Timeout::timeout(timeout) do loop do return_data = subObject.waitpoint.wait()[1]; if yield(return_data[0], return_data[1]) then return true; end end end rescue Timeout::Error return false; ensure unregister_subscription(subObject); end end |