Class: MQTT::SubHandler
- Inherits: Object
- Inheritance chain: Object, MQTT::SubHandler
- Defined in: lib/mqtt/sub_handler.rb
Direct Known Subclasses
Instance Attribute Summary
- #jsonifyHashes ⇒ Object
  Whether or not hashes and arrays should be converted to JSON when sending.
Custom subscription handling
- #register_subscription(subObject) ⇒ Object
  Register a custom subscription, and send a subscription message to the server.
- #unregister_subscription(subObject) ⇒ Object
  Unregister a subscription.
Subscribing
- #subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription (also: #subscribeTo)
  Attach a callback to an MQTT topic or wildcard.
- #track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription (also: #on_change)
  Track data changes for a topic in the background.
- #wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
  Synchronously wait for data.
Class Method Summary
- .get_topic_split(topicName) ⇒ Array<String>
  Split a topic into a topic array.
- .getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
  Match a topic string to a topic pattern.
Instance Method Summary
- #initialize(mqttClient, jsonify: true) ⇒ SubHandler (constructor)
  Initialize a new MQTT::SubHandler. The handler immediately connects to the server and begins receiving and sending.
- #lockAndListen ⇒ Object
  Pause the main thread and wait for messages.
- #publish_to(topic, data, qos: 1, retain: false) ⇒ Object (also: #publishTo)
  Publish a message to topic.
Constructor Details
#initialize(mqttClient, jsonify: true) ⇒ SubHandler
Initialize a new MQTT::SubHandler. The handler immediately connects to the server and begins receiving and sending. If mqttClient is a String it is passed to MQTT::Client.new; any other object is used as the client directly.

    # File 'lib/mqtt/sub_handler.rb', line 358
    def initialize(mqttClient, jsonify: true)
      @callbackList = Array.new();
      if mqttClient.is_a? String
        @mqtt = MQTT::Client.new(mqttClient);
      else
        @mqtt = mqttClient;
      end

      @jsonifyHashes = jsonify;

      @conChangeMutex = Mutex.new();
      @connected = false;

      @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

      @publishQueue = Array.new();
      @subscribeQueue = Array.new();
      @subscribedTopics = Hash.new();

      @trackerHash = Hash.new();

      @listenerThread = Thread.new do
        ensure_clean_start();
        mqtt_resub_thread();
      end
      @listenerThread.abort_on_exception = true;

      @publisherThread = Thread.new do
        mqtt_push_thread();
      end
      @publisherThread.abort_on_exception = true;

      begin
        Timeout.timeout(3) {
          until(@connected)
            sleep 0.1;
          end
        }
      rescue Timeout::Error
        STDERR.puts "MQTT: #{@mqtt.host} did not connect!".red
      end

      at_exit {
        flush_pubqueue();
        @listenerThread.kill();
        @publisherThread.kill();
        ensure_clean_exit();
      }
    end
Instance Attribute Details
#jsonifyHashes ⇒ Object
Whether or not hashes and arrays should be converted to JSON when sending.

    # File 'lib/mqtt/sub_handler.rb', line 22
    def jsonifyHashes
      @jsonifyHashes
    end
Class Method Details
.get_topic_split(topicName) ⇒ Array<String>
This function is mainly used for background processing.
Split a topic into a topic array.

    # File 'lib/mqtt/sub_handler.rb', line 28
    def self.get_topic_split(topicName)
      return topicName.scan(/[^\/]+/);
    end
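As a quick illustration of the splitting rule, the same regular expression can be tried outside the library. This is only a sketch: the helper name `split_topic` is a stand-in chosen for the example and is not part of the gem.

```ruby
# Stand-alone sketch of the split shown above. `split_topic` is a
# hypothetical name; the regular expression is taken from the listing.
def split_topic(topic_name)
  # Collect every run of non-slash characters as one topic level.
  topic_name.scan(/[^\/]+/)
end

p split_topic("home/kitchen/temperature")  # => ["home", "kitchen", "temperature"]
p split_topic("/home//kitchen/")           # => ["home", "kitchen"]
```

Because only non-empty runs are collected, leading, trailing, and doubled slashes do not produce empty levels.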
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
This function is mainly used for background processing.
Match a topic string to a topic pattern.

    # File 'lib/mqtt/sub_handler.rb', line 41
    def self.getTopicMatch(receivedTopicString, topicPattern)
      receivedTopicList = get_topic_split receivedTopicString;

      outputTopicList = Array.new();

      return nil unless receivedTopicList.length >= topicPattern.length;

      topicPattern.each_index do |i|
        if(topicPattern[i] == "+")
          outputTopicList << receivedTopicList[i];

        elsif(topicPattern[i] == "#")
          outputTopicList.concat receivedTopicList[i..-1];
          return outputTopicList;

        elsif topicPattern[i] != receivedTopicList[i];
          return nil;

        end
      end

      return outputTopicList if topicPattern.length == receivedTopicList.length;
      return nil;
    end
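The matching rules are easier to follow with a few concrete inputs. The sketch below restates the logic of the listing as a free-standing function so it can run without the gem; `topic_match` and `split_topic` are hypothetical names. It assumes the pattern is passed as an already split array, which is how the listing appears to index it.

```ruby
# Stand-alone restatement of the matching rules in the listing above.
# `topic_match` and `split_topic` are hypothetical names for this sketch.
def split_topic(topic_name)
  topic_name.scan(/[^\/]+/)
end

# pattern is an already split pattern, e.g. ["home", "+", "temperature"].
# Returns the levels captured by wildcards, or nil when nothing matches.
def topic_match(received_topic, pattern)
  received = split_topic(received_topic)
  captured = []
  return nil unless received.length >= pattern.length

  pattern.each_index do |i|
    case pattern[i]
    when "+"
      captured << received[i]                  # single-level wildcard
    when "#"
      return captured.concat(received[i..-1])  # multi-level wildcard
    else
      return nil unless pattern[i] == received[i]
    end
  end

  pattern.length == received.length ? captured : nil
end

p topic_match("home/kitchen/temperature", split_topic("home/+/temperature"))  # => ["kitchen"]
p topic_match("home/kitchen/temperature", split_topic("home/#"))              # => ["kitchen", "temperature"]
p topic_match("home/kitchen", split_topic("home/kitchen"))                    # => []
p topic_match("home/kitchen/temperature", split_topic("home/kitchen"))        # => nil
p topic_match("home", split_topic("home/#"))                                  # => nil
```

An exact match returns an empty array rather than nil, so callers should test for nil and not for emptiness. Under these rules a pattern ending in `#` needs at least one level in its place, so `home/#` does not match `home`, although the MQTT specification lets `#` cover the parent level as well.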
Instance Method Details
#lockAndListen ⇒ Object
Pause the main thread and wait for messages. This is mainly useful when the code has finished its setup but the program should keep running instead of exiting. "INT" is trapped, ensuring a clean exit on Ctrl-C.

    # File 'lib/mqtt/sub_handler.rb', line 322
    def lockAndListen()
      Signal.trap("INT") {
        exit 0
      }

      puts "Main thread paused."
      Thread.stop();
    end
#publish_to(topic, data, qos: 1, retain: false) ⇒ Object Also known as: publishTo
Publish a message to topic.

    # File 'lib/mqtt/sub_handler.rb', line 207
    def publish_to(topic, data, qos: 1, retain: false)
      raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

      if(@jsonifyHashes and (data.is_a? Array or data.is_a? Hash))
        data = data.to_json
      end

      @publishQueue << {topic: topic, data: data, qos: qos, retain: retain}
      @publisherThread.run();
    end
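The two checks performed before a message is queued, wildcard rejection and JSON conversion, can be tried in isolation. The following is a minimal sketch under the assumption that only the queue entry matters; `build_publish_entry` is a hypothetical helper, and nothing is sent to a broker.

```ruby
require 'json'

# Hypothetical helper mirroring the checks in the listing above. It only
# builds the queue entry and does not talk to a broker.
def build_publish_entry(topic, data, qos: 1, retain: false, jsonify: true)
  # Wildcards are only meaningful when subscribing, never when publishing.
  raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#+]/

  # Hashes and arrays are serialised to JSON; other payloads pass through.
  data = data.to_json if jsonify && (data.is_a?(Array) || data.is_a?(Hash))

  { topic: topic, data: data, qos: qos, retain: retain }
end

p build_publish_entry("home/kitchen/light", { on: true, level: 80 })
p build_publish_entry("home/kitchen/label", "plain text", retain: true)
```

With `jsonify: false` a hash would be queued unchanged, which corresponds to constructing the handler with `jsonify: false`.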
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.

    # File 'lib/mqtt/sub_handler.rb', line 111
    def register_subscription(subObject)
      raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
      return if @callbackList.include? subObject;

      @callbackList << subObject;
      raw_subscribe_to(subObject.topic, qos: subObject.qos);
    end
#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription Also known as: subscribeTo
Attach a callback to an MQTT topic or wildcard. The callback is saved and executed asynchronously whenever a message arrives on a matching topic (including wildcards).

    # File 'lib/mqtt/sub_handler.rb', line 192
    def subscribe_to(topic, qos: 1, &callback)
      subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback);
      register_subscription(subObject);

      return subObject;
    end
#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription Also known as: on_change
Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.

    # File 'lib/mqtt/sub_handler.rb', line 167
    def track(topic, qos: 1, &callback)
      unless(@trackerHash.has_key? topic)
        subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos);
        register_subscription(subObject);

        @trackerHash[topic] = subObject;
      end

      @trackerHash[topic].attach(callback) if(callback)

      return @trackerHash[topic];
    end
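The idea of firing a callback only when the data changes can be sketched without the gem. The class below is a hypothetical stand-in: the real MQTT::Subscriptions::ValueTrackerSubscription is not shown on this page and may work differently. Only `attach` appears in the listing above; `offer` and `value` are names assumed for the example.

```ruby
# Hypothetical stand-in for a value tracker. The real
# MQTT::Subscriptions::ValueTrackerSubscription may work differently.
class ValueTrackerSketch
  attr_reader :value  # last received raw data, nil before the first message

  def initialize
    @value = nil
    @callbacks = []
  end

  # Register a callable to run whenever the value changes.
  def attach(callback)
    @callbacks << callback
    self
  end

  # Feed in newly received data; callbacks fire only on a change.
  def offer(data)
    return false if data == @value

    @value = data
    @callbacks.each { |cb| cb.call(data) }
    true
  end
end

seen = []
tracker = ValueTrackerSketch.new
tracker.attach(proc { |data| seen << data })

["21.5", "21.5", "22.0"].each { |payload| tracker.offer(payload) }

p seen           # => ["21.5", "22.0"]
p tracker.value  # => "22.0"
```

The repeated "21.5" is ignored, which is the behaviour the description promises: callbacks run on changes, not on every message.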
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription. Removes it from the callback list. As listed, the method does not send an unsubscribe message to the server, even when no other subscription for the topic remains.

    # File 'lib/mqtt/sub_handler.rb', line 101
    def unregister_subscription(subObject)
      raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
      return unless @callbackList.include? subObject;

      @callbackList.delete(subObject);
    end
#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
Synchronously wait for data. Waits for a message on `topic` and passes each one to the block, which is required and checks the data for validity. Returns true as soon as the block returns a truthy value, or false if the optional timeout expires first.

    # File 'lib/mqtt/sub_handler.rb', line 131
    def wait_for(topic, qos: 1, timeout: nil)
      unless block_given?
        raise ArgumentError, "A block for data-processing needs to be passed!"
      end

      subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos);
      register_subscription(subObject);

      begin
        Timeout.timeout(timeout) do
          loop do
            return_data = subObject.waitpoint.wait()[1];
            if yield(return_data[0], return_data[1])
              return true;
            end
          end
        end
      rescue Timeout::Error
        return false;
      ensure
        unregister_subscription(subObject);
      end
    end
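The wait loop above combines a blocking read, a validity check, and an optional timeout. A minimal sketch of that pattern follows, assuming a plain Ruby Queue in place of the subscription's waitpoint; `wait_for_message` is a hypothetical name and no broker is involved.

```ruby
require 'timeout'

# Hypothetical stand-in for the wait loop above: a Queue plays the role of
# the subscription's waitpoint, and the block decides which message counts.
def wait_for_message(queue, timeout: nil)
  Timeout.timeout(timeout) do
    loop do
      data = queue.pop            # blocks until a message arrives
      return true if yield(data)  # stop as soon as the block accepts one
    end
  end
rescue Timeout::Error
  false                           # nothing acceptable arrived in time
end

inbox = Queue.new
producer = Thread.new do
  ["booting", "ready"].each { |msg| sleep 0.05; inbox << msg }
end

p wait_for_message(inbox, timeout: 1) { |msg| msg == "ready" }    # => true
p wait_for_message(inbox, timeout: 0.2) { |msg| msg == "never" }  # => false
producer.join
```

Messages the block rejects are consumed and discarded, so the block should accept every message the caller cares about. With `timeout: nil` the call blocks until a message is accepted.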