Class: Forkomatic
- Inherits: Object
- Hierarchy: Object → Forkomatic
- Defined in:
- lib/forkomatic.rb
Defined Under Namespace
Classes: Job
Instance Attribute Summary collapse
-
#jobs ⇒ Object
Returns the value of attribute jobs.
-
#max_children ⇒ Object
Returns the value of attribute max_children.
-
#max_iterations ⇒ Object
Returns the value of attribute max_iterations.
-
#wait_for_children ⇒ Object
Returns the value of attribute wait_for_children.
-
#work_interval ⇒ Object
Returns the value of attribute work_interval.
Instance Method Summary collapse
-
#available ⇒ Object
See how many children are available.
-
#build_jobs(count) ⇒ Object
Create workers.
-
#child_pids ⇒ Object
Get a list of current process IDs.
-
#initialize(args) ⇒ Forkomatic
constructor
Initialize the runners.
-
#load_config(config_file) ⇒ Object
Load a configuration from a file.
-
#reap(pid) ⇒ Object
Reap the child process with the given pid if it has finished.
-
#reap_all ⇒ Object
Try to reap all child processes.
-
#run ⇒ Object
Do work.
-
#shutdown(all) ⇒ Object
Send TERM to all child processes and, if `all` is true, exit.
Constructor Details
#initialize(args) ⇒ Forkomatic
Initialize the runners.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/forkomatic.rb', line 21
#
# Initialize the runners.
#
# @param args [String, Integer, Hash] where the settings come from:
#   * String  - path of a config file, read through load_config.
#   * Integer - run that many children for exactly one iteration, with no
#     pause, and wait for them to finish.
#   * Hash    - the settings themselves; keys may be strings or symbols.
#   Any other value leaves every setting at its default.
def initialize(args)
  params =
    case args
    when String
      # Load config from a file.
      load_config(args)
    when Integer
      # Given an integer, forkomatic will only run N runners 1 time.
      { wait_for_children: true, max_children: args, work_interval: 0, max_iterations: 1 }
    when Hash
      # Specify the parameters directly.
      args
    else
      {}
    end

  # Symbolize the keys into a fresh hash and keep the result. Previously the
  # converted hash was thrown away, so string-keyed settings were ignored.
  # Building a new hash also leaves the caller's hash untouched.
  params = params.each_with_object({}) { |(key, val), normalized| normalized[key.to_sym] = val }

  interval = params[:work_interval]

  self.jobs = []
  self.max_children = params[:max_children] || 1
  # A missing or negative interval means "do not pause between iterations".
  self.work_interval = interval.nil? || interval < 0 ? 0 : interval
  # nil leaves the number of iterations unbounded.
  self.max_iterations = params[:max_iterations]
  # Default to waiting; an explicit false must be honored, hence the nil check.
  self.wait_for_children = params[:wait_for_children].nil? ? true : params[:wait_for_children]
end
Instance Attribute Details
#jobs ⇒ Object
Returns the value of attribute jobs.
17 18 19 |
# File 'lib/forkomatic.rb', line 17 def jobs @jobs end |
#max_children ⇒ Object
Returns the value of attribute max_children.
14 15 16 |
# File 'lib/forkomatic.rb', line 14 def max_children @max_children end |
#max_iterations ⇒ Object
Returns the value of attribute max_iterations.
16 17 18 |
# File 'lib/forkomatic.rb', line 16 def max_iterations @max_iterations end |
#wait_for_children ⇒ Object
Returns the value of attribute wait_for_children.
18 19 20 |
# File 'lib/forkomatic.rb', line 18 def wait_for_children @wait_for_children end |
#work_interval ⇒ Object
Returns the value of attribute work_interval.
15 16 17 |
# File 'lib/forkomatic.rb', line 15 def work_interval @work_interval end |
Instance Method Details
#available ⇒ Object
See how many children are available.
130 131 132 133 134 135 136 |
# File 'lib/forkomatic.rb', line 130
#
# See how many children are available.
#
# @return the configured maximum when no jobs are tracked; otherwise the
#   maximum minus the number of jobs still tracked after a reaping pass.
def available
  tracked = @jobs
  if tracked.nil? || tracked.empty?
    # Nothing has been started yet (or everything is gone): full capacity.
    @max_children
  else
    # Reap children runners without waiting, then count what is left.
    reap_all
    @max_children - @jobs.length
  end
end
#build_jobs(count) ⇒ Object
Create workers.
101 102 103 |
# File 'lib/forkomatic.rb', line 101
#
# Create workers.
#
# @param count [Integer] how many jobs to build; zero or a negative number
#   produces an empty array because the range 1..count is then empty.
# @return [Array<Forkomatic::Job>] freshly constructed jobs.
def build_jobs(count)
  # map straight over the range; the extra `.each` enumerator was redundant.
  (1..count).map { Forkomatic::Job.new }
end
#child_pids ⇒ Object
Get a list of current process IDs.
139 140 141 142 143 144 |
# File 'lib/forkomatic.rb', line 139
#
# Get a list of current process IDs.
#
# Runs a reaping pass first so finished children are not reported.
#
# @return [Array] the pid of every tracked job that has one, in job order.
def child_pids
  reap_all
  # Jobs that have not been given a pid yet report nil; leave those out.
  @jobs.map(&:pid).compact
end
#load_config(config_file) ⇒ Object
Load a configuration from a file.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/forkomatic.rb', line 46
#
# Load a configuration from a file.
#
# Each line of the form "<name> <non-negative integer>" whose name is one of
# the allowed options (compared case-insensitively) contributes one entry.
# Every other line is ignored. If the file cannot be read, the error is
# printed and whatever was collected so far (normally nothing) is returned.
#
# @param config_file [String] path of the file to read.
# @return [Hash{Symbol => Integer}] settings keyed by lower-case option name.
def load_config(config_file)
  # Allowed options.
  options = %w[max_children work_interval max_iterations wait_for_children]
  params = {}
  begin
    # File.read closes the file itself, so no handle is left open.
    File.read(config_file).split(/\n/).each do |line|
      match = /^\s*([a-zA-Z_]+)\s+([0-9]+)/.match(line)
      next unless match

      # Store the lower-cased name so "MAX_CHILDREN 5" ends up under
      # :max_children, the key the constructor actually looks up.
      config_item = match[1].downcase
      params[config_item.to_sym] = match[2].to_i if options.include?(config_item)
    end
  rescue => e
    puts "Error loading config file: #{e.to_s}"
  end
  params
end
#reap(pid) ⇒ Object
Reap the child process with the given pid if it has finished.
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/forkomatic.rb', line 106
#
# Reap child processes that finished.
#
# @param pid the process id to check, or nil for a job without a pid.
# @return [Boolean] true when there is nothing left to wait for (nil pid,
#   the non-blocking wait returned a value, or Errno::ECHILD was raised);
#   false when the wait returned nothing or failed with another error,
#   which is printed.
def reap(pid)
  return true if pid.nil?

  # WNOHANG keeps this call from blocking; a falsy result is taken to mean
  # the child has not exited yet.
  Process.waitpid(pid, Process::WNOHANG) ? true : false
rescue Errno::ECHILD
  # No such child to wait for: treat it as already gone.
  true
rescue => e
  puts "ERROR: #{e.to_s}"
  false
end
#reap_all ⇒ Object
Try to reap all child processes.
119 120 121 122 123 124 125 126 127 |
# File 'lib/forkomatic.rb', line 119
#
# Try to reap all child processes.
#
# Drops every tracked job for which #reap reports true, in one pass.
# This replaces collecting finished pids and then searching that list
# once per job.
#
# @return [Array] the job list itself, with the reaped jobs removed.
def reap_all
  @jobs.delete_if { |job| reap(job.pid) }
end
#run ⇒ Object
Do work.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/forkomatic.rb', line 82
#
# Do work.
#
# Installs an INT handler that calls shutdown(true), then repeatedly builds
# as many jobs as #available reports and forks one child per job. The loop
# runs until the iteration limit is reached, or without limit when
# @max_iterations is nil.
#
# @return the result of Process.waitall when @wait_for_children is truthy,
#   otherwise nil.
def run
  Signal.trap("INT") { shutdown(true) }

  iteration = 0
  while @max_iterations.nil? || iteration < @max_iterations
    iteration += 1

    build_jobs(available).each do |job|
      # The child process runs the job's work; the parent records the
      # child's pid and keeps tracking the job.
      job.pid = Process.fork { job.work! }
      @jobs.push(job)
    end

    sleep @work_interval if @work_interval > 0
  end

  Process.waitall if @wait_for_children
end
#shutdown(all) ⇒ Object
Send TERM to all child processes and, if `all` is true, exit.
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/forkomatic.rb', line 70
#
# Kill all child processes and shutdown.
#
# Sends TERM to every pid reported by #child_pids. An error while
# signalling one pid is printed and does not stop the others.
#
# @param all when truthy, the current process exits after signalling.
# @return [nil] when all is falsy.
def shutdown(all)
  targets = child_pids
  targets.each do |child|
    begin
      Process.kill("TERM", child)
    rescue => error
      puts error.to_s
    end
  end

  return unless all

  exit
end