Class: WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/workqueue.rb,
lib/workqueue/version.rb

Defined Under Namespace

Classes: ThreadsafeCounter

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(init_queue = [], opts = {}, &job) ⇒ WorkQueue

Returns a new instance of WorkQueue.



17
18
19
20
21
22
23
24
# File 'lib/workqueue.rb', line 17

def initialize(init_queue=[], opts={}, &job)
  @job = job
  @queue = Queue.new

  opts.each { |k, v| send(:"#{k}=", v) }

  concat(init_queue)
end

Instance Attribute Details

#jobObject (readonly)

Returns the value of attribute job.



16
17
18
# File 'lib/workqueue.rb', line 16

def job
  @job
end

#queueObject (readonly)

Returns the value of attribute queue.



15
16
17
# File 'lib/workqueue.rb', line 15

def queue
  @queue
end

#sizeObject



27
28
29
# File 'lib/workqueue.rb', line 27

def size
  @size ||= 2
end

Class Method Details

.versionObject



2
3
4
# File 'lib/workqueue/version.rb', line 2

def self.version
  '0.1.3'
end

Instance Method Details

#abort!Object



46
47
48
# File 'lib/workqueue.rb', line 46

def abort!
  @aborted = true
end

#concat(arr) ⇒ Object



65
66
67
# File 'lib/workqueue.rb', line 65

def concat(arr)
  arr.each { |x| push(x) }
end

#joinObject



69
70
71
72
73
74
75
# File 'lib/workqueue.rb', line 69

def join
  concat([:__break!] * size)

  workers.each(&:join)

  self
end

#push(e) ⇒ Object Also known as: <<



58
59
60
61
62
# File 'lib/workqueue.rb', line 58

def push(e)
  queue.push([e, cursor.incr])

  self
end

#resultsObject



77
78
79
80
# File 'lib/workqueue.rb', line 77

def results
  join
  aggregate
end

#runObject



50
51
52
53
54
55
56
# File 'lib/workqueue.rb', line 50

def run
  @workers = (1..size).map do
    Thread.new { work! }
  end

  self
end

#work!Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/workqueue.rb', line 31

def work!
  loop do
    begin
      break if @aborted
      payload, index = queue.shift
      break if payload == :__break!

      aggregate[index] = job.call(payload)
    rescue Exception
      abort!
      raise
    end
  end
end