Class: Chantier::ProcessPoolWithKill

Inherits:
ProcessPool show all
Defined in:
lib/process_pool_with_kill.rb

Overview

Allows you to spin off a pool of subprocesses that is not larger than X, and maintains a pool of those proceses (same as ProcessPool). Will also forcibly quit those processes after a certain period to ensure they do not hang

manager = ProcessPoolWithKill.new(slots = 4, kill_after_seconds: 5) # seconds
jobs_hose.each_job do | job |
  # this call will block until a slot becomes available
  manager.fork_task do # this block runs in a subprocess
    Churner.new(job).churn
  end
  manager.still_running? # => most likely "true"
end

manager.block_until_complete! #=> Will block until all the subprocesses have terminated

Constant Summary collapse

TERMINATION_SIGNALS =
%w( TERM HUP INT QUIT PIPE KILL )
DEFAULT_KILL_TIMEOUT =
60

Constants inherited from ProcessPool

Chantier::ProcessPool::SCHEDULER_SLEEP_SECONDS

Instance Method Summary collapse

Methods inherited from ProcessPool

#block_until_complete!, #fork_task_in_all_slots, #map_fork, #still_running?

Constructor Details

#initialize(num_procs, kill_after_seconds: DEFAULT_KILL_TIMEOUT, **kwargs) ⇒ ProcessPoolWithKill

Returns a new instance of ProcessPoolWithKill.



21
22
23
24
# File 'lib/process_pool_with_kill.rb', line 21

def initialize(num_procs, kill_after_seconds: DEFAULT_KILL_TIMEOUT, **kwargs)
  @kill_after_seconds = kill_after_seconds.to_f
  super(num_procs, **kwargs)
end

Instance Method Details

#fork_task(&blk) ⇒ Object

Run the given block in a forked subprocess. This method will block the thread it is called from until a slot in the process table becomes free. Once that happens, the given block will be forked off and the method will return.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/process_pool_with_kill.rb', line 30

def fork_task(&blk)
  task_pid = super
  Thread.abort_on_exception = true
  # Dispatch the killer thread which kicks in after KILL_AFTER_SECONDS.
  # Note that we do not manage the @pids table here because once the process
  # gets terminated it will bounce back to the standard wait() above.
  Thread.new do
    sleep @kill_after_seconds
    begin
      TERMINATION_SIGNALS.each do | sig |
        Process.kill(sig, task_pid)
        sleep 1 # Give it some time to react
      end
    rescue Errno::ESRCH
      # It has already quit, nothing to do
    end
  end
  
  return task_pid
end