Class: MqttConnector

Inherits:
Object
  • Object
show all
Defined in:
lib/cisco-deviot/mqtt-connector.rb

Constant Summary collapse

@@logger =
Logger.new(STDOUT)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(gateway, mqtt_server) ⇒ MqttConnector

Returns a new instance of MqttConnector.



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/cisco-deviot/mqtt-connector.rb', line 29

def initialize(gateway, mqtt_server)
  uri = URI.parse(mqtt_server)
  @gateway = gateway
  @host  = uri.host
  @port = uri.port ? uri.port : 1883
  ns = gateway.owner ? gateway.owner.gsub('@', '_') : '_'
  name = gateway.name.gsub('/', '_')
  @data = "/deviot/#{ns}/#{name}/data"
  @action = "/deviot/#{ns}/#{name}/action"
  @action_thread = nil
end

Instance Attribute Details

#actionObject (readonly)

Returns the value of attribute action.



27
28
29
# File 'lib/cisco-deviot/mqtt-connector.rb', line 27

def action
  @action
end

#dataObject (readonly)

Returns the value of attribute data.



26
27
28
# File 'lib/cisco-deviot/mqtt-connector.rb', line 26

def data
  @data
end

#hostObject (readonly)

Returns the value of attribute host.



24
25
26
# File 'lib/cisco-deviot/mqtt-connector.rb', line 24

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



25
26
27
# File 'lib/cisco-deviot/mqtt-connector.rb', line 25

def port
  @port
end

Instance Method Details

#publish(data) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/cisco-deviot/mqtt-connector.rb', line 79

def publish(data)
  if @client
    begin
      @client.publish(@data, data.to_json)
    rescue Exception => e
      @@logger.error("failed to publish #{data}: #{e}")
    end
  end
end

#startObject



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/cisco-deviot/mqtt-connector.rb', line 41

def start
  @action_thread = Thread.new {
    @seconds = 2
    while true
      begin
        MQTT::Client.connect({host: @host, port: @port}) { |client|
          @@logger.info("mqtt server #{self} connected")
          @seconds = 2
          @client = client
          client.get(@action) do |topic, message|
            begin
              @gateway.call_action(JSON.parse(message))
            rescue Exception => e
              @@logger.error("failed to call action #{message}: #{e}")
            end
          end
        }
      rescue Exception => e
        @client = nil
        @@logger.error("mqtt server #{self} disconnected, reconnect in #{@seconds} seconds...")
        sleep(@seconds)
        @seconds = [128, @seconds * 2].min
      end
    end
  }
end

#stopObject



68
69
70
71
72
73
74
75
76
77
# File 'lib/cisco-deviot/mqtt-connector.rb', line 68

def stop
  if @client
    @client.unsubscribe(@action)
    @action_thread.kill if @action_thread and @action_thread.alive?
    @action_thread = nil
    @client.disconnect
    @client = nil
    @@logger.info("mqtt server #{self} disconnected")
  end
end

#to_sObject



89
90
91
# File 'lib/cisco-deviot/mqtt-connector.rb', line 89

def to_s
  "#{@host}:#{@port}"
end