Class: Fluent::MultiprocessInput
- Inherits:
  - Input
- Inheritance hierarchy:
  - Object
  - Input
  - Fluent::MultiprocessInput
- Defined in:
- lib/fluent/plugin/in_multiprocess.rb
Defined Under Namespace
Classes: ProcessElement
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 40

# Builds the list of child-process definitions from the plugin configuration.
#
# Every direct child element of +conf+ whose name is 'process' is turned into
# a ProcessElement configured from that element. The resulting array is stored
# in @processes, which #start and #shutdown iterate over.
#
# @param conf the configuration for this plugin; must respond to #elements,
#   and each element must respond to #name
#   (NOTE(review): presumably a Fluent::Config::Element - confirm)
# @return [Array<ProcessElement>] the configured process elements
def configure(conf)
  # Run the base Input configuration first. #start reads
  # @graceful_kill_interval and related settings that are not assigned in
  # this method.
  # NOTE(review): presumably those are declared config parameters that the
  # superclass populates from conf - confirm against the class body.
  super

  process_sections = conf.elements.select { |e| e.name == 'process' }
  @processes = process_sections.map do |section|
    ProcessElement.new.tap { |pe| pe.configure(section) }
  end
end
#shutdown ⇒ Object
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 72

# Asks every child fluentd to stop, then waits for all of them.
#
# Works in two passes over @processes:
# 1. For each entry, sleep for its sleep_before_shutdown (only when that
#    value is positive), log the shutdown, and call start_graceful_stop! on
#    its process monitor.
# 2. Call join on every process monitor.
#
# Stop requests are all issued before the first join, so no child's stop
# request waits on another child's join.
#
# @return the value of the final @processes.each call
def shutdown
  @processes.each do |child|
    delay = child.sleep_before_shutdown
    sleep delay if delay > 0

    $log.info "shutting down child fluentd #{child.cmdline}"
    child.process_monitor.start_graceful_stop!
  end

  @processes.each do |child|
    child.process_monitor.join
  end
end
#start ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 50

# Launches one child fluentd per entry in @processes.
#
# Creates a ServerEngine::ProcessManager (kept in @pm) configured from the
# @graceful_kill_* instance variables, with 'TERM' as the graceful kill signal
# and immediate_kill_timeout set to 0.
#
# The fluentd launcher script is located relative to the already-loaded
# fluent/plugin.rb: <dir of plugin.rb>/command/fluentd.rb. This assumes
# fluent/plugin.rb appears in $LOADED_FEATURES.
#
# Entries are visited in reverse order. For each one: sleep for its
# sleep_before_start (only when positive), log the launch, spawn the command
# through the process manager, and store the returned monitor on the entry.
#
# @return the value of @processes.reverse_each
def start
  @pm = ServerEngine::ProcessManager.new(
    :auto_tick => true,
    :auto_tick_interval => 1,
    :graceful_kill_interval => @graceful_kill_interval,
    :graceful_kill_interval_increment => @graceful_kill_interval_increment,
    :graceful_kill_timeout => @graceful_kill_timeout,
    :graceful_kill_signal => 'TERM',
    :immediate_kill_timeout => 0, # disabled
  )

  plugin_path = $LOADED_FEATURES.find { |feature| feature =~ /fluent\/plugin\.rb\z/ }
  fluentd_script = File.join(File.dirname(plugin_path), 'command', 'fluentd.rb')

  @processes.reverse_each do |child|
    # The interpreter and script paths are shell-escaped; the configured
    # cmdline is appended verbatim.
    ruby_bin = Shellwords.shellescape(RbConfig.ruby)
    script = Shellwords.shellescape(fluentd_script)
    command = "#{ruby_bin} #{script} #{child.cmdline}"

    delay = child.sleep_before_start
    sleep delay if delay > 0

    $log.info "launching child fluentd #{child.cmdline}"
    child.process_monitor = @pm.spawn(command)
  end
end