Class: Chantier::ProcessPool

Inherits:
Object
Defined in:
lib/process_pool.rb

Overview

Allows you to spin off subprocesses into a pool that never grows beyond a fixed number of slots, and maintains that pool for you. You can then enqueue tasks to be executed within the pool. When all slots are full, the caller is blocked until a slot becomes available.

manager = ProcessPool.new(slots = 4)
jobs_hose.each_job do | job |
  # this call will block until a slot becomes available
  manager.fork_task do # this block runs in a subprocess
    Churner.new(job).churn
  end
  manager.still_running? # => most likely "true"
end

manager.block_until_complete! #=> Will block until all the subprocesses have terminated

If you have a finite Enumerable at hand you can also launch it into the ProcessPool, like so:

manager = ProcessPool.new(slots = 4)

manager.map_fork(job_tickets) do | job_ticket |
  # this block will run in a forked subprocess
  Churner.new(job_ticket).churn
  ...
end

This could be rewritten to use Threads if it ever needs to run on the JVM or Rubinius.

Constant Summary

SCHEDULER_SLEEP_SECONDS =

The manager polls in loops in a few places. A short sleep() inside those loops yields control back to the OS, which keeps the CPU usage of the managing process low. A bare loop {} would make MRI saturate a whole core and not let go of it until the loop returns.

(1.0 / 1000)
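The effect of the short sleep can be sketched with a standalone polling loop. This is an illustration only: `poll_until` and `POLL_SLEEP_SECONDS` are hypothetical names that do not exist in ProcessPool, and only the 1 ms interval is taken from the constant.

```ruby
# Hypothetical helper, not part of ProcessPool: re-check a condition,
# sleeping briefly between checks instead of spinning in a tight loop.
POLL_SLEEP_SECONDS = 1.0 / 1000 # same interval as SCHEDULER_SLEEP_SECONDS

def poll_until(sleep_seconds = POLL_SLEEP_SECONDS)
  iterations = 0
  until yield
    sleep sleep_seconds # hand the CPU back to the OS between checks
    iterations += 1
  end
  iterations
end

# Wait for a deadline roughly 50 ms away and count how often we had to sleep.
deadline = Time.now + 0.05
iterations = poll_until { Time.now >= deadline }
puts "slept #{iterations} times before the condition became true"
```

With a 1 ms sleep the loop wakes up at most a few dozen times in those 50 ms, whereas a loop without the sleep would re-check the condition continuously.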

Instance Method Summary

Constructor Details

#initialize(num_procs) ⇒ ProcessPool

Returns a new instance of ProcessPool.



# File 'lib/process_pool.rb', line 42

def initialize(num_procs)
  raise "Need at least 1 slot, given #{num_procs.to_i}" unless num_procs.to_i > 0
  @pids = [nil] * num_procs.to_i
  @semaphore = Mutex.new
end

Instance Method Details

#block_until_complete! ⇒ Object

Analogous to Process.wait or Process.waitall: blocks until all of the process slots have been freed.



# File 'lib/process_pool.rb', line 122

def block_until_complete!
  loop do
    return unless still_running?
    sleep SCHEDULER_SLEEP_SECONDS # Breathing room
  end
end

#fork_task(&blk) ⇒ Object

Run the given block in a forked subprocess. This method will block the thread it is called from until a slot in the process table becomes free. Once that happens, the given block will be forked off and the method will return.



# File 'lib/process_pool.rb', line 69

def fork_task(&blk)
  destination_slot_idx = nil
  
  # Try to find a slot in the process table where this job can go
  catch :_found do
    loop do
      @semaphore.synchronize do
        if destination_slot_idx = @pids.index(nil)
          @pids[destination_slot_idx] = true # occupy it
          throw :_found
        end
      end
      sleep SCHEDULER_SLEEP_SECONDS # Breathing room
    end
  end
  
  task_pid = fork(&blk)
  
  # No need to lock this because we already reserved that slot
  @pids[destination_slot_idx] = task_pid
  
  puts("Spun off a task process #{task_pid} into slot #{destination_slot_idx}") if $VERBOSE
  
  # Dispatch the watcher thread that will record that the process has quit into the
  # process table
  Thread.new do
    Process.wait(task_pid) # This call will block until that process quits
    # Now we can remove that process from the process table
    @semaphore.synchronize { @pids[destination_slot_idx] = nil }
  end
  
  # Dispatch the killer thread which kicks in after KILL_AFTER_SECONDS.
  # Note that we do not manage the @pids table here because once the process
  # gets terminated it will bounce back to the standard wait() above.
  # Thread.new do
  #   sleep KILL_AFTER_SECONDS
  #   begin
  #     TERMINATION_SIGNALS.each do | sig |
  #       Process.kill(sig, task_pid)
  #       sleep 5 # Give it some time to react
  #     end
  #   rescue Errno::ESRCH
  #     # It has already quit, nothing to do
  #   end
  # end
end
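Because the block runs in a forked subprocess, anything it changes in memory stays in the child. The following standalone sketch uses plain `fork` and `IO.pipe` from the Ruby standard library rather than ProcessPool itself, and shows one possible way (an assumption, not something the library provides) to report a value back to the parent.

```ruby
counter = 0

reader, writer = IO.pipe
pid = fork do
  reader.close
  counter += 1               # modifies the child's copy only
  writer.write(counter.to_s) # report the value back explicitly
  writer.close
  exit!(0)                   # leave the child without running at_exit hooks
end

writer.close                 # the parent keeps only the read end
child_value = reader.read    # blocks until the child closes its write end
reader.close
Process.wait(pid)            # reap the child so it does not linger as a zombie

puts "parent sees counter=#{counter}, child reported #{child_value}"
```

In fork_task the reaping is done by the watcher thread, so callers do not call Process.wait themselves; the pipe is only needed when the parent wants data back from the task.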

#map_fork(arguments_per_job, &blk) ⇒ Object

Distributes the elements in the given Enumerable to parallel workers, N workers at a time. The method will return once all the workers for all the elements of the Enumerable have terminated.

pool = ProcessPool.new(5)
pool.map_fork(array_of_urls) do | single_url |
  Faraday.get(single_url).response ...
  ...
  ...
end


# File 'lib/process_pool.rb', line 58

def map_fork(arguments_per_job, &blk)
  arguments_per_job.each do | single_block_argument |
    fork_task { yield(single_block_argument) }
  end
  block_until_complete!
end
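As the source shows, map_fork returns only after block_until_complete!, so the return values of the blocks are not collected. A minimal standalone sketch of one way to gather results follows. It does not use ProcessPool: it caps concurrency with a plain counter and `Process.wait` instead of the slot table, and `MAX_WORKERS`, the squaring job and the per-job file names are all assumptions made for illustration.

```ruby
require "tmpdir"

MAX_WORKERS = 2 # hypothetical cap, playing the role of the slot count
inputs = [1, 2, 3, 4, 5]

results = Dir.mktmpdir do |dir|
  running = 0
  inputs.each do |n|
    if running >= MAX_WORKERS
      Process.wait # block until any one child exits, freeing a "slot"
      running -= 1
    end
    fork do
      # Each child writes its result to its own file in the shared directory.
      File.write(File.join(dir, "#{n}.txt"), (n * n).to_s)
      exit!(0)
    end
    running += 1
  end
  Process.waitall # wait for the remaining children
  inputs.map { |n| Integer(File.read(File.join(dir, "#{n}.txt"))) }
end

puts results.inspect
```

Files keep the children independent of each other; a pipe per child would work as well, at the cost of more bookkeeping in the parent.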

#still_running? ⇒ Boolean

Tells whether any processes are still running, that is, whether any slot in the process table is occupied.

Returns:

  • (Boolean)


# File 'lib/process_pool.rb', line 117

def still_running?
  @pids.any?{|e| e }
end
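The check relies on the three states a slot can take in the source above: nil when free, true while reserved by fork_task, and a PID once the subprocess has been forked. The sketch below walks a plain array through those states; `slots` is a local stand-in for @pids and the PID value is made up.

```ruby
slots = [nil] * 3                 # three free slots, as set up in initialize

busy_initially = slots.any? { |e| e }

free_idx = slots.index(nil)       # first free slot
slots[free_idx] = true            # reserved while the fork is in progress
busy_when_reserved = slots.any? { |e| e }

slots[free_idx] = 12345           # hypothetical PID of the forked task
busy_when_running = slots.any? { |e| e }

slots[free_idx] = nil             # watcher thread frees the slot on exit
busy_after_exit = slots.any? { |e| e }

puts [busy_initially, busy_when_reserved, busy_when_running, busy_after_exit].inspect
```

A reserved slot already counts as running, so still_running? reports true from the moment fork_task claims a slot, not only once the PID has been stored.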