Class: Fluent::MqttInput

Inherits:
Input
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/in_mqtt.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



17
18
19
20
21
22
# File 'lib/fluent/plugin/in_mqtt.rb', line 17

# Applies the plugin configuration.
#
# The inherited configuration step runs first. Afterwards the bind address,
# topic and port are taken from +conf+ for each of those instance variables
# that is still nil (or false) at that point.
#
# @param conf [#[]] configuration element, indexed here by string keys
# @return [Object] the value held by @port once configuration is done
def configure(conf)
  super
  @bind = conf['bind'] unless @bind
  @topic = conf['topic'] unless @topic
  # Written as an expression so the method still evaluates to the port value.
  @port || (@port = conf['port'])
end

#emit(topic, message, time = Fluent::Engine.now) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/in_mqtt.rb', line 38

# Hands one event over to Fluent::Engine.
#
# Note the argument order expected by the engine call: topic, then time,
# then message.
#
# @param topic [Object] first argument passed to Fluent::Engine.emit
# @param message [Object] third argument passed to Fluent::Engine.emit
# @param time [Object] second argument; defaults to Fluent::Engine.now
# @return [Object] whatever Fluent::Engine.emit returns
def emit(topic, message, time = Fluent::Engine.now)
  engine = Fluent::Engine
  engine.emit(topic, time, message)
end

#json_parse(message) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/in_mqtt.rb', line 42

# Parses a JSON payload with Yajl.
#
# A parse failure is not raised to the caller: it is reported through $log
# (an error entry plus the backtrace at warn level) and nil is returned.
#
# @param message [Object] raw JSON text handed to Yajl::Parser#parse
# @return [Object, nil] the parsed value, or nil when parsing raised
def json_parse(message)
  Yajl::Parser.new.parse(message)
rescue => e
  $log.error "JSON parse error", :error => e.to_s, :error_class => e.class.to_s
  $log.warn_backtrace e.backtrace
  # Explicit nil: the result must not depend on what the logger call returns,
  # so callers can reliably tell that no record was produced.
  nil
end

#shutdown ⇒ Object



51
52
53
54
# File 'lib/fluent/plugin/in_mqtt.rb', line 51

# Stops the subscriber thread and closes the MQTT connection.
#
# Either resource can be missing when #start never ran or raised before
# assigning it (for example when the connection attempt failed), so each one
# is checked before use instead of failing with NoMethodError on nil.
#
# @return [Object, nil] result of the disconnect call, or nil when there was
#   no connection to close
def shutdown
  @thread.kill if @thread
  @connect.disconnect if @connect
end

#start ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/in_mqtt.rb', line 24

# Connects to the MQTT broker, subscribes to the configured topic and starts
# a background thread that emits one event per received message.
#
# For each message the topic is turned into a tag by replacing "/" with ".",
# the payload is parsed with #json_parse, and the result is passed to #emit.
# Messages for which #json_parse returns nil are skipped.
#
# @return [Thread] the subscriber thread (also stored in @thread)
def start
  $log.debug "start mqtt"
  @connect = MQTT::Client.connect({remote_host: @bind, remote_port: @port})
  @connect.subscribe(@topic)

  @thread = Thread.new do
    begin
      @connect.get do |topic, message|
        # Build the tag as a new string: the topic object handed over by the
        # client is left untouched, which also works when it is frozen.
        tag = topic.tr("/", ".")
        $log.debug "#{tag}: #{message}"
        record = json_parse(message)
        # No record was produced for this payload; do not emit nil.
        next if record.nil?
        emit tag, record
      end
    rescue => e
      # Without this the thread would end without any trace in the log.
      $log.error "mqtt subscriber thread stopped", :error => e.to_s, :error_class => e.class.to_s
      $log.warn_backtrace e.backtrace
    end
  end
end