Class: Fluent::MultiprocessInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_multiprocess.rb

Defined Under Namespace

Classes: ProcessElement

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/in_multiprocess.rb', line 44

def configure(conf)
  super

  @processes = conf.elements.select {|e|
    e.name == 'process'
  }.map {|e|
    pe = ProcessElement.new
    pe.configure(e)
    pe
  }
end

#create_pid_file(pe) ⇒ Object



100
101
102
103
104
# File 'lib/fluent/plugin/in_multiprocess.rb', line 100

def create_pid_file(pe)
  File.open(pe.pid_file, "w") { |f|
    f.write pe.process_monitor.pid
  }
end

#delete_pid_file(pe) ⇒ Object



106
107
108
# File 'lib/fluent/plugin/in_multiprocess.rb', line 106

def delete_pid_file(pe)
  File.unlink(pe.pid_file) if File.exist?(pe.pid_file)
end

#shutdownObject



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_multiprocess.rb', line 84

def shutdown
  super

  @processes.each {|pe|
    sleep pe.sleep_before_shutdown if pe.sleep_before_shutdown > 0
    $log.info "shutting down child fluentd #{pe.cmdline}"
    pe.process_monitor.start_graceful_stop!
  }
  @processes.each {|pe|
    pe.process_monitor.join
  }
  @processes.each { |pe|
    delete_pid_file(pe) if pe.pid_file
  }
end

#startObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/in_multiprocess.rb', line 56

def start
  super

  @pm = ServerEngine::ProcessManager.new(
    :auto_tick => true,
    :auto_tick_interval => 1,
    :graceful_kill_interval => @graceful_kill_interval,
    :graceful_kill_interval_increment => @graceful_kill_interval_increment,
    :graceful_kill_timeout => @graceful_kill_timeout,
    :graceful_kill_signal => 'TERM',
    :immediate_kill_timeout => 0,  # disabled
  )

  plugin_rb = $LOADED_FEATURES.find {|x| x =~ /fluent\/plugin\.rb\z/ }
  fluentd_rb = File.join(File.dirname(plugin_rb), 'command', 'fluentd.rb')

  @processes.reverse_each do |pe|
    cmd = "#{Shellwords.shellescape(RbConfig.ruby)} #{Shellwords.shellescape(fluentd_rb)} #{pe.cmdline}"
    sleep pe.sleep_before_start if pe.sleep_before_start > 0
    $log.info "launching child fluentd #{pe.cmdline}"
    keep_file_descriptors = pe.keep_file_descriptors.nil? ? @keep_file_descriptors : pe.keep_file_descriptors
    options = {:close_others => !keep_file_descriptors}
    pe.process_monitor = @pm.spawn(cmd, options)

    create_pid_file(pe) if pe.pid_file
  end
end