Class: Fluent::Plugin::MqttInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_mqtt.rb

Constant Summary collapse

DEFAULT_PARSER_TYPE =
'none'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



32
33
34
35
36
# File 'lib/fluent/plugin/in_mqtt.rb', line 32

# Configures the plugin and then builds the message parser.
#
# The call order matters: the compatibility conversion runs on +conf+ before
# the parent +configure+ sees it, and the parser is created only afterwards.
#
# @param conf [Object] plugin configuration handed over by the framework
#   (presumably a Fluent::Config::Element; not visible from this method)
# @return [Object] whatever #configure_parser returns
def configure(conf)
  # Convert legacy-style parameters for the :inject and :parser helpers first.
  compat_parameters_convert(conf, :inject, :parser)
  super(conf)
  configure_parser(conf)
end

#configure_parser(conf) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/in_mqtt.rb', line 38

# Creates the parser used for incoming MQTT payloads and stores it in @parser.
#
# @param conf [Object] plugin configuration, passed through to +parser_create+
# @return [Object] the parser instance returned by +parser_create+
def configure_parser(conf)
  # The parser type comes from @format, which is set outside this method.
  parser_options = { usage: 'in_mqtt_parser', type: @format, conf: conf }
  @parser = parser_create(**parser_options)
end

#emit(topic, message, time = Fluent::Engine.now) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_mqtt.rb', line 80

# Hands a parsed MQTT message to the router, one event per record.
#
# @param topic [String] tag under which the event(s) are emitted
# @param message [Object, Array, nil] a single record, or an array of records
#   that are emitted one by one; +nil+ is ignored
# @param time [Object] event time, defaults to +Fluent::Engine.now+
# @return [Object, nil] not meaningful to callers; +nil+ when nothing is emitted
def emit(topic, message, time = Fluent::Engine.now)
  # A nil record (e.g. when the payload could not be parsed upstream) carries
  # nothing to emit, so do not forward it to the router as an event.
  return if message.nil?

  # is_a? instead of an exact class comparison so Array subclasses are also
  # treated as batches of records.
  if message.is_a?(Array)
    message.each do |data|
      log.debug "#{topic}: #{data}"
      router.emit(topic, time, data)
    end
  else
    router.emit(topic, time, message)
  end
end

#parse(message) ⇒ Object

Returns [time, record]; the time falls back to the current time when the parser does not provide one.



43
44
45
46
47
# File 'lib/fluent/plugin/in_mqtt.rb', line 43

# Parses a raw MQTT payload with the configured parser.
#
# @param message [Object] raw payload handed to +@parser.parse+
# @return [Array] +[time, record]+ as yielded by the parser; when the yielded
#   time is nil or false, +Fluent::Engine.now+ is used instead
def parse(message)
  @parser.parse(message) do |parsed_time, parsed_record|
    event_time = parsed_time || Fluent::Engine.now
    # `return` inside the block exits #parse itself with the pair.
    return event_time, parsed_record
  end
end

#shutdown ⇒ Object



91
92
93
94
# File 'lib/fluent/plugin/in_mqtt.rb', line 91

# Disconnects from the MQTT broker, if a connection exists, and then runs the
# parent class's shutdown.
#
# @return [Object] whatever the parent +shutdown+ returns
def shutdown
  # @connect is only assigned in #start once the connect call has succeeded,
  # so it can still be nil here; guarding avoids a NoMethodError on nil that
  # would also prevent the parent shutdown from running.
  @connect.disconnect if @connect
  super
end

#start ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/in_mqtt.rb', line 49

# Connects to the MQTT broker, subscribes to @topic and starts a worker
# (via +thread_create+) that turns every received message into an event.
#
# Connection options are built from the instance variables @bind, @port and,
# when set, @client_id, @username, @password, @ssl, @ca, @cert and @key.
#
# @return [Object] whatever +thread_create+ returns
def start
  super
  log.debug "start mqtt #{@bind}"

  # host and port are always passed; the remaining options only when set.
  opts = { host: @bind, port: @port }
  {
    client_id: @client_id,
    username: @username,
    password: @password,
    ssl: @ssl,
    ca_file: @ca,
    cert_file: @cert,
    key_file: @key
  }.each { |key, value| opts[key] = value if value }

  @connect = MQTT::Client.connect(opts)
  @connect.subscribe(@topic)

  thread_create(:in_mqtt_worker) do
    @connect.get do |topic, message|
      # Build the tag without mutating the string owned by the MQTT client
      # (the previous in-place gsub! would also fail on a frozen topic).
      tag = topic.tr('/', '.')
      log.debug "#{tag}: #{message}"
      begin
        time, record = parse(message)
        record = inject_values_to_record(tag, time, record)
      rescue StandardError => e
        # StandardError only: signals and exit requests must not be swallowed.
        log.error e
        # Nothing usable was produced for this message, so skip it instead of
        # emitting an event with a nil time/record.
        next
      end
      emit tag, record, time
    end
  end
end