Class: Forkomatic

Inherits:
Object
  • Object
show all
Defined in:
lib/forkomatic.rb

Defined Under Namespace

Classes: Job

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Forkomatic

Initialize the runners.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/forkomatic.rb', line 21

# Initialize the runners.
#
# @param args [String, Integer, Hash] how to configure the instance:
#   * String  - path of a config file, read through load_config.
#   * Integer - run that many children exactly once (one iteration, no
#               pause between iterations, wait for the children).
#   * Hash    - the parameters themselves; keys may be strings or symbols.
#   Any other kind of argument leaves every parameter at its default.
def initialize(args)
  params =
    case args
    when String
      # Load config from a file.
      load_config(args)
    when Integer
      # Given an integer, forkomatic will only run N runners 1 time.
      {
        :wait_for_children => true,
        :max_children => args,
        :work_interval => 0,
        :max_iterations => 1
      }
    when Hash
      # Specify the parameters directly.
      args
    else
      {}
    end

  # Normalize every key to a symbol so that { "max_children" => 4 } behaves
  # like { :max_children => 4 }. A new hash is built, so the caller's hash
  # is never modified.
  params = params.each_with_object({}) do |(key, val), normalized|
    normalized[key.to_sym] = val
  end

  self.jobs = []
  self.max_children = params[:max_children] || 1
  # A missing or negative interval means "do not pause between iterations".
  interval = params[:work_interval]
  self.work_interval = interval.nil? || interval < 0 ? 0 : interval
  # nil is kept as-is; run treats a nil max_iterations as "no limit".
  self.max_iterations = params[:max_iterations]
  # Waiting is the default; only an explicit value overrides it.
  self.wait_for_children = params[:wait_for_children].nil? ? true : params[:wait_for_children]
end

Instance Attribute Details

#jobs ⇒ Object

Returns the value of attribute jobs.



17
18
19
# File 'lib/forkomatic.rb', line 17

# Reader for @jobs, the array of job objects this instance is tracking.
# run appends a job after forking its child; reap_all removes finished ones.
#
# @return [Array, nil] the tracked jobs (nil if never assigned).
def jobs
  @jobs
end

#max_children ⇒ Object

Returns the value of attribute max_children.



14
15
16
# File 'lib/forkomatic.rb', line 14

# Reader for @max_children, the ceiling that available measures the number
# of tracked jobs against.
#
# @return [Object] the configured maximum (initialize falls back to 1).
def max_children
  @max_children
end

#max_iterations ⇒ Object

Returns the value of attribute max_iterations.



16
17
18
# File 'lib/forkomatic.rb', line 16

# Reader for @max_iterations, the number of passes the run loop makes.
# A nil value makes run loop without an iteration limit.
#
# @return [Integer, nil] the iteration limit, or nil for no limit.
def max_iterations
  @max_iterations
end

#wait_for_children ⇒ Object

Returns the value of attribute wait_for_children.



18
19
20
# File 'lib/forkomatic.rb', line 18

# Reader for @wait_for_children. When it is truthy, run calls
# Process.waitall after its loop finishes.
#
# @return [Object] the stored flag (initialize defaults it to true).
def wait_for_children
  @wait_for_children
end

#work_interval ⇒ Object

Returns the value of attribute work_interval.



15
16
17
# File 'lib/forkomatic.rb', line 15

# Reader for @work_interval, the value run passes to sleep between
# iterations; run skips the sleep unless it is greater than zero.
#
# @return [Object] the stored interval (initialize clamps nil/negative to 0).
def work_interval
  @work_interval
end

Instance Method Details

#available ⇒ Object

See how many children are available.



130
131
132
133
134
135
136
# File 'lib/forkomatic.rb', line 130

# Report how many more children may be started right now.
#
# @return the difference between @max_children and the number of jobs that
#   are still tracked after finished children have been reaped.
def available
  nothing_tracked = @jobs.nil? || @jobs.empty?
  if nothing_tracked
    # No jobs recorded yet, so the full allowance is free.
    @max_children
  else
    # Clear out finished children (non-blocking) before counting.
    reap_all
    @max_children - @jobs.length
  end
end

#build_jobs(count) ⇒ Object

Create workers.



101
102
103
# File 'lib/forkomatic.rb', line 101

# Create workers.
#
# @param count upper bound of the 1..count range; one job is built per
#   element, so a count below 1 produces an empty array.
# @return [Array<Forkomatic::Job>] freshly constructed jobs.
def build_jobs(count)
  slots = (1..count)
  slots.map { Forkomatic::Job.new }
end

#child_pids ⇒ Object

Get a list of current process IDs.



139
140
141
142
143
144
# File 'lib/forkomatic.rb', line 139

# Get a list of current process IDs.
#
# Finished children are reaped first, so only jobs that are still tracked
# contribute a pid.
#
# @return [Array] a new array with the truthy pids of the tracked jobs.
def child_pids
  reap_all
  @jobs.map { |job| job.pid }.select { |pid| pid }
end

#load_config(config_file) ⇒ Object

Load a configuration from a file.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/forkomatic.rb', line 46

# Load a configuration from a file.
#
# Each line of the form "<option> <non-negative integer>" is considered;
# anything else (blank lines, comments, unknown options) is ignored.
# Option names are matched case-insensitively and always stored under their
# lower-case symbol, which is the key initialize looks up.
#
# wait_for_children is stored as a boolean (0 => false, anything else =>
# true). Storing the raw Integer would make "wait_for_children 0" truthy in
# Ruby and therefore impossible to switch off from a config file.
#
# Read errors are reported on standard output and do not raise; whatever
# was parsed before the error is returned.
#
# @param config_file [String] path of the file to read.
# @return [Hash{Symbol => Integer, Boolean}] the recognized options.
def load_config(config_file)
  params = {}
  # Allowed options.
  options = %w[max_children work_interval max_iterations wait_for_children]
  begin
    # File.foreach closes the file for us, even when the block raises.
    File.foreach(config_file) do |line|
      match = line.match(/^\s*([a-zA-Z_]+)\s+([0-9]+)/)
      next unless match

      config_item = match[1].downcase
      # Make sure option is valid.
      next unless options.include?(config_item)

      config_value = match[2].to_i
      params[config_item.to_sym] =
        config_item == 'wait_for_children' ? config_value != 0 : config_value
    end
  rescue => e
    puts "Error loading config file: #{e.to_s}"
  end
  params
end

#reap(pid) ⇒ Object

Reap child processes that finished.



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/forkomatic.rb', line 106

# Check, without blocking, whether a child process is finished.
#
# @param pid the child's process id; nil counts as already finished.
# @return [Boolean] true when the child has exited, when there is no such
#   child (Errno::ECHILD), or when pid is nil; false when it is still
#   running or when another error occurred (that error is printed).
def reap(pid)
  if pid.nil?
    true
  else
    begin
      # With WNOHANG, waitpid returns nil while the child is still running.
      exited = Process.waitpid(pid, Process::WNOHANG)
      exited ? true : false
    rescue Errno::ECHILD
      true
    rescue => error
      puts "ERROR: #{error.to_s}"
      false
    end
  end
end

#reap_all ⇒ Object

Try to reap all child processes.



119
120
121
122
123
124
125
126
127
# File 'lib/forkomatic.rb', line 119

# Try to reap all child processes.
#
# Works in two passes: first every tracked job is checked with reap, then
# every job whose pid was reported finished is dropped from @jobs.
#
# @return [Array] @jobs after the finished jobs have been removed.
def reap_all
  done_pids = @jobs.select { |job| reap(job.pid) }.map { |job| job.pid }
  @jobs.delete_if { |job| done_pids.include?(job.pid) }
end

#run ⇒ Object

Do work.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/forkomatic.rb', line 82

# Do work.
#
# Installs an INT handler that calls shutdown(true), then loops: each pass
# builds as many jobs as available allows, forks one child per job to call
# job.work!, records the child's pid on the job and tracks the job in @jobs.
# The loop ends after @max_iterations passes, or never when that is nil.
#
# @return the result of Process.waitall when @wait_for_children is truthy,
#   otherwise nil.
def run
  Signal.trap("INT")  { shutdown(true) }
  passes = 0
  until !@max_iterations.nil? && passes >= @max_iterations
    passes += 1
    build_jobs(available).each do |worker|
      # The block runs in the child only; the parent gets the pid back.
      worker.pid = Process.fork { worker.work! }
      @jobs.push(worker)
    end
    sleep @work_interval if @work_interval > 0
  end
  Process.waitall if @wait_for_children
end

#shutdown(all) ⇒ Object

Kill all child processes and shutdown.



70
71
72
73
74
75
76
77
78
79
# File 'lib/forkomatic.rb', line 70

# Kill all child processes and shutdown.
#
# Sends TERM to every pid reported by child_pids; a failure to signal one
# child is printed and does not stop the others from being signalled.
#
# @param all when truthy, the current process exits after signalling.
def shutdown(all)
  targets = child_pids
  targets.each do |child|
    begin
      Process.kill("TERM", child)
    rescue => error
      puts error.to_s
    end
  end
  return unless all
  exit
end