Class: Fluent::MultiprocessInput
- Inherits:
-
Input
- Object
- Input
- Fluent::MultiprocessInput
- Defined in:
- lib/fluent/plugin/in_multiprocess.rb
Defined Under Namespace
Classes: ProcessElement
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #create_pid_file(pe) ⇒ Object
- #delete_pid_file(pe) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 44 def configure(conf) super @processes = conf.elements.select {|e| e.name == 'process' }.map {|e| pe = ProcessElement.new pe.configure(e) pe } end |
#create_pid_file(pe) ⇒ Object
100 101 102 103 104 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 100 def create_pid_file(pe) File.open(pe.pid_file, "w") { |f| f.write pe.process_monitor.pid } end |
#delete_pid_file(pe) ⇒ Object
106 107 108 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 106 def delete_pid_file(pe) File.unlink(pe.pid_file) if File.exist?(pe.pid_file) end |
#shutdown ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 84 def shutdown super @processes.each {|pe| sleep pe.sleep_before_shutdown if pe.sleep_before_shutdown > 0 $log.info "shutting down child fluentd #{pe.cmdline}" pe.process_monitor.start_graceful_stop! } @processes.each {|pe| pe.process_monitor.join } @processes.each { |pe| delete_pid_file(pe) if pe.pid_file } end |
#start ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/fluent/plugin/in_multiprocess.rb', line 56 def start super @pm = ServerEngine::ProcessManager.new( :auto_tick => true, :auto_tick_interval => 1, :graceful_kill_interval => @graceful_kill_interval, :graceful_kill_interval_increment => @graceful_kill_interval_increment, :graceful_kill_timeout => @graceful_kill_timeout, :graceful_kill_signal => 'TERM', :immediate_kill_timeout => 0, # disabled ) plugin_rb = $LOADED_FEATURES.find {|x| x =~ /fluent\/plugin\.rb\z/ } fluentd_rb = File.join(File.dirname(plugin_rb), 'command', 'fluentd.rb') @processes.reverse_each do |pe| cmd = "#{Shellwords.shellescape(RbConfig.ruby)} #{Shellwords.shellescape(fluentd_rb)} #{pe.cmdline}" sleep pe.sleep_before_start if pe.sleep_before_start > 0 $log.info "launching child fluentd #{pe.cmdline}" keep_file_descriptors = pe.keep_file_descriptors.nil? ? @keep_file_descriptors : pe.keep_file_descriptors = {:close_others => !keep_file_descriptors} pe.process_monitor = @pm.spawn(cmd, ) create_pid_file(pe) if pe.pid_file end end |