Class: JobRunner

Inherits:
Object show all
Defined in:
lib/epitools/job_runner.rb

Overview

Runs many jobs in parallel, and returns their interleaved results. (NOTE: The JobRunner can be run multiple times; each time the blocks

will be executed again.)

Examples:

JobRunner.new do |jr|

jr.add { 3 }
jr.add { sleep 0.1; 2 }
jr.add { sleep 0.2; 1 }

jr.each_result do |result|
  p result
end

end

jr = JobRunner.new(

proc { 1 },
proc { 2 },
proc { 3 }

)

2.times do

jr.each_result { |result| p result }

end

Instance Method Summary collapse

Constructor Details

#initialize(*blocks, debug: false) ⇒ JobRunner

Returns a new instance of JobRunner.


29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/epitools/job_runner.rb', line 29

def initialize(*blocks, debug: false)
  @threads = []
  @results = Thread::Queue.new
  @jobs    = []
  @started = false
  @debug   = debug

  if blocks.any?
    blocks.each { |block| add &block }
  else
    yield self if block_given?
  end
end

Instance Method Details

#add(&block) ⇒ Object


47
48
49
50
# File 'lib/epitools/job_runner.rb', line 47

def add(&block)
  dmsg("added job #{block}")
  @jobs << block
end

#dmsg(msg) ⇒ Object


43
44
45
# File 'lib/epitools/job_runner.rb', line 43

def dmsg(msg)
  puts "[#{Time.now}] #{msg}" if @debug
end

#each_resultObject


78
79
80
81
82
83
84
85
86
87
88
# File 'lib/epitools/job_runner.rb', line 78

def each_result
  go! unless @started

  loop do
    yield @results.pop
    reap!
    break if @threads.empty? and @results.empty?
  end

  @started = false
end

#go!Object


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/epitools/job_runner.rb', line 61

def go!
  if @started
    raise "Error: already started"
  else
    dmsg("starting #{@threads.size} jobs")
  end

  @started = true
  @jobs.each do |job|
    dmsg("adding #{job}")
    @threads << Thread.new do
      @results << job.call
      dmsg("job #{job} complete")
    end
  end
end

#reap!Object


52
53
54
55
56
57
58
59
# File 'lib/epitools/job_runner.rb', line 52

def reap!
  if @threads.any?
    dmsg("reaping #{@threads.size} threads")
    @threads.delete_if { |t| not t.alive? }
  else
    dmsg("reap failed: no threads")
  end
end