Class: SimpleMQTTClient

Inherits:
Object
Includes:
MQTT
Defined in:
lib/simple_mqtt_client/simple_mqtt_client.rb

Overview

This class implements a simple MQTT client. It wraps the mqtt gem (github.com/njh/ruby-mqtt) and offers a friendlier interface.

Instance Method Summary

Methods included from MQTT

#is_wildcard_channel?

Constructor Details

#initialize(host, client_id = nil) ⇒ SimpleMQTTClient

Creates a new MQTT client and connects it to the broker. The constructor also starts a background thread that listens for incoming packets and runs every callback registered for the channel each packet arrives on.

@param [String] host the hostname of the MQTT broker we are connecting to
@param [String] client_id the **unique** client identifier


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 19

def initialize( host, client_id=nil )
  # list of channels we are subscribed to. associates a set of callbacks to each channel
  @channels = Hash.new 
  @client = client_id.nil? ? MQTT::Client.connect(host: host) : MQTT::Client.connect(host: host, client_id: client_id)
  # the thread lives as long as this instance is not garbage collected
  @thread = Thread.new('mqtt') do 
    @client.get do |channel, message|
      # Execute all the appropriate callbacks:
      # the ones specific to this channel with a single parameter (message)
      # the ones associated to a wildcard channel, with two parameters (message and channel)
      cbs = get_callbacks(channel)
      begin
        cbs.each { |cb| cb.parameters.length==1 ? cb.call(message) : cb.call(message, channel) }
      rescue ArgumentError
        STDERR.puts "The callback you passed for #{channel} has the #{$!}"
      end
    end
  end
end
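
The dispatch rule used inside the listener thread can be tried in isolation. The following is a minimal sketch, not the library's code: `dispatch` is a hypothetical helper and the channel name is made up for illustration. It only shows how `Proc#parameters` decides whether a callback receives the message alone or the message plus the channel.

```ruby
# Hypothetical helper (not part of SimpleMQTTClient): calls each callback with
# one or two arguments, depending on how many parameters it declares.
def dispatch(callbacks, channel, message)
  callbacks.each do |cb|
    if cb.parameters.length == 1
      cb.call(message)            # single-channel style callback
    else
      cb.call(message, channel)   # wildcard style callback
    end
  end
end

received = []
single   = ->(message) { received << [:single, message] }
wildcard = ->(message, channel) { received << [:wildcard, message, channel] }

# 'home/kitchen/temp' is an illustrative channel name.
dispatch([single, wildcard], 'home/kitchen/temp', '21.5')
```

Because the decision is based on the declared parameter count, a callback registered on a wildcard filter with only one parameter would still be called with the message alone.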

Instance Method Details

#disconnect ⇒ Object

Disconnects this client from the broker, stops the listener thread, and clears all registered callbacks.



# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 82

def disconnect
  @thread.exit
  @client.disconnect
  @channels.clear
end

#get_subscribed_channels ⇒ Object

Returns a hash of all the channels this client is currently subscribed to, each mapped to its registered callbacks.

@return [Hash] all channels this client is currently subscribed to, with their callbacks


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 90

def get_subscribed_channels
  @channels
end

#is_channel_wildcard?(channel) ⇒ Boolean

Returns true if a channel is a wildcard channel, that is, if its name contains '#' or '+'.

@return [Boolean] true if the channel is a wildcard channel. See the MQTT specification for the definition of wildcard channels.

Parameters:

  • channel (String)

    the channel we are testing for wildcard

Returns:

  • (Boolean)


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 104

def is_channel_wildcard?( channel )
  channel.include?('#') || channel.include?('+')
end
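
The check above only detects that a name contains a wildcard character. How a concrete channel is matched against a wildcard filter is not shown on this page (the `get_callbacks` source is not listed), so the following is only a sketch of the standard MQTT rules, with `topic_matches?` as a hypothetical helper: '+' matches exactly one level and '#' matches all remaining levels. It ignores the special handling of topics starting with '$'.

```ruby
# Hypothetical helper (not part of SimpleMQTTClient): returns true when
# +topic+ matches the MQTT +filter+.
def topic_matches?(filter, topic)
  filter_levels = filter.split('/', -1)
  topic_levels  = topic.split('/', -1)

  filter_levels.each_with_index do |level, i|
    return true if level == '#'                 # multi-level wildcard: the rest matches
    return false if i >= topic_levels.length    # filter has more levels than the topic
    next if level == '+'                        # single-level wildcard: any value here
    return false unless level == topic_levels[i]
  end

  # Without a trailing '#', both must have the same number of levels.
  filter_levels.length == topic_levels.length
end

single_level = topic_matches?('home/+/temp', 'home/kitchen/temp')
multi_level  = topic_matches?('home/#', 'home/kitchen/temp')
too_deep     = topic_matches?('home/+', 'home/kitchen/temp')
```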

#publish(channel, message) ⇒ Object

Publishes a message to a channel. Publishing to a wildcard channel is not allowed: the method prints a warning to STDERR instead.

@param [String] channel the channel we are publishing to
@param [String] message the message we are publishing


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 75

def publish( channel, message )
  # Refuse to publish to a wildcard channel: warn and return early
  if is_channel_wildcard?(channel)
    STDERR.puts 'Can\'t publish to a wildcard channel!'
    return
  end
  @client.publish(channel, message)
end

#subscribe(channel, callback) ⇒ Object

Subscribes to a channel and registers a callback

  • Single channel callbacks take only one parameter: the received message.

  • Wildcard callbacks take two parameters: the received message and the channel the message was sent to.

It is possible to register multiple callbacks per channel. All of them will be executed whenever a message on that channel is received. Note that overlaps between channel-specific callbacks and wildcard-filters are allowed.

@param [String] channel the channel or filter we are subscribing to
@param [Proc] callback the callback that gets executed whenever a message is received


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 48

def subscribe( channel, callback )
  if @channels.include?(channel)
    @channels[channel] << callback
  else
    @channels[channel]=[callback]
    @client.subscribe channel
  end
end
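
The bookkeeping in #subscribe can be exercised without a broker. This is a sketch under stated assumptions: `RecordingClient` is a hypothetical stand-in for `MQTT::Client` that only records calls, and the channel names are invented. It shows that the broker-level subscription happens once per channel, while every callback is kept in the hash.

```ruby
# Hypothetical stand-in for MQTT::Client: records subscribe calls, no network.
class RecordingClient
  attr_reader :subscribed

  def initialize
    @subscribed = []
  end

  def subscribe(channel)
    @subscribed << channel
  end
end

client   = RecordingClient.new
channels = {}

# Same logic as #subscribe, written as a lambda over local state.
subscribe = lambda do |channel, callback|
  if channels.include?(channel)
    channels[channel] << callback
  else
    channels[channel] = [callback]
    client.subscribe(channel)
  end
end

# Two callbacks on one channel, plus an overlapping wildcard filter.
subscribe.call('home/kitchen/temp', ->(message) { puts message })
subscribe.call('home/kitchen/temp', ->(message) { puts message.upcase })
subscribe.call('home/#', ->(message, channel) { puts "#{channel}: #{message}" })
```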

#subscriptions ⇒ Object

Returns a hash of all the channels this client is currently subscribed to, each mapped to its registered callbacks. Returns the same hash as #get_subscribed_channels.

@return [Hash] all channels this client is currently subscribed to, with their callbacks


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 96

def subscriptions
  @channels
end

#unsubscribe(channel, callback) ⇒ Object

Unsubscribes a specific callback from a channel. Once the last callback for the channel has been removed, the client also unsubscribes from the channel on the broker.

@param [String] channel the channel we are un-subscribing from
@param [Proc] callback the specific callback we want to remove


# File 'lib/simple_mqtt_client/simple_mqtt_client.rb', line 61

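def unsubscribe( channel, callback )
  # Nothing to do if we never subscribed to this channel
  return unless @channels.include? channel
  @channels[channel].delete(callback)
  # Drop the broker subscription once no callbacks are left
  if @channels[channel].empty?
    @client.unsubscribe channel
    @channels.delete(channel)
  end
end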
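
Since the callback is removed with `Array#delete`, the caller presumably needs to pass the same Proc object that was given to #subscribe. The sketch below illustrates this with a plain array; the variable names are made up, and it does not touch the client itself.

```ruby
callbacks = []
log_temp  = ->(message) { puts "temp: #{message}" }
callbacks << log_temp

# A new lambda with an identical body is a different object, so nothing is removed.
callbacks.delete(->(message) { puts "temp: #{message}" })
size_after_lookalike = callbacks.length

# Passing the original object removes it.
callbacks.delete(log_temp)
size_after_original = callbacks.length
```

In practice this means keeping a reference to each callback that may need to be unsubscribed later.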