Class: Chantier::ProcessPool
- Inherits: Object (hierarchy: Object > Chantier::ProcessPool)
- Defined in: lib/process_pool.rb
Overview
Allows you to spin off a pool of at most N subprocesses and keeps track of them. You can then enqueue tasks to be executed within that pool. When all slots are full, the caller is blocked until a slot becomes available.
```ruby
manager = ProcessPool.new(4) # 4 slots
jobs_hose.each_job do | job |
  # this call will block until a slot becomes available
  manager.fork_task do # this block runs in a subprocess
    Churner.new(job).churn
  end
  manager.still_running? # => most likely true
end
manager.block_until_complete! # blocks until all the subprocesses have terminated
```
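The contract described above can be sketched without Chantier: at most N children are alive, and the caller blocks while all slots are taken. The helper below, `run_bounded`, is hypothetical and not part of the gem. It replaces the polling loop and watcher threads that ProcessPool uses with a blocking `Process.wait2`, which returns as soon as any child exits. It assumes a Unix platform where `fork` is available and that the calling process has no other children.

```ruby
# Hypothetical helper, not part of Chantier: runs each job in a forked child,
# keeping at most `slots` children alive at once. Returns a Hash of
# job => Process::Status and the highest number of children seen alive.
def run_bounded(jobs, slots)
  live = {}     # pid => job, for children that have not been reaped yet
  statuses = {} # job => Process::Status
  max_live = 0

  jobs.each do |job|
    # All slots taken: block until any child exits (no polling needed)
    while live.size >= slots
      pid, status = Process.wait2
      statuses[live.delete(pid)] = status
    end
    pid = fork { yield(job) }
    live[pid] = job
    max_live = [max_live, live.size].max
  end

  # Reap whatever is still running, like block_until_complete!
  until live.empty?
    pid, status = Process.wait2
    statuses[live.delete(pid)] = status
  end

  [statuses, max_live]
end

statuses, max_live = run_bounded((1..6).to_a, 2) { |n| exit!(n % 2) }
```

In the usage line each child ends with status `n % 2`. It calls `exit!` so that the child skips `at_exit` handlers inherited from the parent.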
If you have a finite Enumerable at hand, you can also launch it into the ProcessPool, like so:
```ruby
manager = ProcessPool.new(4) # 4 slots
manager.map_fork(job_tickets) do | job_ticket |
  # this block will run in a forked subprocess
  Churner.new(job_ticket).churn
  # ...
end
```
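Because the block runs in a forked subprocess, it works on a copy of the parent's memory. Anything it assigns or appends stays in the child, and its return value is discarded. The minimal demonstration below uses plain `fork` and works on Unix only. It calls `exit!` so that the child skips the parent's `at_exit` handlers.

```ruby
results = []

pid = fork do
  results << :from_child # mutates the child's copy of the array only
  exit!(0)               # leave without running inherited at_exit handlers
end
Process.wait(pid)

p results # prints [] because the parent's array was never touched
```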
This could be rewritten using Threads, should running on the JVM or Rubinius become feasible.
Direct Known Subclasses
Constant Summary
- SCHEDULER_SLEEP_SECONDS = (1.0 / 1000)
  The manager uses loops in a few places. Doing a short sleep() (1 ms) inside those loops yields control back to the OS, which keeps the CPU usage of the managing process low. With a bare loop {}, MRI will saturate a whole core and not let go of it until the loop returns.
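The sketch below illustrates the sleep-in-the-loop idea with a hypothetical `wait_until` helper, which is not part of Chantier. The helper polls a condition every `SCHEDULER_SLEEP_SECONDS`. Each `sleep` releases the CPU, so the loop checks roughly once per millisecond instead of spinning. The helper returns the number of checks it made.

```ruby
SCHEDULER_SLEEP_SECONDS = (1.0 / 1000)

# Hypothetical helper: polls the block until it returns a truthy value,
# sleeping between checks so the CPU is handed back to the OS.
def wait_until
  checks = 0
  loop do
    checks += 1
    return checks if yield
    sleep SCHEDULER_SLEEP_SECONDS # breathing room
  end
end

done = false
Thread.new { sleep 0.05; done = true }
checks = wait_until { done }
puts "condition met after #{checks} checks"
```

A condition that becomes true after about 50 ms takes a few dozen checks. Without the `sleep`, the same wait would burn millions of iterations.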
Instance Method Summary
- #block_until_complete! ⇒ Object
  Analogous to Process.wait or Process.waitall: blocks until all of the process slots have been freed.
- #fork_task(&blk) ⇒ Object
  Run the given block in a forked subprocess.
- #initialize(num_procs, max_failures: nil) ⇒ ProcessPool (constructor)
  Initializes a new ProcessPool with the given number of workers.
- #map_fork(arguments_per_job, &blk) ⇒ Object
  Distributes the elements in the given Enumerable to parallel workers, at most N workers at a time.
- #still_running? ⇒ Boolean
  Tells whether some processes are still churning.
Constructor Details
#initialize(num_procs, max_failures: nil) ⇒ ProcessPool
Initializes a new ProcessPool with the given number of workers (slots). If max_failures is given, the next call to fork_task raises a RuntimeError once more than max_failures of the spawned processes have terminated unsuccessfully, meaning with a non-0 exit status or by a signal.
```ruby
# File 'lib/process_pool.rb', line 40
def initialize(num_procs, max_failures: nil)
  @max_failures = max_failures && max_failures.to_i
  @non_zero_exits = 0
  raise "Need at least 1 slot, given #{num_procs.to_i}" unless num_procs.to_i > 0
  @pids = [nil] * num_procs.to_i
  @semaphore = Mutex.new
end
```
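The failure limit is a strict "more than" comparison, checked at the start of each `fork_task` call. As a result, `max_failures: 2` tolerates two failed children and raises on the first `fork_task` call made after a third failure has been recorded. Likewise, `max_failures: 0` raises after the first failure. The fork-free model below mirrors the two instance variables and the guard from the source. The `FailureBudget` class is hypothetical and exists only for illustration.

```ruby
# Hypothetical, fork-free model of the max_failures bookkeeping in ProcessPool
class FailureBudget
  def initialize(max_failures)
    @max_failures = max_failures && max_failures.to_i
    @non_zero_exits = 0
  end

  # What the watcher thread does when a child ends
  def record_exit(status_code)
    @non_zero_exits += 1 unless status_code.zero?
  end

  # Mirrors the guard at the top of fork_task
  def check!
    if @max_failures && @non_zero_exits > @max_failures
      raise "Reached error limit of processes quitting with non-0 status - limit set at #{@max_failures}"
    end
  end
end

budget = FailureBudget.new(2)
2.times { budget.record_exit(1) }
budget.check! # two failures are still within the limit
```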
Instance Method Details
#block_until_complete! ⇒ Object
Analogous to Process.wait or Process.waitall: blocks until all of the process slots have been freed.
```ruby
# File 'lib/process_pool.rb', line 134
def block_until_complete!
  loop do
    return unless still_running?
    sleep SCHEDULER_SLEEP_SECONDS # Breathing room
  end
end
```
#fork_task(&blk) ⇒ Object
Run the given block in a forked subprocess. This method blocks the thread it is called from until a slot in the process table becomes free. Once that happens, the given block is forked off and the method returns the PID of the new subprocess.
```ruby
# File 'lib/process_pool.rb', line 70
def fork_task(&blk)
  if @max_failures && @non_zero_exits > @max_failures
    raise "Reached error limit of processes quitting with non-0 status - limit set at #{@max_failures}"
  end

  destination_slot_idx = nil

  # Try to find a slot in the process table where this job can go
  catch :_found do
    loop do
      @semaphore.synchronize do
        if destination_slot_idx = @pids.index(nil)
          @pids[destination_slot_idx] = true # occupy it
          throw :_found
        end
      end
      sleep SCHEDULER_SLEEP_SECONDS # Breathing room
    end
  end

  task_pid = fork(&blk)

  # No need to lock this because we already reserved that slot
  @pids[destination_slot_idx] = task_pid

  puts("Spun off a task process #{task_pid} into slot #{destination_slot_idx}") if $VERBOSE

  # Dispatch the watcher thread that will record that the process has quit into the
  # process table
  Thread.new do
    Process.wait(task_pid) # This call will block until that process quits
    terminated_normally = $?.exited? && $?.exitstatus.zero?
    @semaphore.synchronize do
      # Now we can remove that process from the process table
      @pids[destination_slot_idx] = nil
      # and increment the error count if needed
      @non_zero_exits += 1 unless terminated_normally
    end
  end

  return task_pid

  # Dispatch the killer thread which kicks in after KILL_AFTER_SECONDS.
  # Note that we do not manage the @pids table here because once the process
  # gets terminated it will bounce back to the standard wait() above.
  # Thread.new do
  #   sleep KILL_AFTER_SECONDS
  #   begin
  #     TERMINATION_SIGNALS.each do | sig |
  #       Process.kill(sig, task_pid)
  #       sleep 5 # Give it some time to react
  #     end
  #   rescue Errno::ESRCH
  #     # It has already quit, nothing to do
  #   end
  # end
end
```
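The watcher thread counts a child as successful only if `$?.exited? && $?.exitstatus.zero?` holds. A child killed by a signal has not "exited" in this sense, so it counts towards `max_failures` just like a non-zero exit code does. The sketch below applies the same predicate to a few children. The names `terminated_normally?` and `run_and_classify` are hypothetical. The sketch calls `Process.wait2` to get the `Process::Status` directly instead of reading `$?`.

```ruby
# Same predicate as fork_task's watcher thread
def terminated_normally?(status)
  status.exited? && status.exitstatus.zero?
end

# Hypothetical helper: fork the block, wait for the child, classify its end
def run_and_classify
  pid = fork { yield }
  _, status = Process.wait2(pid)
  terminated_normally?(status)
end

p run_and_classify { exit!(0) }                              # => true
p run_and_classify { exit!(3) }                              # => false
p run_and_classify { Process.kill(:KILL, Process.pid); sleep } # => false
```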
#map_fork(arguments_per_job, &blk) ⇒ Object
Distributes the elements in the given Enumerable to parallel workers, at most N workers at a time (N being the pool size). The method returns once the workers for all the elements of the Enumerable have terminated.
```ruby
pool = ProcessPool.new(5)
pool.map_fork(array_of_urls) do | single_url |
  body = Faraday.get(single_url).body
  # ...
end
```
```ruby
# File 'lib/process_pool.rb', line 59
def map_fork(arguments_per_job, &blk)
  arguments_per_job.each do | single_block_argument |
    fork_task { yield(single_block_argument) }
  end
  block_until_complete!
end
```
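map_fork does not collect anything, because the block's return value is lost when the child exits. If results are needed, one possible approach is to send each return value back over an `IO.pipe`, serialized with `Marshal`. This is an assumption, not something Chantier provides. The hypothetical `map_fork_with_results` below forks one child per element with no slot limit. It assumes the block does not raise and returns a value that `Marshal` can dump.

```ruby
# Hypothetical helper, not part of Chantier: forks one child per item and
# sends each block's return value back to the parent through a pipe.
# Unlike map_fork it does not limit how many children run at once.
def map_fork_with_results(items, &blk)
  children = items.map do |item|
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode
    pid = fork do
      reader.close
      writer.write(Marshal.dump(blk.call(item)))
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    writer.close # the parent only reads
    [pid, reader]
  end

  # Read each child's payload in order, then reap the child
  children.map do |pid, reader|
    payload = reader.read # returns once the child has closed its end
    reader.close
    Process.wait(pid)
    Marshal.load(payload)
  end
end

p map_fork_with_results([1, 2, 3]) { |n| n * n } # => [1, 4, 9]
```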
#still_running? ⇒ Boolean
Tells whether some processes are still churning.
```ruby
# File 'lib/process_pool.rb', line 129
def still_running?
  @pids.any?{|e| e }
end
```
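Judging from fork_task, each entry in `@pids` is in one of three states:

- `nil` for a free slot.
- `true` for a slot that has been reserved but not yet forked into.
- The Integer PID of a running child.

`still_running?` treats both `true` and a PID as busy. The example below runs the same check on plain arrays, using made-up PIDs.

```ruby
# Same truthiness check as still_running?, applied to example slot tables
still_running = ->(pids) { pids.any? { |e| e } }

p still_running.call([nil, nil, nil])  # => false, every slot is free
p still_running.call([nil, true, nil]) # => true, a slot is reserved
p still_running.call([4242, nil, nil]) # => true, a child is running
```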