Class: MQTT::BaseHandler
- Inherits: Object (Object > MQTT::BaseHandler)
- Includes: XasLogger::Mix
- Defined in: lib/mqtt/base_handler.rb
Direct Known Subclasses
Custom subscription handling
- #register_subscription(subObject) ⇒ Object
  Register a custom subscription, and send a subscription message to the server.
- #unregister_subscription(subObject) ⇒ Object
  Unregister a subscription.
Class Method Summary
- .get_topic_split(topicName) ⇒ Array<String>
  Split a Topic into a Topic-Array.
- .getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
  Match a topic string to a topic pattern.
Instance Method Summary
- #destroy! ⇒ Object
- #initialize(mqttClient, logger: nil) ⇒ BaseHandler (constructor)
  Initialize a new MQTT::BaseHandler. The handler immediately connects to the server and begins receiving and sending.
- #lockAndListen ⇒ Object
  Pause the main thread and wait for messages.
Constructor Details
#initialize(mqttClient, logger: nil) ⇒ BaseHandler
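Initialize a new MQTT::BaseHandler. mqttClient may be a String, which is passed to MQTT::Client.new, or an existing client object. The handler immediately connects to the server and begins receiving and sending. Construction waits up to 5 seconds for the connection and logs an error if the broker does not connect in that time.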
# File 'lib/mqtt/base_handler.rb', line 298

    def initialize(mqttClient, logger: nil)
      @callbackList = Array.new();

      if mqttClient.is_a? String
        @mqtt = MQTT::Client.new(mqttClient);
        @mqtt.clean_session = false;
      else
        @mqtt = mqttClient;
      end

      init_x_log("MQTT #{@mqtt.host}", logger);
      self.log_level = Logger::INFO;

      @conChangeMutex = Mutex.new();
      @connected = false;

      @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

      @packetQueue = Array.new();
      @packetQueueMutex = Mutex.new();
      @publisherThreadWaiting = false;

      @subscribedTopics = Hash.new();

      @trackerHash = Hash.new();

      @listenerThread = Thread.new do
        ensure_clean_start();
        mqtt_resub_thread();
      end
      @listenerThread.abort_on_exception = true;

      begin
        Timeout.timeout(5) {
          until(@connected)
            sleep 0.1;
          end
        }
      rescue Timeout::Error
        x_loge("Broker did not connect!");
      end

      @publisherThread = Thread.new do
        mqtt_push_thread();
      end
      @publisherThread.abort_on_exception = true;

      at_exit {
        destroy!()
      }
    end
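The constructor waits for the connection by polling a flag inside a Timeout block. The following is a minimal, self-contained sketch of that pattern only. The helper name `wait_until` and the simulated worker thread are hypothetical and not part of the gem; no broker is contacted.

```ruby
require "timeout"

# Hypothetical helper (not part of the gem): poll the given block until it
# returns a truthy value, or give up once limit_seconds have elapsed.
# Returns true on success and false on timeout.
def wait_until(limit_seconds, poll_interval = 0.1)
  Timeout.timeout(limit_seconds) do
    sleep poll_interval until yield
  end
  true
rescue Timeout::Error
  false
end

# Simulated connection: a background thread flips the flag after a short delay,
# standing in for the listener thread that sets @connected in the handler.
connected = false
worker = Thread.new do
  sleep 0.2
  connected = true
end

if wait_until(5) { connected }
  puts "Connected"
else
  puts "Broker did not connect!"
end
worker.join
```

As in the constructor, a timeout here is reported rather than raised, so the caller continues even when the connection is not yet up.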
Class Method Details
.get_topic_split(topicName) ⇒ Array<String>
This function is mainly used for background processing.
Split a Topic into a Topic-Array.
# File 'lib/mqtt/base_handler.rb', line 16

    def self.get_topic_split(topicName)
      return topicName.scan(/[^\/]+/);
    end
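To illustrate the splitting behaviour without installing the gem, the sketch below applies the same regular expression through a stand-in method. The name `split_topic` is hypothetical; it mirrors the body of get_topic_split shown above.

```ruby
# Hypothetical stand-in for MQTT::BaseHandler.get_topic_split,
# using the same regular expression as the listing above.
def split_topic(topic_name)
  topic_name.scan(/[^\/]+/)
end

p split_topic("home/livingroom/temperature")
p split_topic("home/+/temperature")
# Leading, trailing, and doubled slashes produce no empty segments,
# because the expression only captures runs of non-slash characters.
p split_topic("/home//livingroom/")
```

Because empty segments are dropped, "/home//livingroom/" and "home/livingroom" split to the same array.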
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
This function is mainly used for background processing.
Match a topic string to a topic pattern.
# File 'lib/mqtt/base_handler.rb', line 29

    def self.getTopicMatch(receivedTopicString, topicPattern)
      receivedTopicList = get_topic_split receivedTopicString;

      outputTopicList = Array.new();

      return nil unless receivedTopicList.length >= topicPattern.length;

      topicPattern.each_index do |i|
        if(topicPattern[i] == "+")
          outputTopicList << receivedTopicList[i];

        elsif(topicPattern[i] == "#")
          outputTopicList.concat receivedTopicList[i..-1];
          return outputTopicList;

        elsif topicPattern[i] != receivedTopicList[i];
          return nil;
        end
      end

      return outputTopicList if topicPattern.length == receivedTopicList.length;
      return nil;
    end
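The listing iterates over topicPattern with each_index, so it appears to expect a pattern that has already been split into an array. The sketch below re-implements the same logic in a hypothetical module, `TopicMatchSketch`, so that it runs without the gem; it is an illustration of the behaviour shown above, not the library itself.

```ruby
# Hypothetical stand-in mirroring the matching logic in the listing above.
module TopicMatchSketch
  def self.split(topic)
    topic.scan(/[^\/]+/)
  end

  # Returns the segments captured by "+" and "#", or nil when nothing matches.
  def self.match(received_topic, pattern)
    received = split(received_topic)
    captured = []
    return nil unless received.length >= pattern.length

    pattern.each_index do |i|
      case pattern[i]
      when "+"
        captured << received[i]             # single-level wildcard
      when "#"
        captured.concat(received[i..-1])    # multi-level wildcard takes the rest
        return captured
      else
        return nil unless pattern[i] == received[i]
      end
    end

    pattern.length == received.length ? captured : nil
  end
end

p TopicMatchSketch.match("home/kitchen/temp", ["home", "+", "temp"])
p TopicMatchSketch.match("home/kitchen/temp/raw", ["home", "#"])
p TopicMatchSketch.match("home/kitchen", ["home", "kitchen"])
p TopicMatchSketch.match("home/kitchen", ["home", "+", "temp"])
```

Two details follow from the logic: an exact match without wildcards returns an empty array, which is truthy in Ruby, and a received topic shorter than the pattern returns nil even when the pattern ends in "#".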
Instance Method Details
#destroy! ⇒ Object
# File 'lib/mqtt/base_handler.rb', line 257

    def destroy!()
      # NOTE: the guard variable name was lost from this listing ("return if = true;").
      # @destroying is an assumed reconstruction, not confirmed by the source.
      return if @destroying;
      @destroying = true;

      unless @packetQueue.empty?
        x_logd "Finishing sending of MQTT messages ... "
        @publisherThread.run() if @publisherThreadWaiting

        begin
          Timeout.timeout(4) {
            until @packetQueue.empty? do
              sleep 0.05;
            end
          }
        rescue Timeout::Error
          x_logw "Publishes did not complete";
        else
          x_logd "Publish clean finished"
        end
      end

      @publisherThread.run();
      @publisherThread.join();
      @listenerThread.kill();

      @mqtt.disconnect() if @connected

      ensure_clean_exit();

      x_logi("Fully disconnected!");
    end
#lockAndListen ⇒ Object
Pause the main thread and wait for messages. This is mainly useful once the program has finished its setup but should keep running instead of exiting. "INT" is trapped, ensuring a smooth exit on Ctrl-C.
# File 'lib/mqtt/base_handler.rb', line 248

    def lockAndListen()
      Signal.trap("INT") {
        exit 0
      }

      puts "Main thread paused."
      Thread.stop();
    end
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
# File 'lib/mqtt/base_handler.rb', line 114

    def register_subscription(subObject)
      raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
      return if @callbackList.include? subObject;

      @callbackList << subObject;
      queue_packet({type: :sub, topic: subObject.topic, qos: subObject.qos});
    end
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.
# File 'lib/mqtt/base_handler.rb', line 103

    def unregister_subscription(subObject)
      raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
      return unless @callbackList.include? subObject;

      queue_packet({type: :unsub, topic: subObject.topic});
      @callbackList.delete(subObject);
    end
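The register and unregister listings share one flow: check the type, check membership in the callback list, then queue a packet. The sketch below mimics that flow with stand-in classes so it can run without a broker. `FakeSubscription` and `FakeHandler` are hypothetical names, and the packet queue here is a plain array rather than the gem's queue_packet.

```ruby
# Hypothetical stand-in for MQTT::Subscriptions::Subscription.
class FakeSubscription
  attr_reader :topic, :qos

  def initialize(topic, qos)
    @topic = topic
    @qos = qos
  end
end

# Hypothetical stand-in for the handler; it records packets instead of sending them.
class FakeHandler
  attr_reader :callback_list, :queued_packets

  def initialize
    @callback_list = []
    @queued_packets = []
  end

  def register_subscription(sub)
    raise ArgumentError, "Object is not a subscription!" unless sub.is_a? FakeSubscription
    return if @callback_list.include? sub   # registering twice is a no-op

    @callback_list << sub
    @queued_packets << { type: :sub, topic: sub.topic, qos: sub.qos }
  end

  def unregister_subscription(sub)
    raise ArgumentError, "Object is not a subscription!" unless sub.is_a? FakeSubscription
    return unless @callback_list.include? sub   # unknown subscriptions are ignored

    @queued_packets << { type: :unsub, topic: sub.topic }
    @callback_list.delete(sub)
  end
end

handler = FakeHandler.new
sub = FakeSubscription.new("home/+/temperature", 1)

handler.register_subscription(sub)
handler.register_subscription(sub)     # ignored: already registered
handler.unregister_subscription(sub)
handler.unregister_subscription(sub)   # ignored: no longer registered

p handler.queued_packets.map { |packet| packet[:type] }
```

In this sketch each subscription produces one sub packet and one unsub packet, regardless of how often the calls are repeated.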