Class: MQTT::SubHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt/sub_handler.rb

Custom subscription handling collapse

Subscribing collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mqttClient) ⇒ SubHandler

Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.

Examples:

Starting the handler

mqtt = MQTT::SubHandler.new('iot.eclipse.org');
mqtt = MQTT::SubHandler.new(MQTT::Client.new("Your.Client.Opts"))

Parameters:

  • mqttClient (String, MQTT::Client)

    Either a URI to connect to, or a MQTT::Client The URI can be of the form “mqtts://Password@User:URL:port”. The MQTT client instance can be fully configured, as specified by the MQTT Gem. It must not already be connected!



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/mqtt/sub_handler.rb', line 300

def initialize(mqttClient)
  @callbackList = Array.new();
  if mqttClient.is_a? String then
    @mqtt = MQTT::Client.new(mqttClient);
  else
    @mqtt = mqttClient;
  end

  @conChangeMutex = Mutex.new();
  @connected     = false;

  @mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

  @publishQueue    = Array.new();
  @subscribeQueue  = Array.new();
  @subscribedTopics  = Hash.new();

  @trackerHash = Hash.new();

  @listenerThread = Thread.new do
    if @mqtt.clean_session
      @mqttWasStartedClean = true;
      begin
        @mqtt.connect();
        @mqtt.disconnect();
      rescue MQTT::Exception
        sleep 1;
        retry
      rescue SocketError, SystemCallError
        sleep 5
        retry
      end
      @mqtt.clean_session=false;
    end

    mqtt_resub_thread
  end
  @listenerThread.abort_on_exception = true;

  at_exit {
    flush_pubqueue
    @listenerThread.kill();

    if(@mqttWasStartedClean) then
      print "Logging out of mqtt server... "
      begin
      Timeout::timeout(10) {
        begin
          @mqtt.clean_session = true;
          @mqtt.disconnect();
          @mqtt.connect();
        rescue MQTT::Exception, SocketError, SystemCallError
          sleep 1
          retry;
        end
      }
      rescue  Timeout::Error
        puts "Timed out, aborting!";
      else
        puts "Done."
      end
    end
  }

  begin
  Timeout::timeout(10) {
    until(@connected) do sleep 0.1; end
  }
  rescue Timeout::Error
  end
end

Class Method Details

.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>

Note:

This function is mainly used for background processing.

Match a topic string to a topic pattern

Parameters:

  • receivedTopicString (String)

    The string (as returned by MQTT.get) to compare

  • topicPattern (Array<String>)

    The Topic-Array (as returned by .getTopicSplit) to compare against

Returns:

  • (nil, Array<String>)

    Nil if no match was found. An Array of matched wildcard topic branches (can be empty) when successfully matched



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/mqtt/sub_handler.rb', line 35

def self.getTopicMatch(receivedTopicString, topicPattern)
  receivedTopicList = getTopicSplit receivedTopicString;

  outputTopicList = Array.new();

  return nil unless receivedTopicList.length >= topicPattern.length;

  topicPattern.each_index do |i|
    if(topicPattern[i] == "+") then
      outputTopicList << receivedTopicList[i];

    elsif(topicPattern[i] == "#") then
      outputTopicList.concat receivedTopicList[i..-1];
      return outputTopicList;

    elsif topicPattern[i] != receivedTopicList[i];
      return nil;

    end
  end

  return outputTopicList if topicPattern.length == receivedTopicList.length;
  return nil;
end

.getTopicSplit(topicName) ⇒ Array<String>

Note:

This function is mainly used for background processing.

Split a Topic into a Topic-Array

Parameters:

  • topicName (String)

    The string topic which to split

Returns:

  • (Array<String>)

    A list of individual topic-branches



22
23
24
# File 'lib/mqtt/sub_handler.rb', line 22

def self.getTopicSplit(topicName)
  return topicName.scan(/[^\/]+/);
end

Instance Method Details

#lockAndListenObject

Pause the main thread and wait for messages. This is mainly useful when the code has set everything up, but doesn’t just want to end. “INT” is trapped, ensuring a smooth exit on Ctrl-C



263
264
265
266
267
268
269
270
# File 'lib/mqtt/sub_handler.rb', line 263

def lockAndListen()
  Signal.trap("INT") {
    exit 0
  }

  puts "Main thread paused."
  Thread.stop();
end

#publish_to(topic, data, qos: 1, retain: false) ⇒ Object Also known as: publishTo

Publish a message to topic.

Parameters:

  • topic (String)

    The topic to push to.

  • data (String)

    The data to be transmitted.

  • qos (nil, Numeric) (defaults to: 1)

    QoS for the publish. Currently not fully supported by the mqtt gem.

  • retain (nil, Boolean) (defaults to: false)

    retain-flag for the publish.

Raises:

  • (ArgumentError)


206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/mqtt/sub_handler.rb', line 206

def publish_to(topic, data, qos: 1, retain: false)
  raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

  begin
    @conChangeMutex.lock
    if not @connected then
      @publishQueue << {topic: topic, data: data, qos: qos, retain: retain} unless qos == 0
      @conChangeMutex.unlock
    else
      @conChangeMutex.unlock
      @mqtt.publish(topic, data, retain);
    end
  rescue MQTT::Exception, SocketError, SystemCallError
    sleep 0.05;
    retry
  end
end

#register_subscription(subObject) ⇒ Object

Register a custom subscription, and send a subscription message to the server.

Parameters:

Returns:

  • void

Raises:

  • (ArgumentError)


110
111
112
113
114
115
116
# File 'lib/mqtt/sub_handler.rb', line 110

def register_subscription(subObject)
  raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
  return if @callbackList.include? subObject;

  @callbackList << subObject;
  raw_subscribe_to(subObject.topic, qos: subObject.qos);
end

#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription Also known as: subscribeTo

Attach a callback to a MQTT Topic or wildcard. The callback will be saved, and asynchronously executed whenever a message from a matching topic (including wildcards) is received.

Parameters:

  • topic (String)

    The MQTT-Topic to subscribe to. Can be a Wildcard.

  • qos (nil, Integer) (defaults to: 1)

    The QoS for the subscription. Currently not used!

Yield Parameters:

  • data (String)

    The raw MQTT data received from the MQTT server

  • topicList (Array<String>)

    An array of topic-branches corresponding to wildcard matches. Can be empty if no wildcard was used!

Yield Returns:

  • (void)

Returns:



191
192
193
194
195
196
# File 'lib/mqtt/sub_handler.rb', line 191

def subscribe_to(topic, qos: 1, &callback)
  subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback);
  register_subscription(subObject);

  return subObject;
end

#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription Also known as: on_change

Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.

Parameters:

  • topic (String)

    The MQTT-Topic to track for data. Can be a Wildcard.

  • qos (nil, Integer) (defaults to: 1)

    The QoS to use for the subscription

Yield Parameters:

  • data (String)

    The new (changed) data received from MQTT.

Yield Returns:

  • (void)

Returns:



166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/mqtt/sub_handler.rb', line 166

def track(topic, qos: 1, &callback)
  unless(@trackerHash.has_key? topic)
    subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos);
    register_subscription(subObject);

    @trackerHash[topic] = subObject;
  end

  @trackerHash[topic].attach(callback) if(callback)

  return @trackerHash[topic];
end

#unregister_subscription(subObject) ⇒ Object

Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.

Parameters:

Returns:

  • void

Raises:

  • (ArgumentError)


100
101
102
103
104
105
# File 'lib/mqtt/sub_handler.rb', line 100

def unregister_subscription(subObject)
  raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
  return unless @callbackList.include? subObject;

  @callbackList.delete(subObject);
end

#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean

Synchronously wait for data. It waits for a message on ‘topic`, optionally letting a block check the data for validity, and optionally aborting after a timeout

Parameters:

  • topic (String)

    The MQTT-Topic to wait for

  • timeout (nil, Integer) (defaults to: nil)

    The optional timeout after which to abort

  • qos (nil, Integer) (defaults to: 1)

    The QoS for this subscription

Yield Parameters:

  • data (String)

    The data received via MQTT

  • topicList (Array<String>)

    The wildcard topic branches matched.

Yield Returns:

  • (Boolean)

    Whether or not the data was sufficient, and capture should be stopped.

Returns:

  • (Boolean)

    True if the block returned true, False if the code timed-out



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/mqtt/sub_handler.rb', line 130

def wait_for(topic, qos: 1, timeout: nil)
  unless block_given? then
    raise ArgumentError, "A block for data-processing needs to be passed!"
  end

  subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos);
  register_subscription(subObject);

  begin
  Timeout::timeout(timeout) do
    loop do
      return_data = subObject.waitpoint.wait()[1];
      if yield(return_data[0], return_data[1]) then
        return true;
      end
    end
  end
  rescue Timeout::Error
    return false;
  ensure
    unregister_subscription(subObject);
  end
end