Class: Fluent::CephInput

Inherits:
Input
  • Object
show all
Includes:
Mixin::RewriteTagName
Defined in:
lib/fluent/plugin/in_ceph.rb

Defined Under Namespace

Classes: TimerWatcher

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CephInput

Returns a new instance of CephInput.



10
11
12
# File 'lib/fluent/plugin/in_ceph.rb', line 10

# Creates the plugin instance. Nothing plugin-specific is initialised here;
# construction is delegated entirely to the parent class.
def initialize
  super()
end

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/fluent/plugin/in_ceph.rb', line 22

# Builds the command table from the plugin settings and resolves the hostname.
#
# Every entry of the comma-separated @arguments string is stripped and mapped
# to the command line "<@ceph_path> <argument> --format json". The table is
# stored in @commands, keyed by the stripped argument. The output of
# @hostname_command, minus its trailing line terminator, is stored in
# @hostname.
#
# @param conf the configuration object; it is handed to the parent class
#   through the implicit +super+ call
# @return [String] the resolved hostname
def configure(conf)
  super
  # A value without a comma is kept whole and treated as a single argument.
  arguments = @arguments.include?(",") ? @arguments.split(",") : [@arguments]
  @commands = arguments.each_with_object({}) do |argument, commands|
    name = argument.strip
    commands[name] = "#{@ceph_path} #{name} --format json"
  end
  # String#chomp! returns nil when there is nothing to remove, which would
  # leave @hostname nil for output without a trailing newline; chomp always
  # returns the string.
  @hostname = `#{@hostname_command}`.chomp
end

#emit_message ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/in_ceph.rb', line 35

# Runs every configured command, parses its JSON output and emits a single
# event carrying all results.
#
# The record (@output) maps each argument name from @commands to the parsed
# JSON document, or to nil when the command printed fewer than two
# characters. An 'eventtime' entry holds the current local time formatted as
# "dd/mm/YYYY HH:MM:SS". The record is emitted through +router+ with a copy
# of @tag and Engine.now as the event time.
#
# @raise [JSON::ParserError] if a command prints output that is not valid JSON
def emit_message
  @output = {}
  # Every child is reaped before this method returns (see below), so no pid
  # is left to signal later. @pids stays an (empty) array because other
  # methods of the plugin iterate over it.
  @pids = []
  @commands.each do |argument, command|
    # The block form of IO.popen closes the pipe and waits for the child when
    # the block ends. This avoids leaking a descriptor per run and removes the
    # need to send TERM to a process that has already finished, which could
    # raise Errno::ESRCH or hit a recycled pid.
    json = IO.popen(command, "r") { |io| io.read.strip }
    @output[argument] = json.length >= 2 ? JSON.parse(json) : nil
  end
  @output['eventtime'] = Time.now.strftime("%d/%m/%Y %H:%M:%S")
  router.emit(@tag.dup, Engine.now, @output)
end

#restart ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/in_ceph.rb', line 77

# Terminates any child processes still tracked in @pids and replaces the
# timer watcher with a fresh one attached to the same event loop.
#
# NOTE(review): the new watcher uses @delay as its interval whereas +start+
# uses @granularity; the configuration declarations are not visible here, so
# confirm that @delay is actually set.
def restart
  # @pids is nil until the first emit_message run.
  (@pids || []).each do |pid|
    # Signal the block parameter; the previous code referenced the unset
    # instance variable @pid (nil), which made Process.detach raise TypeError.
    Process.detach(pid)
    begin
      Process.kill(:TERM, pid)
    rescue Errno::ESRCH, Errno::EPERM
      # The child has already exited (or the pid no longer belongs to us);
      # there is nothing left to terminate.
    end
  end
  @tw.detach
  @tw = TimerWatcher.new(@delay, true, &method(:emit_message))
  @tw.attach(@loop)
end

#run ⇒ Object



68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_ceph.rb', line 68

# Drives the event loop until it returns. Any StandardError escaping the loop
# is written to the global logger together with its backtrace instead of
# being propagated to the caller.
def run
  @loop.run
rescue => e
  $log.error "unexpected error", :error => e.to_s
  $log.error_backtrace
end

#shutdown ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/in_ceph.rb', line 58

# Stops the plugin: terminates any child processes still tracked in @pids,
# detaches the timer watcher, stops the event loop and waits for the worker
# thread to finish.
#
# @return [Thread] the joined worker thread
def shutdown
  # @pids is nil when the timer never fired between start and shutdown.
  (@pids || []).each do |pid|
    Process.detach(pid)
    begin
      Process.kill(:TERM, pid)
    rescue Errno::ESRCH, Errno::EPERM
      # The child has already exited (or the pid no longer belongs to us).
      # Swallowing this keeps the teardown below from being skipped.
    end
  end
  @tw.detach
  @loop.stop
  @thread.join
end

#start ⇒ Object



51
52
53
54
55
56
# File 'lib/fluent/plugin/in_ceph.rb', line 51

# Starts the plugin: creates the event loop, attaches a repeating timer that
# invokes emit_message every @granularity interval, and launches the thread
# that executes +run+.
#
# @return [Thread] the worker thread
def start
  @loop = Coolio::Loop.new
  on_tick = method(:emit_message)
  @tw = TimerWatcher.new(@granularity, true, &on_tick)
  @tw.attach(@loop)
  @thread = Thread.new { run }
end