Class: MQTT::SubHandler
- Inherits: Object
- Defined in: lib/mqtt/sub_handler.rb
Direct Known Subclasses
Instance Attribute Summary
- #jsonifyHashes ⇒ Object
  Whether or not hashes and arrays should be converted to JSON when sending.
Custom subscription handling
- #register_subscription(subObject) ⇒ Object
  Register a custom subscription, and send a subscription message to the server.
- #unregister_subscription(subObject) ⇒ Object
  Unregister a subscription.
Subscribing
- #subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription (also: #subscribeTo)
  Attach a callback to an MQTT topic or wildcard.
- #track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription (also: #on_change)
  Track data changes for a topic in the background.
- #wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
  Synchronously wait for data.
Class Method Summary
- .get_topic_split(topicName) ⇒ Array<String>
  Split a Topic into a Topic-Array.
- .getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
  Match a topic string to a topic pattern.
Instance Method Summary
- #initialize(mqttClient, jsonify: true) ⇒ SubHandler (constructor)
  Initialize a new MQTT::SubHandler. The handler immediately connects to the server and begins receiving and sending.
- #lockAndListen ⇒ Object
  Pause the main thread and wait for messages.
- #publish_to(topic, data, qos: 1, retain: false) ⇒ Object (also: #publishTo)
  Publish a message to topic.
Constructor Details
#initialize(mqttClient, jsonify: true) ⇒ SubHandler
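Initialize a new MQTT::SubHandler. The handler immediately connects to the server and begins receiving and sending. If mqttClient is a String, it is passed to MQTT::Client.new; otherwise it is used as the client object directly. The constructor waits up to 10 seconds for the connection to come up before returning.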
    # File 'lib/mqtt/sub_handler.rb', line 351
    def initialize(mqttClient, jsonify: true)
      @callbackList = Array.new();
      if mqttClient.is_a? String
        @mqtt = MQTT::Client.new(mqttClient);
      else
        @mqtt = mqttClient;
      end

      @jsonifyHashes = jsonify;

      @conChangeMutex = Mutex.new();
      @connected = false;

      @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

      @publishQueue = Array.new();
      @subscribeQueue = Array.new();
      @subscribedTopics = Hash.new();

      @trackerHash = Hash.new();

      @listenerThread = Thread.new do
        ensure_clean_start();
        mqtt_resub_thread();
      end
      @listenerThread.abort_on_exception = true;

      at_exit {
        flush_pubqueue();
        @listenerThread.kill();
        ensure_clean_exit();
      }

      begin
        Timeout.timeout(10) {
          until(@connected) do
            sleep 0.1;
          end
        }
      rescue Timeout::Error
      end
    end
Instance Attribute Details
#jsonifyHashes ⇒ Object
Whether or not hashes and arrays should be converted to JSON when sending.
    # File 'lib/mqtt/sub_handler.rb', line 20
    def jsonifyHashes
      @jsonifyHashes
    end
Class Method Details
.get_topic_split(topicName) ⇒ Array<String>
This function is mainly used for background processing.
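Split a topic into a topic array with one entry per level. Empty levels, such as those produced by leading, trailing, or doubled slashes, are dropped.
    # File 'lib/mqtt/sub_handler.rb', line 26
    def self.get_topic_split(topicName)
      return topicName.scan(/[^\/]+/);
    end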
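As a minimal sketch, the split can be reproduced without loading the library. The helper name `split_topic` is hypothetical and only mirrors the `scan` call in the listing above.

```ruby
# Hypothetical stand-in mirroring the scan call in get_topic_split.
def split_topic(topic_name)
  # Collect every run of characters that are not '/'.
  topic_name.scan(/[^\/]+/)
end

p split_topic("home/kitchen/temperature") # => ["home", "kitchen", "temperature"]
p split_topic("/home//kitchen/")          # => ["home", "kitchen"]
```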
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
This function is mainly used for background processing.
Match a topic string to a topic pattern. The pattern is passed as an already split array of levels. The method returns the levels captured by the wildcards (+ and #), or nil if the topic does not match.
    # File 'lib/mqtt/sub_handler.rb', line 39
    def self.getTopicMatch(receivedTopicString, topicPattern)
      receivedTopicList = get_topic_split receivedTopicString;

      outputTopicList = Array.new();

      return nil unless receivedTopicList.length >= topicPattern.length;

      topicPattern.each_index do |i|
        if(topicPattern[i] == "+")
          outputTopicList << receivedTopicList[i];

        elsif(topicPattern[i] == "#")
          outputTopicList.concat receivedTopicList[i..-1];
          return outputTopicList;

        elsif topicPattern[i] != receivedTopicList[i];
          return nil;

        end
      end

      return outputTopicList if topicPattern.length == receivedTopicList.length;
      return nil;
    end
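The matching rules in the listing above can be tried out in isolation. The following is a sketch only: `topic_match` is a hypothetical stand-alone copy of the logic, written so that it runs without the library or a broker.

```ruby
# Hypothetical stand-alone version of the matching logic listed above.
# `pattern` is an already split Array, e.g. ["home", "+", "temperature"].
def topic_match(received_topic, pattern)
  received = received_topic.scan(/[^\/]+/)
  captured = []

  # A topic with fewer levels than the pattern can never match.
  return nil unless received.length >= pattern.length

  pattern.each_index do |i|
    case pattern[i]
    when "+"
      captured << received[i]           # single-level wildcard
    when "#"
      captured.concat(received[i..-1])  # multi-level wildcard: take the rest
      return captured
    else
      return nil unless pattern[i] == received[i]
    end
  end

  # Without a "#", topic and pattern must have the same number of levels.
  pattern.length == received.length ? captured : nil
end

p topic_match("home/kitchen/temperature", ["home", "+", "temperature"]) # => ["kitchen"]
p topic_match("home/kitchen/temperature", ["home", "#"])                # => ["kitchen", "temperature"]
p topic_match("home/kitchen/temperature", ["home", "kitchen"])          # => nil
p topic_match("home/kitchen", ["home", "kitchen"])                      # => []
```

A match without wildcards returns an empty array rather than nil, so callers should test the result against nil and not for emptiness.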
Instance Method Details
#lockAndListen ⇒ Object
Pause the main thread and wait for messages. This is mainly useful when the code has set everything up but the program should not exit yet. "INT" is trapped, ensuring a clean exit on Ctrl-C.
    # File 'lib/mqtt/sub_handler.rb', line 312
    def lockAndListen()
      Signal.trap("INT") {
        exit 0
      }

      puts "Main thread paused."
      Thread.stop();
    end
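#publish_to(topic, data, qos: 1, retain: false) ⇒ Object
Also known as: publishTo
Publish a message to topic. Arrays and hashes are converted to JSON when jsonifyHashes is set, and a topic containing a wildcard character (# or +) raises an ArgumentError. While the handler is disconnected, messages with a QoS other than 0 are queued instead of being sent.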
    # File 'lib/mqtt/sub_handler.rb', line 210
    def publish_to(topic, data, qos: 1, retain: false)
      raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

      if(@jsonifyHashes and (data.is_a? Array or data.is_a? Hash))
        data = data.to_json
      end

      begin
        @conChangeMutex.lock
        if not @connected
          @publishQueue << {topic: topic, data: data, qos: qos, retain: retain} unless qos == 0
          @conChangeMutex.unlock
        else
          @conChangeMutex.unlock
          @mqtt.publish(topic, data, retain);
        end
      rescue MQTT::Exception, SocketError, SystemCallError
        sleep 0.05;
        retry
      end
    end
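The two checks that run before anything is sent can be illustrated on their own. This is a sketch under assumptions: `prepare_payload` is a hypothetical helper that is not part of the library. It only mirrors the topic validation and JSON conversion from the listing above and leaves out queueing and the network call.

```ruby
require "json"

# Hypothetical helper mirroring the checks publish_to performs before sending.
def prepare_payload(topic, data, jsonify: true)
  # Wildcard characters are rejected in publish topics.
  raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

  # Arrays and hashes become JSON strings; everything else passes through.
  data = data.to_json if jsonify && (data.is_a?(Array) || data.is_a?(Hash))
  data
end

p prepare_payload("home/kitchen/light", { on: true, level: 80 })
# => "{\"on\":true,\"level\":80}"
p prepare_payload("home/kitchen/light", "on")
# => "on"
```

With `jsonify: false` the hash would be returned unchanged, which corresponds to constructing the handler with `jsonify: false`.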
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
    # File 'lib/mqtt/sub_handler.rb', line 114
    def register_subscription(subObject)
      raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
      return if @callbackList.include? subObject;

      @callbackList << subObject;
      raw_subscribe_to(subObject.topic, qos: subObject.qos);
    end
#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription
Also known as: subscribeTo
Attach a callback to an MQTT topic or wildcard. The callback will be saved, and asynchronously executed whenever a message from a matching topic (including wildcards) is received.
    # File 'lib/mqtt/sub_handler.rb', line 195
    def subscribe_to(topic, qos: 1, &callback)
      subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback);
      register_subscription(subObject);

      return subObject;
    end
#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription
Also known as: on_change
Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.
    # File 'lib/mqtt/sub_handler.rb', line 170
    def track(topic, qos: 1, &callback)
      unless(@trackerHash.has_key? topic)
        subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos);
        register_subscription(subObject);

        @trackerHash[topic] = subObject;
      end

      @trackerHash[topic].attach(callback) if(callback)

      return @trackerHash[topic];
    end
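The change-detection idea behind tracking can be sketched without the library. The `ValueTracker` class below is a hypothetical stand-in, and the internals of the real MQTT::Subscriptions::ValueTrackerSubscription are not shown on this page and may differ. The sketch only assumes what the description states: the last value is retained, and attached callbacks fire when the data changes.

```ruby
# Hypothetical stand-in for a value tracker; the real
# MQTT::Subscriptions::ValueTrackerSubscription may work differently.
class ValueTracker
  attr_reader :value

  def initialize
    @value = nil
    @callbacks = []
  end

  # Attach a callback that is called whenever the value changes.
  def attach(callback)
    @callbacks << callback
  end

  # Feed in newly received data; notify only if it differs from the last value.
  def update(data)
    return if data == @value

    @value = data
    @callbacks.each { |cb| cb.call(data) }
  end
end

tracker = ValueTracker.new
changes = []
tracker.attach(->(data) { changes << data })

["21.5", "21.5", "22.0"].each { |payload| tracker.update(payload) }
p changes       # => ["21.5", "22.0"]
p tracker.value # => "22.0"
```

The repeated "21.5" payload does not trigger the callback, while the last value stays readable through `value` even when no callback is attached.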
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription. Removes it from the callback list. In the listing below, the method does not itself send an unsubscribe message to the server.
    # File 'lib/mqtt/sub_handler.rb', line 104
    def unregister_subscription(subObject)
      raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
      return unless @callbackList.include? subObject;

      @callbackList.delete(subObject);
    end
#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
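Synchronously wait for data. The method waits for a message on `topic` and passes each received message to the block, which checks the data for validity. A block is required; without one an ArgumentError is raised. The method returns true once the block accepts a message, and false if the optional timeout expires first.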
    # File 'lib/mqtt/sub_handler.rb', line 134
    def wait_for(topic, qos: 1, timeout: nil)
      unless block_given?
        raise ArgumentError, "A block for data-processing needs to be passed!"
      end

      subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos);
      register_subscription(subObject);

      begin
        Timeout.timeout(timeout) do
          loop do
            return_data = subObject.waitpoint.wait()[1];
            if yield(return_data[0], return_data[1])
              return true;
            end
          end
        end
      rescue Timeout::Error
        return false;
      ensure
        unregister_subscription(subObject);
      end
    end
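The wait-until-accepted pattern from the listing above can be sketched with the standard library alone. In this sketch a `Queue` stands in for the waitpoint, and `wait_for_value` is a hypothetical helper, not a library method. It shows how the block's return value and the timeout together decide the result.

```ruby
require "timeout"

# Hypothetical helper: pop values from a queue until the block accepts one,
# or give up after `timeout` seconds (nil waits indefinitely).
def wait_for_value(queue, timeout: nil)
  raise ArgumentError, "A block for data-processing needs to be passed!" unless block_given?

  Timeout.timeout(timeout) do
    loop do
      data = queue.pop           # blocks until a value is available
      return true if yield(data) # the block decides whether the data is valid
    end
  end
rescue Timeout::Error
  false
end

queue = Queue.new
producer = Thread.new { %w[booting ready].each { |msg| queue << msg } }

p wait_for_value(queue, timeout: 1) { |data| data == "ready" }   # => true
p wait_for_value(queue, timeout: 0.1) { |data| data == "ready" } # => false
producer.join
```

The first call skips "booting" and returns once "ready" arrives. The second call finds the queue empty and gives up after the timeout.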