Class: MQTT::SubHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt/sub_handler.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mqttClient, autoListen: true) ⇒ SubHandler

Returns a new instance of SubHandler.



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/mqtt/sub_handler.rb', line 217

def initialize(mqttClient, autoListen: true)
	@callbackList = Array.new();
	if mqttClient.is_a? String then
		@mqtt = MQTT::Client.new(mqttClient);
	else
		@mqtt = mqttClient;
	end

	@conChangeMutex = Mutex.new();
	@connected 		= false;

	@mqtt.client_id ||= MQTT::Client.generate_client_id("MQTT_Sub_", 8);

	@publishQueue 		= Array.new();
	@subscribeQueue 	= Array.new();
	@subscribedTopics 	= Hash.new();

	@trackerHash = Hash.new();

	@listenerThread = Thread.new do
		if @mqtt.clean_session
			@mqttWasStartedClean = true;
			begin
				@mqtt.connect();
				@mqtt.disconnect();
			rescue MQTT::Exception
				sleep 1;
				retry
			rescue SocketError, SystemCallError
				sleep 5
				retry
			end
			@mqtt.clean_session=false;
		end

		mqtt_resub_thread
	end
	@listenerThread.abort_on_exception = true;

	at_exit {
		flush_pubqueue
		@listenerThread.kill();

		if(@mqttWasStartedClean) then
			print "Logging out of mqtt server... "
			begin
			Timeout::timeout(10) {
				begin
					@mqtt.clean_session = true;
					@mqtt.disconnect();
					@mqtt.connect();
				rescue MQTT::Exception, SocketError, SystemCallError
					sleep 1
					retry;
				end
			}
			rescue  Timeout::Error
				puts "Timed out, aborting!";
			else
				puts "Done."
			end
		end
	}

	begin
	Timeout::timeout(10) {
		until(@connected) do sleep 0.1; end
	}
	rescue Timeout::Error
	end
end

Class Method Details

.getTopicMatch(receivedTopicString, topicPattern) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/mqtt/sub_handler.rb', line 18

def self.getTopicMatch(receivedTopicString, topicPattern)
	receivedTopicList = getTopicSplit receivedTopicString;

	outputTopicList = Array.new();

	return nil unless receivedTopicList.length >= topicPattern.length;

	topicPattern.each_index do |i|
		if(topicPattern[i] == "+") then
			outputTopicList << receivedTopicList[i];

		elsif(topicPattern[i] == "#") then
			outputTopicList.concat receivedTopicList[i..-1];
			return outputTopicList;

		elsif topicPattern[i] != receivedTopicList[i];
			return nil;

		end
	end

	return outputTopicList if topicPattern.length == receivedTopicList.length;
	return nil;
end

.getTopicSplit(topicName) ⇒ Object



14
15
16
# File 'lib/mqtt/sub_handler.rb', line 14

def self.getTopicSplit(topicName)
	return topicName.scan(/[^\/]+/);
end

Instance Method Details

#flush_pubqueueObject



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/mqtt/sub_handler.rb', line 197

def flush_pubqueue()
	puts "\n";
	if @publishQueue.empty? then
		puts "MQTT buffer empty, continuing."
	else
		print "Finishing sending of MQTT messages ... "
		begin
			Timeout::timeout(10) {
				until @publishQueue.empty? do
					sleep 0.05;
				end
			}
		rescue Timeout::Error
			puts "Timed out, aborting."
		else
			puts "Done."
		end
	end
end

#lockAndListenObject



189
190
191
192
193
194
195
196
# File 'lib/mqtt/sub_handler.rb', line 189

def lockAndListen()
	Signal.trap("INT") {
		exit 0
	}

	puts "Main thread paused."
	Thread.stop();
end

#publish_to(topic, data, qos: 1, retain: false) ⇒ Object Also known as: publishTo

Raises:

  • (ArgumentError)


135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/mqtt/sub_handler.rb', line 135

def publish_to(topic, data, qos: 1, retain: false)
	raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

	begin
		@conChangeMutex.lock
		if not @connected then
			@publishQueue << {topic: topic, data: data, qos: qos, retain: retain} unless qos == 0
			@conChangeMutex.unlock
		else
			@conChangeMutex.unlock
			@mqtt.publish(topic, data, retain);
		end
	rescue MQTT::Exception, SocketError, SystemCallError
		sleep 0.05;
		retry
	end
end

#register_subscription(subObject) ⇒ Object

Raises:

  • (ArgumentError)


80
81
82
83
84
85
86
# File 'lib/mqtt/sub_handler.rb', line 80

def register_subscription(subObject)
	raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscription
	return if @callbackList.include? subObject;

	@callbackList << subObject;
	raw_subscribe_to(subObject.topic, qos: subObject.qos);
end

#subscribe_to(topic, qos: 1, &callback) ⇒ Object Also known as: subscribeTo



126
127
128
129
130
131
# File 'lib/mqtt/sub_handler.rb', line 126

def subscribe_to(topic, qos: 1, &callback)
	subObject = MQTT::CallbackSubscription.new(topic, qos, callback);
	register_subscription(subObject);

	return subObject;
end

#track(topic, qos: 1, &callback) ⇒ Object Also known as: on_change



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/mqtt/sub_handler.rb', line 113

def track(topic, qos: 1, &callback)
	unless(@trackerHash.has_key? topic)
		subObject = MQTT::ValueTrackerSubscription.new(topic, qos);
		register_subscription(subObject);

		@trackerHash[topic] = subObject;
	end

	@trackerHash[topic].attach(callback) if(callback)

	return @trackerHash[topic];
end

#unregister_subscription(subObject) ⇒ Object

Raises:

  • (ArgumentError)


74
75
76
77
78
79
# File 'lib/mqtt/sub_handler.rb', line 74

def unregister_subscription(subObject)
	raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscription
	return unless @callbackList.include? subObject;

	@callbackList.delete(subObject);
end

#wait_for(topic, qos: 1, timeout: nil) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/mqtt/sub_handler.rb', line 88

def wait_for(topic, qos: 1, timeout: nil)
	subObject = MQTT::WaitpointSubscription.new(topic, qos);
	register_subscription(subObject);

	if block_given? then
		begin
		Timeout::timeout(timeout) do
			loop do
				return_data = subObject.waitpoint.wait()[1];
				if yield(return_data[0], return_data[1]) then
					unregister_subscription(subObject);
					return true;
				end
			end
		end
		rescue Timeout::Error
			return false;
		end
	else
		return_data = subObject.waitpoint.wait(timeout);
	end

	unregister_subscription(subObject);
	return return_data;
end