Class: Fluent::ExecFilterOutput::ChildProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_exec_filter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parser, respawns = 0, log = $log) ⇒ ChildProcess

Returns a new instance of ChildProcess.



271
272
273
274
275
276
277
278
279
# File 'lib/fluent/plugin/out_exec_filter.rb', line 271

# Records the collaborators for one child process; nothing is spawned here.
#
# @param parser [#call] stored as-is; #run invokes it with the child's IO
# @param respawns [Integer] stored as-is; #try_respawn refuses when it is 0
#   and only decrements it while it is positive
# @param log [Object] logger used by the other methods (defaults to $log)
def initialize(parser, respawns=0, log = $log)
  @parser, @respawns, @log = parser, respawns, log
  @mutex = Mutex.new
  # No child, reader thread, or finished flag until #start assigns them.
  @pid = @thread = @finished = nil
end

Instance Attribute Details

#finished ⇒ Object

Returns the value of attribute finished.



269
270
271
# File 'lib/fluent/plugin/out_exec_filter.rb', line 269

# Reader for the @finished flag.
#
# @return [Object] whatever is currently stored in @finished
def finished; @finished; end

Instance Method Details

#kill_child(join_wait) ⇒ Object



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/fluent/plugin/out_exec_filter.rb', line 292

# Stops the child process and waits for the thread stored in @thread.
#
# Sends a first signal (:KILL when Fluent.windows? is true, :TERM otherwise),
# waits up to +join_wait+ for @thread to finish and, if it has not, sends
# :KILL and waits without a limit.
#
# Does nothing when @thread is nil (no child was started) or has already
# finished. @thread runs #run, whose ensure clause calls
# Process.waitpid2(@pid); once that thread is done the pid has normally been
# waited on already, so signalling it again could reach an unrelated process
# that reused the pid.
#
# @param join_wait [Numeric] limit passed to Thread#join for the first wait
# @return [Thread, nil] nil when there was nothing to kill or the first wait
#   succeeded; otherwise the joined thread
def kill_child(join_wait)
  return if @thread.nil? || !@thread.alive?

  begin
    signal = Fluent.windows? ? :KILL : :TERM
    Process.kill(signal, @pid)
  rescue # Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    # Errno::ESRCH 'No such process', ignore
    # child process killed by signal chained from fluentd process
  end

  # @thread finished within the limit: nothing left to do.
  return if @thread.join(join_wait)

  begin
    Process.kill(:KILL, @pid)
  rescue # Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    # ignore if successfully killed by :TERM
  end
  @thread.join
end

#run ⇒ Object



351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/fluent/plugin/out_exec_filter.rb', line 351

# Thread body for one child: hands @io to @parser, then waits on the child.
#
# A StandardError raised by the parser is logged instead of propagating.
# The ensure clause always waits on @pid; unless @finished is set it reports
# the exit and, when @respawns is not 0, announces the upcoming respawn.
def run
  @parser.call(@io)
rescue => e
  @log.error "exec_filter thread unexpectedly failed with an error.", command: @command, error: e.to_s
  @log.warn_backtrace e.backtrace
ensure
  # Blocks until the child has exited; only the status half of the pair is used.
  status = Process.waitpid2(@pid).last
  unless @finished
    @log.error "exec_filter process unexpectedly exited.", command: @command, ecode: status.to_i
    if @respawns != 0
      @log.warn "exec_filter child process will respawn for next input data (respawns #{@respawns})."
    end
  end
end

#shutdown ⇒ Object



312
313
314
315
316
317
# File 'lib/fluent/plugin/out_exec_filter.rb', line 312

# Marks this child as intentionally finished, then stops it under @mutex.
#
# @finished is set first; #run checks it before logging an exit as unexpected.
def shutdown
  @finished = true
  @mutex.synchronize { kill_child(60) } # TODO wait time
end

#start(command) ⇒ Object



281
282
283
284
285
286
287
288
289
290
# File 'lib/fluent/plugin/out_exec_filter.rb', line 281

# Spawns the child for +command+ and starts the thread that runs #run.
#
# The spawn happens under @mutex; afterwards @finished is reset to false.
#
# @param command [Object] passed unchanged to IO.popen and kept in @command
# @return [false] the value assigned to @finished
def start(command)
  @command = command
  @mutex.synchronize do
    @io = IO.popen(command, "r+")
    @io.sync = true
    @pid = @io.pid
    @thread = Thread.new { run }
  end
  @finished = false
end

#try_respawn ⇒ Object



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/fluent/plugin/out_exec_filter.rb', line 333

# Replaces the current child with a fresh one if respawns are still allowed.
#
# Under @mutex: stops the old child, spawns @command again, starts a new
# thread running #run, and decrements @respawns while it is positive (a
# negative count is left untouched).
#
# @return [Boolean] false when @respawns is 0, true after a respawn
def try_respawn
  return false if @respawns == 0

  respawned = @mutex.synchronize do
    # The count is read again once the lock is held.
    next false if @respawns == 0

    kill_child(5) # TODO wait time

    @io = IO.popen(@command, "r+")
    @io.sync = true
    @pid = @io.pid
    @thread = Thread.new { run }

    @respawns -= 1 if @respawns > 0
    true
  end
  return false unless respawned

  @log.warn "exec_filter child process successfully respawned.", command: @command, respawns: @respawns
  true
end

#write(chunk) ⇒ Object



319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/fluent/plugin/out_exec_filter.rb', line 319

# Writes +chunk+ to the child's IO, respawning the child on a broken pipe.
#
# @param chunk [#write_to] called with @io
# @return [Object] whatever chunk.write_to returns
# @raise [Errno::EPIPE] when the pipe is broken and #try_respawn returns false
def write(chunk)
  chunk.write_to(@io)
rescue Errno::EPIPE => e
  # Broken pipe (child process unexpectedly exited)
  @log.warn "exec_filter Broken pipe, child process maybe exited.", command: @command
  # No respawn: re-raise to retry #write with other ChildProcess instance (when num_children > 1)
  raise e unless try_respawn
  retry # run chunk#write_to again with child respawned
end