Class: WorkQueue
- Inherits:
-
Object
show all
- Defined in:
- lib/workqueue.rb,
lib/workqueue/version.rb
Defined Under Namespace
Classes: ThreadsafeCounter
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(init_queue = [], opts = {}, &job) ⇒ WorkQueue
Returns a new instance of WorkQueue.
17
18
19
20
21
22
23
24
|
# File 'lib/workqueue.rb', line 17
# Builds a work queue around the given job block.
#
# @param init_queue [#each] items to enqueue right away (handed to #concat)
# @param opts [Hash] attribute names mapped to values; each pair is applied
#   by calling the matching "<name>=" writer on this object
# @yield the job block, stored in @job
def initialize(init_queue = [], opts = {}, &job)
  @queue = Queue.new
  @job = job
  # Options are applied before the initial items are enqueued.
  opts.each_pair do |attribute, value|
    send("#{attribute}=", value)
  end
  concat(init_queue)
end
|
Instance Attribute Details
#job ⇒ Object
Returns the value of attribute job.
16
17
18
|
# File 'lib/workqueue.rb', line 16
# Reader for the job stored in @job.
# initialize fills @job with the block it was given, and work! invokes it
# as job.call(payload) for every dequeued item.
#
# @return [Object] the current value of @job (nil when no block was given)
def job
@job
end
|
#queue ⇒ Object
Returns the value of attribute queue.
15
16
17
|
# File 'lib/workqueue.rb', line 15
# Reader for the underlying queue stored in @queue.
# initialize sets @queue to a new Queue; push appends [item, index] pairs to
# it and work! takes them off with shift.
#
# @return [Object] the current value of @queue
def queue
@queue
end
|
#size ⇒ Object
27
28
29
|
# File 'lib/workqueue.rb', line 27
# Number of worker threads to use (see #run and #join).
# Falls back to 2, and remembers that default in @size, when no truthy
# value has been stored yet.
#
# @return [Object] the stored @size, or 2 by default
def size
  return @size if @size

  @size = 2
end
|
Class Method Details
.version ⇒ Object
2
3
4
|
# File 'lib/workqueue/version.rb', line 2
# Reports the library version.
#
# @return [String] the version string '0.1.3'
def self.version
'0.1.3'
end
|
Instance Method Details
#abort! ⇒ Object
46
47
48
|
# File 'lib/workqueue.rb', line 46
# Marks the queue as aborted by setting @aborted to true.
# work! checks this flag before each dequeue and stops looping once it is
# set; nothing here interrupts a worker that is already waiting on the queue
# or running a job.
#
# @return [true] the value assigned to @aborted
def abort!
@aborted = true
end
|
#concat(arr) ⇒ Object
65
66
67
|
# File 'lib/workqueue.rb', line 65
# Enqueues every element of +arr+, in order, via #push.
#
# @param arr [#each] the items to enqueue
# @return [Object] whatever arr.each returns (the array itself for an Array)
def concat(arr)
  arr.each do |item|
    push(item)
  end
end
|
#join ⇒ Object
69
70
71
72
73
74
75
|
# File 'lib/workqueue.rb', line 69
# Signals the workers to stop and waits for them.
# Enqueues one :__break! marker per worker (#size of them), which work!
# treats as its cue to exit, then calls join on each entry of #workers.
#
# @return [self]
def join
  stop_markers = [:__break!] * size
  concat(stop_markers)
  workers.each { |worker| worker.join }
  self
end
|
#push(e) ⇒ Object
Also known as:
<<
58
59
60
61
62
|
# File 'lib/workqueue.rb', line 58
# Enqueues a single item.
# The item is stored as an [item, index] pair, where the index comes from
# cursor.incr; work! later uses that index as the key for the job's result.
#
# @param e [Object] the item to enqueue
# @return [self] so calls can be chained
def push(e)
  position = cursor.incr
  queue.push([e, position])
  self
end
|
#results ⇒ Object
77
78
79
80
|
# File 'lib/workqueue.rb', line 77
# Waits for the queue to finish, then hands back the collected results.
# Calls #join first (which enqueues the stop markers and joins the workers)
# and then returns #aggregate, the collection work! writes each job result
# into under the item's index.
# NOTE(review): #aggregate is defined outside this view; its concrete type
# is not visible here -- confirm.
#
# @return [Object] the value of #aggregate
def results
join
aggregate
end
|
#run ⇒ Object
50
51
52
53
54
55
56
|
# File 'lib/workqueue.rb', line 50
# Starts the workers: spawns #size threads, each executing #work!, and keeps
# them in @workers.
#
# @return [self]
def run
  @workers = 1.upto(size).map { Thread.new { work! } }
  self
end
|
#work! ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
# File 'lib/workqueue.rb', line 31
# Worker loop: repeatedly takes an [item, index] pair off the queue, runs the
# job on the item and stores the outcome in aggregate[index].
#
# The loop ends when the abort flag is set (checked before every dequeue) or
# when the :__break! stop marker is dequeued.
#
# Any exception raised while working flags the whole queue as aborted via
# #abort! and is then re-raised unchanged. A plain `until` loop is used
# instead of Kernel#loop because Kernel#loop rescues StopIteration, which
# would silently swallow a StopIteration raised by a job.
#
# @return [nil]
# @raise [Exception] whatever the job (or the queue/aggregate) raised
def work!
  until @aborted
    payload, index = queue.shift
    # Symbol#== is an identity check, so a payload with a custom #== cannot
    # be mistaken for the stop marker.
    break if :__break! == payload

    aggregate[index] = job.call(payload)
  end
rescue Exception # deliberately broad: nothing is swallowed, it is re-raised below
  abort!
  raise
end
|