Class: MQTT::SubHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt/sub_handler.rb

Direct Known Subclasses

Testing::SubHandler

Instance Attribute Summary collapse

Custom subscription handling collapse

Subscribing collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mqttClient, jsonify: true) ⇒ SubHandler

Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.

Examples:

Starting the handler

mqtt = MQTT::SubHandler.new('iot.eclipse.org');
mqtt = MQTT::SubHandler.new(MQTT::Client.new("Your.Client.Opts"))

Parameters:

  • mqttClient (String, MQTT::Client)

    Either a URI to connect to, or a MQTT::Client The URI can be of the form “mqtts://Password@User:URL:port”. The MQTT client instance can be fully configured, as specified by the MQTT Gem. It must not already be connected!

  • jsonify (Boolean) (defaults to: true)

    Should Hashes and Arrays input into publish_to be converted to JSON? This can be useful to have one less .to_json call. Default is true.



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/mqtt/sub_handler.rb', line 351

def initialize(mqttClient, jsonify: true)
	@callbackList = Array.new();
	if mqttClient.is_a? String
		@mqtt = MQTT::Client.new(mqttClient);
	else
		@mqtt = mqttClient;
	end

	@jsonifyHashes = jsonify;

	@conChangeMutex = Mutex.new();
	@connected 		= false;

	@mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

	@publishQueue 		= Array.new();
	@subscribeQueue 	= Array.new();
	@subscribedTopics 	= Hash.new();

	@trackerHash = Hash.new();

	@listenerThread = Thread.new do
		ensure_clean_start();
		mqtt_resub_thread();
	end
	@listenerThread.abort_on_exception = true;

	at_exit {
		flush_pubqueue();
		@listenerThread.kill();
		ensure_clean_exit();
	}

	begin
	Timeout.timeout(10) {
		until(@connected) do sleep 0.1; end
	}
	rescue Timeout::Error
	end
end

Instance Attribute Details

#jsonifyHashesObject

Whether or not hashes and arrays should be converted to JSON when sending



20
21
22
# File 'lib/mqtt/sub_handler.rb', line 20

def jsonifyHashes
  @jsonifyHashes
end

Class Method Details

.get_topic_split(topicName) ⇒ Array<String>

Note:

This function is mainly used for background processing.

Split a Topic into a Topic-Array

Parameters:

  • topicName (String)

    The string topic which to split

Returns:

  • (Array<String>)

    A list of individual topic-branches



26
27
28
# File 'lib/mqtt/sub_handler.rb', line 26

def self.get_topic_split(topicName)
	return topicName.scan(/[^\/]+/);
end

.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>

Note:

This function is mainly used for background processing.

Match a topic string to a topic pattern

Parameters:

  • receivedTopicString (String)

    The string (as returned by MQTT.get) to compare

  • topicPattern (Array<String>)

    The Topic-Array (as returned by .get_topic_split) to compare against

Returns:

  • (nil, Array<String>)

    Nil if no match was found. An Array of matched wildcard topic branches (can be empty) when successfully matched



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/mqtt/sub_handler.rb', line 39

def self.getTopicMatch(receivedTopicString, topicPattern)
	receivedTopicList = get_topic_split receivedTopicString;

	outputTopicList = Array.new();

	return nil unless receivedTopicList.length >= topicPattern.length;

	topicPattern.each_index do |i|
		if(topicPattern[i] == "+")
			outputTopicList << receivedTopicList[i];

		elsif(topicPattern[i] == "#")
			outputTopicList.concat receivedTopicList[i..-1];
			return outputTopicList;

		elsif topicPattern[i] != receivedTopicList[i];
			return nil;

		end
	end

	return outputTopicList if topicPattern.length == receivedTopicList.length;
	return nil;
end

Instance Method Details

#lockAndListenObject

Pause the main thread and wait for messages. This is mainly useful when the code has set everything up, but doesn’t just want to end. “INT” is trapped, ensuring a smooth exit on Ctrl-C



312
313
314
315
316
317
318
319
# File 'lib/mqtt/sub_handler.rb', line 312

def lockAndListen()
	Signal.trap("INT") {
		exit 0
	}

	puts "Main thread paused."
	Thread.stop();
end

#publish_to(topic, data, qos: 1, retain: false) ⇒ Object Also known as: publishTo

Publish a message to topic.

Parameters:

  • topic (String)

    The topic to push to.

  • data (String)

    The data to be transmitted.

  • qos (nil, Numeric) (defaults to: 1)

    QoS for the publish. Currently not fully supported by the mqtt gem.

  • retain (nil, Boolean) (defaults to: false)

    retain-flag for the publish.

Raises:

  • (ArgumentError)


210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/mqtt/sub_handler.rb', line 210

def publish_to(topic, data, qos: 1, retain: false)
	raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

	if(@jsonifyHashes and (data.is_a? Array or data.is_a? Hash))
		data = data.to_json
	end

	begin
		@conChangeMutex.lock
		if not @connected
			@publishQueue << {topic: topic, data: data, qos: qos, retain: retain} unless qos == 0
			@conChangeMutex.unlock
		else
			@conChangeMutex.unlock
			@mqtt.publish(topic, data, retain);
		end
	rescue MQTT::Exception, SocketError, SystemCallError
		sleep 0.05;
		retry
	end
end

#register_subscription(subObject) ⇒ Object

Register a custom subscription, and send a subscription message to the server.

Parameters:

Returns:

  • void

Raises:

  • (ArgumentError)


114
115
116
117
118
119
120
# File 'lib/mqtt/sub_handler.rb', line 114

def register_subscription(subObject)
	raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
	return if @callbackList.include? subObject;

	@callbackList << subObject;
	raw_subscribe_to(subObject.topic, qos: subObject.qos);
end

#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription Also known as: subscribeTo

Attach a callback to a MQTT Topic or wildcard. The callback will be saved, and asynchronously executed whenever a message from a matching topic (including wildcards) is received.

Parameters:

  • topic (String)

    The MQTT-Topic to subscribe to. Can be a Wildcard.

  • qos (nil, Integer) (defaults to: 1)

    The QoS for the subscription. Currently not used!

Yield Parameters:

  • data (String)

    The raw MQTT data received from the MQTT server

  • topicList (Array<String>)

    An array of topic-branches corresponding to wildcard matches. Can be empty if no wildcard was used!

Yield Returns:

  • (void)

Returns:



195
196
197
198
199
200
# File 'lib/mqtt/sub_handler.rb', line 195

def subscribe_to(topic, qos: 1, &callback)
	subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback);
	register_subscription(subObject);

	return subObject;
end

#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription Also known as: on_change

Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.

Parameters:

  • topic (String)

    The MQTT-Topic to track for data. Can be a Wildcard.

  • qos (nil, Integer) (defaults to: 1)

    The QoS to use for the subscription

Yield Parameters:

  • data (String)

    The new (changed) data received from MQTT.

Yield Returns:

  • (void)

Returns:



170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/mqtt/sub_handler.rb', line 170

def track(topic, qos: 1, &callback)
	unless(@trackerHash.has_key? topic)
		subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos);
		register_subscription(subObject);

		@trackerHash[topic] = subObject;
	end

	@trackerHash[topic].attach(callback) if(callback)

	return @trackerHash[topic];
end

#unregister_subscription(subObject) ⇒ Object

Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.

Parameters:

Returns:

  • void

Raises:

  • (ArgumentError)


104
105
106
107
108
109
# File 'lib/mqtt/sub_handler.rb', line 104

def unregister_subscription(subObject)
	raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
	return unless @callbackList.include? subObject;

	@callbackList.delete(subObject);
end

#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean

Synchronously wait for data. It waits for a message on ‘topic`, optionally letting a block check the data for validity, and optionally aborting after a timeout

Parameters:

  • topic (String)

    The MQTT-Topic to wait for

  • timeout (nil, Integer) (defaults to: nil)

    The optional timeout after which to abort

  • qos (nil, Integer) (defaults to: 1)

    The QoS for this subscription

Yield Parameters:

  • data (String)

    The data received via MQTT

  • topicList (Array<String>)

    The wildcard topic branches matched.

Yield Returns:

  • (Boolean)

    Whether or not the data was sufficient, and capture should be stopped.

Returns:

  • (Boolean)

    True if the block returned true, False if the code timed-out



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/mqtt/sub_handler.rb', line 134

def wait_for(topic, qos: 1, timeout: nil)
	unless block_given?
		raise ArgumentError, "A block for data-processing needs to be passed!"
	end

	subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos);
	register_subscription(subObject);

	begin
	Timeout.timeout(timeout) do
		loop do
			return_data = subObject.waitpoint.wait()[1];
			if yield(return_data[0], return_data[1])
				return true;
			end
		end
	end
	rescue Timeout::Error
		return false;
	ensure
		unregister_subscription(subObject);
	end
end