Module: Fluent::PluginHelper::ChildProcess

Includes:
Thread, Timer
Defined in:
lib/fluent/plugin_helper/child_process.rb

Defined Under Namespace

Classes: ProcessInfo

Constant Summary collapse

CHILD_PROCESS_LOOP_CHECK_INTERVAL =

sec

0.2
CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT =

sec

10
CHILD_PROCESS_DEFAULT_KILL_TIMEOUT =

sec

60
MODE_PARAMS =
[:read, :write, :stderr, :read_with_stderr]
STDERR_OPTIONS =
[:discard, :connect]

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS

Instance Attribute Summary collapse

Attributes included from Timer

#_timers

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Instance Method Summary collapse

Methods included from Timer

#terminate, #timer_execute, #timer_running?

Methods included from EventLoop

#event_loop_attach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #terminate

Methods included from Thread

#terminate, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_child_process_processesObject (readonly)

Lifecycle behavior of this helper — stop: mark callback threads as stopped; shutdown: close write IO to child processes (STDIN of child processes) and send TERM (KILL on Windows) to all child processes; close: send KILL to all child processes; terminate: [-]



41
42
43
# File 'lib/fluent/plugin_helper/child_process.rb', line 41

# Returns the registry of child processes tracked by this helper: a Hash of
# pid => ProcessInfo. The registry is assigned in #start, so this returns nil
# before #start has run.
def _child_process_processes
  @_child_process_processes
end

Instance Method Details

#child_process_execute(title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, &block) ⇒ Object

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin_helper/child_process.rb', line 56

# Validates the request and runs +command+ as a child process, either once or
# repeatedly on a timer, handing the child's I/O objects to the given block.
#
# @param title [Symbol] identifier of the execution (used for logging)
# @param command [String] command to spawn
# @param arguments [Array, nil] command arguments; required when subprocess_name is given
# @param subprocess_name [String, nil] replacement name for the spawned process
# @param interval [Numeric, nil] when given, the command is re-run on a repeating timer
# @param immediate [Boolean] with interval: also run once right now
# @param parallel [Boolean] with interval: start a new run even if the previous
#   callback is still running (otherwise the run is skipped with a warning)
# @param mode [Array<Symbol>] subset of MODE_PARAMS; one block argument per entry
# @param stderr [Symbol] one of STDERR_OPTIONS
# @raise [ArgumentError] on any invalid combination of arguments
# @return [Object, nil] whatever timer_execute returns when interval is given, otherwise nil
def child_process_execute(
    title, command,
    arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
    mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil,
    internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil,
    &block
)
  unless title.is_a?(Symbol)
    raise ArgumentError, "BUG: title must be a symbol"
  end
  if subprocess_name && !arguments
    raise ArgumentError, "BUG: arguments required if subprocess name is replaced"
  end

  if mode.any? { |m| !MODE_PARAMS.include?(m) }
    raise ArgumentError, "BUG: invalid mode specification"
  end
  if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr))
    raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr"
  end
  unless STDERR_OPTIONS.include?(stderr)
    raise ArgumentError, "BUG: invalid stderr handling specification"
  end

  if block.nil?
    raise ArgumentError, "BUG: block not specified which receive i/o object"
  end
  if block.arity != mode.size
    raise ArgumentError, "BUG: number of block arguments are different from size of mode"
  end

  # Tracks whether the user's block is currently executing, so that timer ticks
  # can detect an overlapping run.
  busy = false
  guarded_block = lambda do |*io|
    busy = true
    begin
      block.call(*io)
    ensure
      busy = false
    end
  end

  # Single place that starts one child process with the validated settings.
  launch = lambda do
    child_process_execute_once(
      title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
      internal_encoding, external_encoding, scrub, replace_string, &guarded_block
    )
  end

  # Without an interval the command always runs once, right away.
  launch.call if immediate || !interval
  return unless interval

  timer_execute(:child_process_execute, interval, repeat: true) do
    if parallel || !busy
      launch.call
    else
      log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
    end
  end
end

#child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &block) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/fluent/plugin_helper/child_process.rb', line 195

# Spawns +command+ once through Open3, prepares the requested I/O objects,
# registers the process in @_child_process_processes and runs +block+ with the
# I/O objects in a thread created by thread_create.
#
# @param title [Symbol] label stored in ProcessInfo and attached to log records
# @param command [String] command to spawn
# @param arguments [Array, nil] extra arguments appended to the spawn call
# @param subprocess_name [String, nil] when set, spawned as [command, subprocess_name]
# @param mode [Array<Symbol>] which I/O objects the block receives, in this order
# @param stderr [Symbol] :discard or :connect; consulted only when stderr is not captured via mode
# @param env [Hash] environment passed as the first spawn argument
# @param unsetenv [Boolean] passed as the :unsetenv_others spawn option
# @param chdir [String, nil] passed as the :chdir spawn option when present
# @param internal_encoding, external_encoding encodings applied to the I/O objects in use
# @param scrub [Boolean] when true, invalid/undefined characters are replaced during conversion
# @param replace_string [String, nil] replacement used for scrubbing when given
# @return [Integer] pid of the spawned process
def child_process_execute_once(
    title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
    internal_encoding, external_encoding, scrub, replace_string, &block
)
  # [command, subprocess_name] is the spawn form that starts +command+ under a
  # different process name (see Process.spawn).
  spawn_args = if arguments || subprocess_name
                 [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ]
               else
                 [ env, command ]
               end
  spawn_opts = {
    unsetenv_others: unsetenv,
  }
  if chdir
    spawn_opts[:chdir] = chdir
  end

  # Options handed to IO#set_encoding below; empty when scrubbing is disabled.
  encoding_options = {}
  if scrub
    encoding_options[:invalid] = encoding_options[:undef] = :replace
    if replace_string
      encoding_options[:replace] = replace_string
    end
  end

  log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr

  readio = writeio = stderrio = wait_thread = nil
  readio_in_use = writeio_in_use = stderrio_in_use = false

  # Pick the Open3 variant by how stderr is handled:
  #   popen2  - stderr is neither captured nor discarded (stderr: :connect)
  #   popen2e - stderr is merged into the read stream (:read_with_stderr)
  #   popen3  - stderr gets its own pipe (captured via :stderr, or discarded)
  if !mode.include?(:stderr) && !mode.include?(:read_with_stderr) && stderr != :discard # connect
    writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
  elsif mode.include?(:read_with_stderr)
    writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
  else
    writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
    if !mode.include?(:stderr) # stderr == :discard
      # NOTE(review): this reopens our end of the stderr pipe onto the null
      # device, presumably so unread stderr output is dropped - confirm.
      stderrio.reopen(IO::NULL)
    end
  end

  # Only the I/O objects requested through +mode+ get encodings applied and are
  # flagged as in use in ProcessInfo.
  if mode.include?(:write)
    writeio.set_encoding(external_encoding, internal_encoding, encoding_options)
    writeio_in_use = true
  end
  if mode.include?(:read) || mode.include?(:read_with_stderr)
    readio.set_encoding(external_encoding, internal_encoding, encoding_options)
    readio_in_use = true
  end
  if mode.include?(:stderr)
    stderrio.set_encoding(external_encoding, internal_encoding, encoding_options)
    stderrio_in_use = true
  end

  pid = wait_thread.pid # wait_thread => Process::Waiter

  # Build the block arguments in the same order as +mode+.
  io_objects = []
  mode.each do |m|
    io_objects << case m
                  when :read then readio
                  when :write then writeio
                  when :read_with_stderr then readio
                  when :stderr then stderrio
                  else
                    raise "BUG: invalid mode must be checked before here: '#{m}'"
                  end
  end

  # The mutex is held by the calling thread until the thread-local values and
  # the registry entry below are in place; the callback thread blocks on it first.
  m = Mutex.new
  m.lock
  thread = thread_create :child_process_callback do
    m.lock # run after plugin thread get pid, thread instance and i/o
    m.unlock
    begin
      block.call(*io_objects)
    rescue EOFError => e
      log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
    rescue IOError => e
      if e.message == 'stream closed'
        log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
      else
        log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
      end
    rescue => e
      log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
    end
    # The callback is done: drop the process from the registry ...
    process_info = @_child_process_mutex.synchronize do
      process_info = @_child_process_processes[pid]
      @_child_process_processes.delete(pid)
      process_info
    end
    # ... and force-kill it if it is still marked alive and this thread's
    # running flag is still set (#stop clears that flag).
    child_process_kill(process_info, force: true) if process_info && process_info.alive && ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  end
  # Values read back inside the callback by child_process_running? and child_process_id.
  thread[:_fluentd_plugin_helper_child_process_running] = true
  thread[:_fluentd_plugin_helper_child_process_pid] = pid
  pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil)
  @_child_process_mutex.synchronize do
    @_child_process_processes[pid] = pinfo
  end
  m.unlock
  pid
end

#child_process_exit_statusObject



52
53
54
# File 'lib/fluent/plugin_helper/child_process.rb', line 52

# Returns the exit status recorded on the current thread under
# :_fluentd_plugin_helper_child_process_exit_status, or nil when none has
# been recorded (child_process_kill records it after reaping the child).
def child_process_exit_status
  current_thread = ::Thread.current
  current_thread[:_fluentd_plugin_helper_child_process_exit_status]
end

#child_process_idObject



48
49
50
# File 'lib/fluent/plugin_helper/child_process.rb', line 48

# Returns the pid recorded on the current thread under
# :_fluentd_plugin_helper_child_process_pid, or nil when the current thread
# has none (child_process_execute_once sets it on the callback thread).
def child_process_id
  current_thread = ::Thread.current
  current_thread[:_fluentd_plugin_helper_child_process_pid]
end

#child_process_kill(process_info, force: false) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/fluent/plugin_helper/child_process.rb', line 160

# Signals the process described by +process_info+ unless it is already gone.
#
# The child is first checked with a non-blocking Process.waitpid2; if it has
# already exited, its status is stored on the callback thread and no signal is
# sent. Otherwise it receives TERM, or KILL when +force+ is true or on Windows.
#
# @param process_info [ProcessInfo, nil] entry to act on; nil and entries not
#   marked alive are ignored
# @param force [Boolean] send KILL and mark the entry dead immediately; when
#   false, the time of this request is recorded in killed_at
def child_process_kill(process_info, force: false)
  return unless process_info && process_info.alive

  process_info.killed_at = Time.now unless force

  begin
    reaped_pid, exit_status = Process.waitpid2(process_info.pid, Process::WNOHANG)
    if reaped_pid && exit_status
      # Already exited: keep the status where child_process_exit_status reads it.
      process_info.thread[:_fluentd_plugin_helper_child_process_exit_status] = exit_status
      process_info.alive = false
    end
  rescue Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    # Nothing left that we could wait for.
    process_info.alive = false
  rescue
    # ignore
  end
  return unless process_info.alive

  begin
    signal = if Fluent.windows? || force
               :KILL
             else
               :TERM
             end
    Process.kill(signal, process_info.pid)
    process_info.alive = false if force
  rescue Errno::ECHILD, Errno::ESRCH
    process_info.alive = false
  end
end

#child_process_running?Boolean

Returns:

  • (Boolean)


43
44
45
46
# File 'lib/fluent/plugin_helper/child_process.rb', line 43

# Checker for code running inside a child_process_execute callback: returns the
# running flag stored on the current thread, or false when the flag is unset.
def child_process_running?
  running_flag = ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  running_flag ? running_flag : false
end

#closeObject



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin_helper/child_process.rb', line 138

# Waits until the registry of child processes is empty, force-killing every
# process that is still alive once @_child_process_kill_timeout seconds have
# passed since its killed_at time, then continues with super.
#
# The registry is polled every CHILD_PROCESS_LOOP_CHECK_INTERVAL seconds.
def close
  until (pids = @_child_process_mutex.synchronize { @_child_process_processes.keys }).empty?
    pids.each do |pid|
      info = @_child_process_processes[pid]

      if info && info.alive
        # for irregular cases (e.g., a process created after shutdown)
        info.killed_at ||= Time.now
        # Still within the grace period: look at it again on the next round.
        next if Time.now < info.killed_at + @_child_process_kill_timeout

        child_process_kill(info, force: true)
      end

      # Reached for missing/dead entries and for entries that were just killed.
      @_child_process_mutex.synchronize { @_child_process_processes.delete(pid) }
    end

    sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
  end

  super
end

#initializeObject



98
99
100
101
102
103
104
# File 'lib/fluent/plugin_helper/child_process.rb', line 98

# Sets up the helper's state: the mutex guarding the process registry and the
# default timeouts (in seconds) used by shutdown and close.
def initialize
  super
  @_child_process_mutex = Mutex.new

  # plugins MAY configure these parameters
  @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT
  @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT
end

#shutdownObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin_helper/child_process.rb', line 120

# Asks every registered child process to finish, then continues with super.
#
# For each process: its STDIN (write IO) is closed when the plugin was using
# it, bounded by @_child_process_exit_timeout seconds, and child_process_kill
# is called without force (TERM, or KILL on Windows).
#
# Every registered process is signaled, including those whose STDIN is not in
# use; otherwise such processes would get neither a signal nor a killed_at
# time here. A failure to close one STDIN is logged and does not prevent the
# remaining processes from being signaled.
def shutdown
  @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
    process_info = @_child_process_processes[pid]
    next unless process_info

    if process_info.writeio_in_use
      begin
        Timeout.timeout(@_child_process_exit_timeout) do
          process_info.writeio.close
        end
      rescue Timeout::Error
        log.debug "External process #{process_info.title} doesn't exit after STDIN close in timeout #{@_child_process_exit_timeout}sec"
      rescue IOError, SystemCallError => e
        # e.g. a broken pipe while flushing to a child that is already gone
        log.debug "Failed to close STDIN of external process #{process_info.title}", error: e
      end
    end

    child_process_kill(process_info)
  end

  super
end

#startObject



106
107
108
109
# File 'lib/fluent/plugin_helper/child_process.rb', line 106

# Runs the rest of the start chain, then creates an empty registry of child
# processes for this run.
def start
  super
  # pid => ProcessInfo
  @_child_process_processes = Hash.new
end

#stopObject



111
112
113
114
115
116
117
118
# File 'lib/fluent/plugin_helper/child_process.rb', line 111

# Marks the callback thread of every registered child process as stopped by
# clearing its :_fluentd_plugin_helper_child_process_running flag (the value
# child_process_running? reports inside a callback), then continues with super
# so the rest of the stop chain runs, as start, shutdown and close already do.
def stop
  pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }
  pids.each do |pid|
    process_info = @_child_process_processes[pid]
    next unless process_info

    process_info.thread[:_fluentd_plugin_helper_child_process_running] = false
  end

  super
end