Module: Fluent::PluginHelper::ChildProcess

Includes:
Thread, Timer
Defined in:
lib/fluent/plugin/in_exec/iterative_process.rb

Instance Method Summary collapse

Instance Method Details

#timer_process_execute(title, command, start_timestamp, interval, time_callback, arguments: nil, subprocess_name: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, wait_timeout: nil, on_exit_callback: nil, delay_seconds: 0, &block) ⇒ Object

Raises:

  • (ArgumentError)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/in_exec/iterative_process.rb', line 10

def timer_process_execute(
  title, command, start_timestamp, interval, time_callback,
  arguments: nil, subprocess_name: nil, immediate: false, parallel: false,
  mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil,
  internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil,
  wait_timeout: nil, on_exit_callback: nil, delay_seconds: 0,
  &block
)
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
  raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments

  mode ||= []
  mode = [] unless block
  raise ArgumentError, "BUG: invalid mode specification" unless mode.all?{|m| MODE_PARAMS.include?(m) }
  raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr" if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr))
  raise ArgumentError, "BUG: invalid stderr handling specification" unless STDERR_OPTIONS.include?(stderr)

  raise ArgumentError, "BUG: number of block arguments are different from size of mode" if block && block.arity != mode.size

  running = false
  callback = ->(*args) {
    running = true
    begin
      block && block.call(*args)
    ensure
      running = false
    end
  }

  execute_child_process = ->(cmd) {
    child_process_execute_once(
      title, cmd, arguments,
      subprocess_name, mode, stderr, env, unsetenv, chdir,
      internal_encoding, external_encoding, scrub, replace_string,
      wait_timeout, on_exit_callback,
      &callback
    )
  }

  now = Fluent::EventTime.now.to_int - delay_seconds
  if immediate && start_timestamp.to_i < now
    execute_child_process.call(command % [start_timestamp, now])
    start_timestamp = now
    time_callback.call(start_timestamp)
  end

  timer_execute(:child_process_execute, interval, repeat: true) do
    if !parallel && running
      log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval
    else
      end_timestamp = Fluent::EventTime.now.to_int - delay_seconds
      execute_child_process.call(command % [start_timestamp, end_timestamp])
      start_timestamp = end_timestamp
      time_callback.call(start_timestamp)
    end
  end

end