Class: Checklinks::Worklist
- Inherits:
-
Object
- Object
- Checklinks::Worklist
- Defined in:
- lib/checklinks/worklist.rb
Overview
A worklist is a list of jobs to do and possibly a pool of threads working on it
Class Method Summary collapse
-
.close(*worklists) ⇒ Object
Close a set of worklists.
-
.collector ⇒ Object
Create a worklist that collects values into an array and returns it when closed.
Instance Method Summary collapse
-
#close ⇒ Object
Declare that you’re not going to push any more jobs, and wait for the current jobs to be processed if you’ve started that.
-
#initialize(*jobs) ⇒ Worklist
constructor
Create a worklist, possibly with a list of jobs to start with.
-
#process(state = nil) ⇒ Object
Process the jobs in the worklist in a background thread.
-
#process_concurrently(n) ⇒ Object
Process the jobs in the worklist in multiple concurrent background threads.
-
#push(*jobs) ⇒ Object
Push more jobs onto the worklist.
-
#size ⇒ Object
How many jobs are left to process.
Constructor Details
#initialize(*jobs) ⇒ Worklist
Create a worklist, possibly with a list of jobs to start with
6 7 8 9 10 11 12 |
# File 'lib/checklinks/worklist.rb', line 6 def initialize(*jobs) @state = nil @queue = Queue.new @active = Queue.new @processing = false push *jobs end |
Class Method Details
.close(*worklists) ⇒ Object
Close a set of worklists
65 66 67 |
# File 'lib/checklinks/worklist.rb', line 65 def self.close(*worklists) worklists.map(&:close) end |
.collector ⇒ Object
Create a worklist that collects values into an array and returns it when closed
71 72 73 74 75 76 77 |
# File 'lib/checklinks/worklist.rb', line 71 def self.collector worklist = Worklist.new worklist.process([]) do |value, collected| collected.push value end worklist end |
Instance Method Details
#close ⇒ Object
Declare that you’re not going to push any more jobs, and wait for the current jobs to be processed if you’ve started that
55 56 57 58 59 60 61 62 |
# File 'lib/checklinks/worklist.rb', line 55 def close @threads.size.times do @queue.push nil end @queue.close @threads.each &:join @state end |
#process(state = nil) ⇒ Object
Process the jobs in the worklist in a background thread. You can pass a state object which is then passed into the block to process each job.
24 25 26 27 28 29 |
# File 'lib/checklinks/worklist.rb', line 24 def process(state=nil) @state = state process_concurrently(1) do |job| yield job, @state end end |
#process_concurrently(n) ⇒ Object
Process the jobs in the worklist in multiple concurrent background threads
32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/checklinks/worklist.rb', line 32 def process_concurrently(n) raise if @threads @threads = n.times.map { Thread.new { until @queue.closed? && @queue.empty? job = @queue.pop next unless job @active.push :active yield job @active.pop end } } end |
#push(*jobs) ⇒ Object
Push more jobs onto the worklist
15 16 17 18 19 20 |
# File 'lib/checklinks/worklist.rb', line 15 def push(*jobs) jobs.each do |job| raise unless job @queue.push job end end |
#size ⇒ Object
How many jobs are left to process
48 49 50 51 |
# File 'lib/checklinks/worklist.rb', line 48 def size # This is a a bit racy - we don't update these two queues atomically @queue.size + @active.size end |