Class: Fluent::ExecFilterOutput::ChildProcess
- Inherits:
-
Object
- Object
- Fluent::ExecFilterOutput::ChildProcess
- Defined in:
- lib/fluent/plugin/out_exec_filter.rb
Instance Attribute Summary
-
#finished ⇒ Object
Returns the value of attribute finished.
Instance Method Summary
-
#initialize(parser, respawns = 0, log = $log) ⇒ ChildProcess
constructor
A new instance of ChildProcess.
- #kill_child(join_wait) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start(command) ⇒ Object
- #try_respawn ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize(parser, respawns = 0, log = $log) ⇒ ChildProcess
Returns a new instance of ChildProcess.
271 272 273 274 275 276 277 278 279 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 271

# Prepares the bookkeeping for one child process. Nothing is spawned here;
# the process, its pid and its reader thread are created by #start.
#
# @param parser [#call] invoked with the child's IO object by #run
# @param respawns [Integer] respawn budget: 0 disables respawning, a positive
#   value is decremented on every respawn, a negative value is never
#   decremented (see #try_respawn)
# @param log [Object] logger used for warnings and errors (defaults to the
#   global $log)
def initialize(parser, respawns = 0, log = $log)
  # Collaborators handed in by the caller.
  @parser = parser
  @respawns = respawns
  @log = log

  # Guards spawning, respawning and killing of the child.
  @mutex = Mutex.new

  # No child exists yet.
  @pid = nil
  @thread = nil
  @finished = nil
end
Instance Attribute Details
#finished ⇒ Object
Returns the value of attribute finished.
269 270 271 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 269

# Reader for the shutdown flag.
#
# @return [Boolean, nil] nil right after construction, false once #start has
#   run, true once #shutdown has been called
def finished
  @finished
end
Instance Method Details
#kill_child(join_wait) ⇒ Object
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 292

# Stops the child process and waits for its reader thread to finish.
#
# A first signal is sent (:KILL when Fluent.windows? is true, :TERM
# otherwise) and the reader thread is given +join_wait+ seconds to end. If it
# is still running after that, :KILL is sent and the thread is joined without
# a timeout.
#
# Does nothing when no reader thread exists, i.e. the child was never started
# or spawning failed before the thread was created.
#
# @param join_wait [Numeric, nil] seconds to wait for the reader thread after
#   the first signal before escalating to :KILL
# @return [Thread, nil] the joined thread when escalation was needed, nil
#   otherwise; callers in this class ignore the value
def kill_child(join_wait)
  # Without a reader thread there is nothing to signal or join; calling
  # @thread.join on nil would raise NoMethodError.
  return if @thread.nil?

  begin
    signal = Fluent.windows? ? :KILL : :TERM
    Process.kill(signal, @pid)
  rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    # Errno::ESRCH 'No such process', ignore
    # child process killed by signal chained from fluentd process
  end

  # Thread#join returns nil on timeout and the thread itself otherwise.
  if @thread.join(join_wait)
    # @thread successfully shutdown
    return
  end

  begin
    Process.kill(:KILL, @pid)
  rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    # ignore if successfully killed by :TERM
  end

  @thread.join
end
#run ⇒ Object
351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 351

# Body of the reader thread: hands the child's IO to the parser, then reaps
# the child process.
#
# Any StandardError raised by the parser is logged together with its
# backtrace. Afterwards the method always waits for the child (@pid) to exit.
# Unless #shutdown has already set @finished, that exit is logged as
# unexpected, with a hint about the pending respawn when the respawn budget
# is non-zero.
#
# @return [Object] whatever the parser (or the error logging) returned; the
#   value is not used by the callers in this class
def run
  begin
    @parser.call(@io)
  rescue => e
    @log.error "exec_filter thread unexpectedly failed with an error.", command: @command, error: e.to_s
    @log.warn_backtrace e.backtrace
  ensure
    # Blocks until the child has exited, so the thread outlives the process.
    _pid, stat = Process.waitpid2(@pid)
    unless @finished
      @log.error "exec_filter process unexpectedly exited.", command: @command, ecode: stat.to_i
      if @respawns != 0
        @log.warn "exec_filter child process will respawn for next input data (respawns #{@respawns})."
      end
    end
  end
end
#shutdown ⇒ Object
312 313 314 315 316 317 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 312

# Requests a deliberate stop of the child process.
#
# The finished flag is raised first so that #run does not report the exit as
# unexpected; the child is then stopped via #kill_child while holding the
# mutex, with a 60 second grace period before escalation.
#
# @return [Object] the result of #kill_child
def shutdown
  @finished = true
  @mutex.synchronize { kill_child(60) } # TODO wait time
end
#start(command) ⇒ Object
281 282 283 284 285 286 287 288 289 290 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 281

# Spawns the child process and its reader thread.
#
# The command is remembered for later respawns. While holding the mutex, the
# child is opened as a bidirectional pipe with unbuffered writes, and #run is
# launched in a new thread. The finished flag is cleared last.
#
# @param command [String, Array] command passed unchanged to IO.popen
# @return [false] the value assigned to the finished flag
def start(command)
  @command = command

  @mutex.synchronize do
    pipe = IO.popen(command, "r+")
    @io = pipe
    @pid = pipe.pid
    pipe.sync = true
    @thread = Thread.new { run }
  end

  @finished = false
end
#try_respawn ⇒ Object
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 333

# Replaces a dead child with a fresh one, if the respawn budget allows it.
#
# The budget is checked once without the lock as a fast path and again under
# the mutex. The old child is stopped via #kill_child with a 5 second grace
# period, the remembered command is spawned again with a new reader thread,
# and a positive budget is decremented (a negative budget is left untouched).
#
# @return [Boolean] true when a new child was spawned, false when the
#   respawn budget is 0
def try_respawn
  return false if @respawns == 0

  respawned = @mutex.synchronize do
    if @respawns == 0
      # Budget was used up while this caller was waiting for the lock.
      false
    else
      kill_child(5) # TODO wait time
      pipe = IO.popen(@command, "r+")
      @io = pipe
      @pid = pipe.pid
      pipe.sync = true
      @thread = Thread.new { run }
      @respawns -= 1 if @respawns > 0
      true
    end
  end
  return false unless respawned

  @log.warn "exec_filter child process successfully respawned.", command: @command, respawns: @respawns
  true
end
#write(chunk) ⇒ Object
319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/fluent/plugin/out_exec_filter.rb', line 319

# Streams a buffer chunk into the child's stdin.
#
# A broken pipe means the child has exited. In that case a warning is logged
# and a respawn is attempted: on success the write is retried against the new
# child, otherwise the original error is re-raised so the caller can retry
# #write with another ChildProcess instance (when num_children > 1).
#
# @param chunk [#write_to] chunk whose contents are written to the child's IO
# @return [Object] the result of chunk.write_to
# @raise [Errno::EPIPE] when the pipe is broken and the child cannot be
#   respawned
def write(chunk)
  chunk.write_to(@io)
rescue Errno::EPIPE => e
  # Broken pipe (child process unexpectedly exited)
  @log.warn "exec_filter Broken pipe, child process maybe exited.", command: @command
  raise e unless try_respawn

  retry # retry chunk#write_to with child respawned
end