Class: Checklinks::Worklist

Inherits:
Object
Defined in:
lib/checklinks/worklist.rb

Overview

A worklist is a list of jobs to do, optionally with a pool of threads working through it.

Constructor Details

#initialize(*jobs) ⇒ Worklist

Create a worklist, optionally seeded with an initial list of jobs.



# File 'lib/checklinks/worklist.rb', line 6

def initialize(*jobs)
  @state = nil
  @queue = Queue.new
  @active = Queue.new
  @processing = false
  push(*jobs)
end

Class Method Details

.close(*worklists) ⇒ Object

Close each of the given worklists and return an array of the values returned by their #close calls.



# File 'lib/checklinks/worklist.rb', line 65

def self.close(*worklists)
  worklists.map(&:close)
end

.collector ⇒ Object

Create a worklist that collects pushed values into an array; #close returns that array.



# File 'lib/checklinks/worklist.rb', line 71

def self.collector
  worklist = Worklist.new
  worklist.process([]) do |value, collected|
    collected.push value
  end
  worklist
end
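
A usage sketch for the collector. The `Worklist` class below is a condensed stand-in copied from the listings on this page so that the example runs on its own; it is an assumption for illustration, and real code would load the library instead. The file names pushed are hypothetical.

```ruby
# Condensed stand-in for Checklinks::Worklist (assumption: mirrors the
# listings on this page; not loaded from the library itself).
class Worklist
  def initialize(*jobs)
    @state = nil
    @queue = Queue.new
    @active = Queue.new
    push(*jobs)
  end

  def push(*jobs)
    jobs.each do |job|
      raise unless job
      @queue.push job
    end
  end

  def process(state = nil)
    @state = state
    process_concurrently(1) { |job| yield job, @state }
  end

  def process_concurrently(n)
    raise if @threads
    @threads = n.times.map do
      Thread.new do
        until @queue.closed? && @queue.empty?
          job = @queue.pop
          next unless job
          @active.push :active
          yield job
          @active.pop
        end
      end
    end
  end

  def close
    @threads.size.times { @queue.push nil }
    @queue.close
    @threads.each(&:join)
    @state
  end

  def self.collector
    worklist = Worklist.new
    worklist.process([]) { |value, collected| collected.push value }
    worklist
  end
end

# Values can be pushed from anywhere; a single background thread appends them.
collector = Worklist.collector
collector.push 'a.html', 'b.html'
collector.push 'c.html'

# close waits for the background thread and hands back the collected array.
collected = collector.close
```

Because the collector uses a single worker thread, the array needs no extra locking, and values pushed from one thread arrive in push order.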

Instance Method Details

#close ⇒ Object

Declare that no more jobs will be pushed, then wait for the remaining jobs to finish if processing has been started. Returns the state object passed to #process, or nil if none was given.



# File 'lib/checklinks/worklist.rb', line 55

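def close
  threads = @threads || [] # no workers exist if processing was never started
  # Push one nil per worker so that every worker wakes up and re-checks the queue
  threads.size.times do
    @queue.push nil
  end
  @queue.close
  threads.each(&:join)
  @state
end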
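
The shutdown above relies on how Ruby's core `Queue` behaves once closed. The following is a minimal sketch of that behaviour using only the standard `Queue`, independent of this class; the variable names are illustrative.

```ruby
queue = Queue.new
queue.push :job
queue.close

# Items that were queued before the close can still be popped.
first = queue.pop

# Closed and empty: pop returns nil immediately instead of blocking.
second = queue.pop

# Pushing to a closed queue raises ClosedQueueError.
push_rejected = begin
  queue.push :late
  false
rescue ClosedQueueError
  true
end
```

One consequence for callers: since #close closes the underlying queue, a later #push on the same worklist would be expected to raise `ClosedQueueError`.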

#process(state = nil) ⇒ Object

Process the jobs in the worklist in a single background thread. The optional state object is passed to the block along with each job, and is returned by #close.



# File 'lib/checklinks/worklist.rb', line 24

def process(state = nil)
  @state = state
  process_concurrently(1) do |job|
    yield job, @state
  end
end

#process_concurrently(n) ⇒ Object

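Process the jobs in the worklist in n concurrent background threads. The block is called once per job and may run on several threads at the same time. Raises if processing has already been started.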



# File 'lib/checklinks/worklist.rb', line 32

def process_concurrently(n)
  raise 'worklist is already being processed' if @threads
  @threads = n.times.map {
    Thread.new {
      until @queue.closed? && @queue.empty?
        job = @queue.pop
        next unless job
        @active.push :active
        yield job
        @active.pop
      end
    }
  }
end
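
Because the block can run on several threads at once, any shared data it touches needs its own synchronization. The following is a minimal sketch of the same worker-pool idea using only Ruby's core `Queue`, `Thread` and `Mutex`; it is not the library's code, and the job values and variable names are illustrative.

```ruby
jobs = Queue.new
(1..20).each { |n| jobs.push n }
jobs.close # no more jobs: pop returns nil once the queue has drained

results = []
lock = Mutex.new

workers = 4.times.map do
  Thread.new do
    # pop returns nil only when the queue is closed and empty,
    # which ends the loop for this worker
    while (job = jobs.pop)
      square = job * job
      # Guard the shared array, since four threads append to it
      lock.synchronize { results << square }
    end
  end
end

workers.each(&:join)
```

The results arrive in whatever order the threads finish, so sort them or key them by job if order matters.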

#push(*jobs) ⇒ Object

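Push more jobs onto the worklist. Each job must be truthy: nil and false are rejected, because nil is used internally to wake worker threads at shutdown.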



# File 'lib/checklinks/worklist.rb', line 15

def push(*jobs)
  jobs.each do |job|
    raise 'jobs must not be nil or false' unless job
    @queue.push job
  end
end

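#size ⇒ Object

Returns the number of jobs left to process: those still queued plus those currently being processed. The count is approximate while workers are running.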



# File 'lib/checklinks/worklist.rb', line 48

def size
  # This is a bit racy - we don't update these two queues atomically
  @queue.size + @active.size
end