Class: MQTT::BaseHandler

Inherits:
Object
Includes:
XasLogger::Mix
Defined in:
lib/mqtt/base_handler.rb

Direct Known Subclasses

SubHandler

Constructor Details

#initialize(mqttClient, logger: nil) ⇒ BaseHandler

Initialize a new MQTT::SubHandler. The handler immediately connects to the server and begins receiving and sending.

Examples:

Starting the handler

mqtt = MQTT::SubHandler.new('mqtt.eclipse.org');
mqtt = MQTT::SubHandler.new(MQTT::Client.new("Your.Client.Opts"))

Parameters:

  • mqttClient (String, MQTT::Client)

    Either a URI to connect to, or an MQTT::Client. The URI can be of the form “mqtts://User:Password@URL:port”. The MQTT client instance can be fully configured, as specified by the MQTT Gem. It must not already be connected!

  • jsonify (Boolean)

    Should Hashes and Arrays input into publish_to be converted to JSON? This can be useful to have one less .to_json call. Default is true.
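As a rough illustration of the URI form accepted for mqttClient, the sketch below splits a broker URI into its parts with Ruby's standard URI parser. It assumes the user:password@host:port layout; the helper name broker_uri_parts and the host broker.example.com are hypothetical and not part of the handler.

```ruby
require 'uri'

# Hypothetical helper (not part of MQTT::BaseHandler): split a broker URI
# into the pieces a client would need to connect.
def broker_uri_parts(uri_string)
  uri = URI.parse(uri_string)
  {
    scheme:   uri.scheme,
    user:     uri.user,
    password: uri.password,
    host:     uri.host,
    port:     uri.port
  }
end

parts = broker_uri_parts('mqtts://alice:secret@broker.example.com:8883')
puts parts[:user]   # prints "alice"
puts parts[:host]   # prints "broker.example.com"
puts parts[:port]   # prints 8883
```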



# File 'lib/mqtt/base_handler.rb', line 298

def initialize(mqttClient, logger: nil)
  @callbackList = Array.new();
  if mqttClient.is_a? String
    @mqtt = MQTT::Client.new(mqttClient);
    @mqtt.clean_session = false;
  else
    @mqtt = mqttClient;
  end

  init_x_log("MQTT #{@mqtt.host}", logger);
  self.log_level = Logger::INFO;

  @conChangeMutex = Mutex.new();
  @connected     = false;

  @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

  @packetQueue = Array.new();
  @packetQueueMutex = Mutex.new();

  @publisherThreadWaiting = false;

  @subscribedTopics  = Hash.new();

  @trackerHash = Hash.new();

  @listenerThread = Thread.new do
    ensure_clean_start();
    mqtt_resub_thread();
  end
  @listenerThread.abort_on_exception = true;

  begin
    Timeout.timeout(5) {
      until(@connected)
        sleep 0.1;
      end
    }
  rescue Timeout::Error
    x_loge("Broker did not connect!");
  end

  @publisherThread = Thread.new do
    mqtt_push_thread();
  end
  @publisherThread.abort_on_exception = true;

  at_exit {
    destroy!()
  }
end
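The constructor starts the listener in a background thread and then polls a flag for up to five seconds before giving up. A minimal, stand-alone sketch of that wait-with-timeout pattern follows; wait_until_connected is a hypothetical helper, and the thread only simulates a broker connection.

```ruby
require 'timeout'

# Hypothetical stand-in for the connection wait in the constructor.
# Polls the given block until it returns true; returns false on timeout.
def wait_until_connected(limit_seconds, poll_interval: 0.01)
  Timeout.timeout(limit_seconds) do
    sleep poll_interval until yield
  end
  true
rescue Timeout::Error
  false
end

connected = false
# Simulated broker connection that completes after 50 ms.
connector = Thread.new do
  sleep 0.05
  connected = true
end

puts wait_until_connected(1) { connected }   # prints "true"
connector.join
puts wait_until_connected(0.1) { false }     # prints "false"
```

Note that in the listing above a timeout is only logged; the constructor still goes on to start the publisher thread.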

Class Method Details

.get_topic_split(topicName) ⇒ Array<String>

Note:

This function is mainly used for background processing.

Split a Topic into a Topic-Array

Parameters:

  • topicName (String)

    The topic string to split

Returns:

  • (Array<String>)

    A list of individual topic-branches



# File 'lib/mqtt/base_handler.rb', line 16

def self.get_topic_split(topicName)
  return topicName.scan(/[^\/]+/);
end
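To make the splitting behavior concrete, here is a stand-alone copy of the same one-line scan, renamed topic_split so it runs without the gem. It shows that empty levels (leading, trailing, or doubled slashes) are dropped and that wildcard characters are kept as ordinary branches.

```ruby
# Stand-alone copy of the split logic, so the example runs without the gem.
def topic_split(topic_name)
  topic_name.scan(/[^\/]+/)
end

p topic_split('home/kitchen/temp')   # => ["home", "kitchen", "temp"]
p topic_split('/home//kitchen/')     # => ["home", "kitchen"]
p topic_split('home/+/temp')         # => ["home", "+", "temp"]
```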

.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>

Note:

This function is mainly used for background processing.

Match a topic string to a topic pattern

Parameters:

  • receivedTopicString (String)

    The string (as returned by MQTT.get) to compare

  • topicPattern (Array<String>)

    The Topic-Array (as returned by .get_topic_split) to compare against

Returns:

  • (nil, Array<String>)

    Nil if no match was found. Otherwise, an Array of the topic branches matched by wildcards (can be empty).



# File 'lib/mqtt/base_handler.rb', line 29

def self.getTopicMatch(receivedTopicString, topicPattern)
  receivedTopicList = get_topic_split receivedTopicString;

  outputTopicList = Array.new();

  return nil unless receivedTopicList.length >= topicPattern.length;

  topicPattern.each_index do |i|
    if(topicPattern[i] == "+")
      outputTopicList << receivedTopicList[i];

    elsif(topicPattern[i] == "#")
      outputTopicList.concat receivedTopicList[i..-1];
      return outputTopicList;

    elsif topicPattern[i] != receivedTopicList[i];
      return nil;

    end
  end

  return outputTopicList if topicPattern.length == receivedTopicList.length;
  return nil;
end
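The matching rules are easier to follow with a few inputs. The sketch below re-implements the same logic in a hypothetical TopicMatchSketch module (so it runs without the gem) and exercises the "+" and "#" wildcards, an exact match, and two non-matches.

```ruby
# Hypothetical stand-alone re-implementation of the listing above.
module TopicMatchSketch
  # Split a topic string into its non-empty branches.
  def self.split(topic)
    topic.scan(%r{[^/]+})
  end

  # Returns the branches captured by wildcards, or nil if there is no match.
  def self.match(received_topic, pattern)
    received = split(received_topic)
    return nil if received.length < pattern.length

    captured = []
    pattern.each_with_index do |level, i|
      case level
      when '+' then captured << received[i]                    # one level
      when '#' then return captured.concat(received[i..-1])    # all remaining levels
      else return nil unless level == received[i]              # literal must be equal
      end
    end

    # Without a trailing '#', both sides must have the same depth.
    received.length == pattern.length ? captured : nil
  end
end

topic = 'home/kitchen/temp'
p TopicMatchSketch.match(topic, TopicMatchSketch.split('home/+/temp'))       # => ["kitchen"]
p TopicMatchSketch.match(topic, TopicMatchSketch.split('home/#'))            # => ["kitchen", "temp"]
p TopicMatchSketch.match(topic, TopicMatchSketch.split('home/kitchen/temp')) # => []
p TopicMatchSketch.match(topic, TopicMatchSketch.split('home/kitchen'))      # => nil
p TopicMatchSketch.match('home', TopicMatchSketch.split('home/#'))           # => nil
```

The last call shows a consequence of the length check: with this logic, a pattern ending in "#" only matches topics that have at least one branch in the "#" position.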

Instance Method Details

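#destroy! ⇒ Object

Flush any queued packets (waiting up to 4 seconds), stop the publisher and listener threads, and disconnect from the broker. Calling it again after the first call has no effect.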



# File 'lib/mqtt/base_handler.rb', line 257

def destroy!()
  return if @destroying
  @destroying = true;

  unless @packetQueue.empty?
    x_logd "Finishing sending of MQTT messages ... "
    @publisherThread.run() if @publisherThreadWaiting
    begin
      Timeout.timeout(4) {
        until @packetQueue.empty? do
          sleep 0.05;
        end
      }
    rescue Timeout::Error
      x_logw "Publishes did not complete";
    else
      x_logd "Publish clean finished"
    end
  end

  @publisherThread.run();
  @publisherThread.join();
  @listenerThread.kill();

  @mqtt.disconnect() if @connected

  ensure_clean_exit();

  x_logi("Fully disconnected!");
end

#lockAndListen ⇒ Object

Pause the main thread and wait for messages. This is mainly useful when the code has set everything up but should not exit yet. “INT” is trapped, ensuring a smooth exit on Ctrl-C.



# File 'lib/mqtt/base_handler.rb', line 248

def lockAndListen()
  Signal.trap("INT") {
    exit 0
  }

  puts "Main thread paused."
  Thread.stop();
end

#register_subscription(subObject) ⇒ Object

Register a custom subscription, and send a subscription message to the server.

Parameters:

  • subObject (MQTT::Subscriptions::Subscription)

    The subscription object to register

Returns:

  • void

Raises:

  • (ArgumentError)

    If subObject is not an MQTT::Subscriptions::Subscription


# File 'lib/mqtt/base_handler.rb', line 114

def register_subscription(subObject)
  raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
  return if @callbackList.include? subObject;

  @callbackList << subObject;
  queue_packet({type: :sub, topic: subObject.topic, qos: subObject.qos});
end

#unregister_subscription(subObject) ⇒ Object

Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.

Parameters:

  • subObject (MQTT::Subscriptions::Subscription)

    The subscription object to unregister

Returns:

  • void

Raises:

  • (ArgumentError)

    If subObject is not an MQTT::Subscriptions::Subscription


# File 'lib/mqtt/base_handler.rb', line 103

def unregister_subscription(subObject)
  raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
  return unless @callbackList.include? subObject;

  queue_packet({type: :unsub, topic: subObject.topic});
  @callbackList.delete(subObject);
end
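The register and unregister semantics can be tried out without a broker. The following sketch mirrors the two listings with hypothetical stand-ins: FakeSubscription replaces MQTT::Subscriptions::Subscription, and RegistrySketch records queued packets in an array instead of calling queue_packet.

```ruby
# Hypothetical stand-in for MQTT::Subscriptions::Subscription.
class FakeSubscription
  attr_reader :topic, :qos

  def initialize(topic, qos = 0)
    @topic = topic
    @qos = qos
  end
end

# Hypothetical stand-in for the handler's subscription bookkeeping.
class RegistrySketch
  attr_reader :callbacks, :packets

  def initialize
    @callbacks = []  # registered subscription objects
    @packets = []    # packets that would be queued for the broker
  end

  def register(sub)
    raise ArgumentError, 'Object is not a subscription!' unless sub.is_a?(FakeSubscription)
    return if @callbacks.include?(sub)   # duplicate registrations are ignored

    @callbacks << sub
    @packets << { type: :sub, topic: sub.topic, qos: sub.qos }
  end

  def unregister(sub)
    raise ArgumentError, 'Object is not a subscription!' unless sub.is_a?(FakeSubscription)
    return unless @callbacks.include?(sub)   # unknown subscriptions are ignored

    @packets << { type: :unsub, topic: sub.topic }
    @callbacks.delete(sub)
  end
end

registry = RegistrySketch.new
sub = FakeSubscription.new('home/+/temp', 1)

registry.register(sub)
registry.register(sub)     # second call is a no-op
registry.unregister(sub)

p registry.packets.map { |packet| packet[:type] }   # => [:sub, :unsub]
p registry.callbacks.empty?                         # => true
```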