Class: Fluent::MultiprocessInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_multiprocess.rb

Defined Under Namespace

Classes: ProcessElement

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_multiprocess.rb', line 40

# Configures the plugin and builds one ProcessElement for every <process>
# section found directly under +conf+.
#
# @param conf the configuration element for this plugin; must respond to
#   +elements+, each of which responds to +name+
# @return [Array<ProcessElement>] the configured process elements (also
#   stored in @processes)
def configure(conf)
  # Hand the configuration to the parent class first. Without this call the
  # parent never sees +conf+.
  # NOTE(review): presumably the parent is what populates the
  # @graceful_kill_* values that #start reads -- confirm against Input.
  super

  process_sections = conf.elements.select { |element| element.name == 'process' }

  @processes = process_sections.map do |element|
    ProcessElement.new.tap { |pe| pe.configure(element) }
  end
end

#shutdown ⇒ Object



72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/in_multiprocess.rb', line 72

# Stops all child fluentd processes in two passes.
#
# Pass 1 walks the process elements in configuration order: it optionally
# sleeps for the element's +sleep_before_shutdown+, logs, and asks the
# element's process monitor to begin a graceful stop.
# Pass 2 joins every monitor, so all stop requests have been issued before
# any join is attempted.
def shutdown
  @processes.each do |child|
    delay = child.sleep_before_shutdown
    sleep(delay) if delay > 0
    $log.info "shutting down child fluentd #{child.cmdline}"
    child.process_monitor.start_graceful_stop!
  end

  @processes.each do |child|
    child.process_monitor.join
  end
end

#start ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/in_multiprocess.rb', line 50

# Launches one child fluentd per configured process element.
#
# Creates a ServerEngine::ProcessManager from the graceful-kill instance
# variables, locates command/fluentd.rb relative to the already-loaded
# fluent/plugin.rb, and then spawns the children in reverse configuration
# order. The result of each spawn is stored on its element as
# +process_monitor+.
def start
  @pm = ServerEngine::ProcessManager.new(
    auto_tick: true,
    auto_tick_interval: 1,
    graceful_kill_interval: @graceful_kill_interval,
    graceful_kill_interval_increment: @graceful_kill_interval_increment,
    graceful_kill_timeout: @graceful_kill_timeout,
    graceful_kill_signal: 'TERM',
    immediate_kill_timeout: 0  # disabled
  )

  # Derive the fluentd launcher path from wherever fluent/plugin.rb was loaded.
  loaded_plugin = $LOADED_FEATURES.find { |feature| feature =~ /fluent\/plugin\.rb\z/ }
  fluentd_script = File.join(File.dirname(loaded_plugin), 'command', 'fluentd.rb')

  @processes.reverse_each do |child|
    interpreter = Shellwords.shellescape(RbConfig.ruby)
    script = Shellwords.shellescape(fluentd_script)
    # Only the interpreter and script paths are escaped; cmdline is appended
    # verbatim.
    command = "#{interpreter} #{script} #{child.cmdline}"

    delay = child.sleep_before_start
    sleep(delay) if delay > 0
    $log.info "launching child fluentd #{child.cmdline}"
    child.process_monitor = @pm.spawn(command)
  end
end