Class: ProducerConsumer::WorkerPool
- Inherits:
-
Object
- Object
- ProducerConsumer::WorkerPool
- Includes:
- Enumerable
- Defined in:
- lib/producer_consumer/worker_pool.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#output_queue ⇒ Object
readonly
Returns the value of attribute output_queue.
Instance Method Summary collapse
- #alive? ⇒ Boolean
-
#do_work ⇒ Object
worker’s unit of work, can be overriden.
-
#initialize(input, number_of_threads = 1) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
- #run(&blk) ⇒ Object
- #wait ⇒ Object
Methods included from Enumerable
Constructor Details
#initialize(input, number_of_threads = 1) ⇒ WorkerPool
Returns a new instance of WorkerPool.
6 7 8 9 10 11 12 |
# File 'lib/producer_consumer/worker_pool.rb', line 6 def initialize(input, number_of_threads=1) @input = input @number_of_threads = number_of_threads @workers = [] @worker_input_queue = Queue.new @output_queue = Queue.new end |
Instance Attribute Details
#output_queue ⇒ Object (readonly)
Returns the value of attribute output_queue.
4 5 6 |
# File 'lib/producer_consumer/worker_pool.rb', line 4 def output_queue @output_queue end |
Instance Method Details
#alive? ⇒ Boolean
32 33 34 |
# File 'lib/producer_consumer/worker_pool.rb', line 32 def alive? @workers.any? { |t| t.alive? } end |
#do_work ⇒ Object
worker’s unit of work, can be overriden
26 27 28 29 30 |
# File 'lib/producer_consumer/worker_pool.rb', line 26 def do_work until @worker_input_queue.empty? @output_queue << yield(@worker_input_queue.pop) end end |
#run(&blk) ⇒ Object
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/producer_consumer/worker_pool.rb', line 14 def run(&blk) # put input's items into queue for thread safe access @input.each { |item| @worker_input_queue << item } @number_of_threads.times do @workers << Thread.new do do_work(&blk) end end end |
#wait ⇒ Object
36 37 38 |
# File 'lib/producer_consumer/worker_pool.rb', line 36 def wait @workers.each { |t| t.join } end |