Class: Fluent::CephInput
- Inherits:
-
Input
- Object
- Input
- Fluent::CephInput
- Includes:
- Mixin::RewriteTagName
- Defined in:
- lib/fluent/plugin/in_ceph.rb
Defined Under Namespace
Classes: TimerWatcher
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit_message ⇒ Object
-
#initialize ⇒ CephInput
constructor
A new instance of CephInput.
- #restart ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ CephInput
Returns a new instance of CephInput.
10 11 12 |
# File 'lib/fluent/plugin/in_ceph.rb', line 10 def initialize super end |
Instance Method Details
#configure(conf) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/fluent/plugin/in_ceph.rb', line 22 def configure(conf) super @commands = Hash.new if @arguments.include? "," @arguments.split(",").each do |argument| @commands[argument.strip] = "#{@ceph_path} #{argument.strip} --format json" end else @commands[@arguments.strip] = "#{@ceph_path} #{@arguments.strip} --format json" end @hostname = `#{@hostname_command}`.chomp! end |
#emit_message ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/in_ceph.rb', line 35 def @output = Hash.new @pids = Array.new @commands.each do |argument, command| io = IO.popen(command, "r") pid = io.pid json = io.read.strip @output[argument] = json && json.length >= 2 ? JSON.parse(json) : nil Process.detach(pid) Process.kill(:TERM, pid) @pids.push pid end @output['eventtime'] = DateTime.parse(Time.now.to_s).strftime("%d/%m/%Y %H:%M:%S") router.emit(@tag.dup, Engine.now, @output) end |
#restart ⇒ Object
77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/in_ceph.rb', line 77 def restart @pids.each { |pid| Process.detach(@pid) Process.kill(:TERM, @pid) } @tw.detach @tw = TimerWatcher.new(@delay, true, &method(:emit_message)) @tw.attach(@loop) end |
#run ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/in_ceph.rb', line 68 def run begin @loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end end |
#shutdown ⇒ Object
58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/in_ceph.rb', line 58 def shutdown @pids.each { |pid| Process.detach(pid) Process.kill(:TERM, pid) } @tw.detach @loop.stop @thread.join end |
#start ⇒ Object
51 52 53 54 55 56 |
# File 'lib/fluent/plugin/in_ceph.rb', line 51 def start @loop = Coolio::Loop.new @tw = TimerWatcher.new(@granularity, true, &method(:emit_message)) @tw.attach(@loop) @thread = Thread.new(&method(:run)) end |