Class: ProducerConsumer::WorkerPool

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/producer_consumer/worker_pool.rb

Direct Known Subclasses

ConsumerPool

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Enumerable

#consume, #produce

Constructor Details

#initialize(input, number_of_threads = 1) ⇒ WorkerPool

Returns a new instance of WorkerPool.



6
7
8
9
10
11
12
# File 'lib/producer_consumer/worker_pool.rb', line 6

def initialize(input, number_of_threads=1)
  @input = input
  @number_of_threads = number_of_threads
  @workers = []
  @worker_input_queue = Queue.new
  @output_queue = Queue.new
end

Instance Attribute Details

#output_queueObject (readonly)

Returns the value of attribute output_queue.



4
5
6
# File 'lib/producer_consumer/worker_pool.rb', line 4

def output_queue
  @output_queue
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/producer_consumer/worker_pool.rb', line 32

def alive?
  @workers.any? { |t| t.alive? }
end

#do_workObject

worker’s unit of work, can be overriden



26
27
28
29
30
# File 'lib/producer_consumer/worker_pool.rb', line 26

def do_work
  until @worker_input_queue.empty?
    @output_queue << yield(@worker_input_queue.pop)
  end
end

#run(&blk) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/producer_consumer/worker_pool.rb', line 14

def run(&blk)
  # put input's items into queue for thread safe access
  @input.each { |item| @worker_input_queue << item }

  @number_of_threads.times do 
    @workers << Thread.new do
      do_work(&blk)
    end
  end
end

#waitObject



36
37
38
# File 'lib/producer_consumer/worker_pool.rb', line 36

def wait
  @workers.each { |t| t.join }
end