Class: ThreadedProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/threaded_processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(object_list = [], thread_count = 3, &block) ⇒ ThreadedProcessor

Takes an array of objects to process, the number of worker threads (Integer, default 3), and a block to run on each object.



34
35
36
37
38
39
40
# File 'lib/threaded_processor.rb', line 34

# Set up a processor from a list of objects, a worker-thread count, and the
# block to run on each object.
#
# @param object_list [Array] objects to hand to the block (defaults to a new
#   empty array)
# @param thread_count [Integer] number of worker threads (defaults to 3)
# @param block [Proc] work to perform on each object; stored as nil when no
#   block is given
def initialize(object_list=[], thread_count=3, &block)
  @list, @max_threads = object_list, thread_count
  @block = block
  @queue = Queue.new
  self
end

Instance Attribute Details

#block ⇒ Object

Returns the value of attribute block.



30
31
32
# File 'lib/threaded_processor.rb', line 30

# Reader for the block attribute.
#
# @return [Object] the current value of @block (the constructor stores the
#   block it was given here, or nil when none was passed)
def block
  @block
end

#list ⇒ Object

Returns the value of attribute list.



30
31
32
# File 'lib/threaded_processor.rb', line 30

# Reader for the list attribute.
#
# @return [Object] the current value of @list, i.e. the collection of objects
#   queued for processing
def list
  @list
end

#max_threads ⇒ Object

Returns the value of attribute max_threads.



30
31
32
# File 'lib/threaded_processor.rb', line 30

# Reader for the max_threads attribute.
#
# @return [Object] the current value of @max_threads, which run! uses as the
#   number of worker threads to start
def max_threads
  @max_threads
end

Instance Method Details

#<<(obj) ⇒ Object

Append an item to the object list



43
44
45
# File 'lib/threaded_processor.rb', line 43

# Append an item to the object list.
#
# @param obj [Object] the item to add
# @return [Object] whatever @list << obj returns (for an Array, the list
#   itself, so further << calls on the result append to the same list)
def <<(obj)
  @list << obj
end

#empty! ⇒ Object

Empty the object list



48
49
50
# File 'lib/threaded_processor.rb', line 48

# Discard the current object list by pointing @list at a brand-new empty
# Array. The previously assigned list object itself is not modified.
#
# @return [Array] the new, empty list
def empty!
  @list = Array.new
end

#run! ⇒ Object

Run the multithreaded process.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/threaded_processor.rb', line 53

# Run the block once for every object in the list, spreading the work over
# @max_threads worker threads, and wait for all of them to finish.
#
# Every call works on its own queue and end-of-work marker, so items left
# behind by an earlier run that was cut short by an exception cannot leak
# into this one. An exception raised by the block terminates its worker and
# is re-raised from here when that worker is joined.
#
# @return [true] once every worker thread has been joined
# @raise [ThreadedProcessorError] if the list is not an Array, the list is
#   empty, or no block has been assigned
def run!
  # Check the type first: calling #empty? on a non-Array (e.g. nil) would
  # raise NoMethodError instead of the intended ThreadedProcessorError.
  if !@list.is_a?(Array) || @list.empty?
    raise ThreadedProcessorError, "Object list is empty!"
  end
  raise ThreadedProcessorError, "Assign block" if @block.nil?

  queue = Queue.new
  done = Object.new # unique end-of-work marker; one is queued per worker

  # Workers start first and block on the empty queue until items arrive.
  workers = (1..@max_threads).map do
    Thread.new do
      # Identity check on the marker, so list items are never asked to
      # compare themselves to it.
      until done.equal?(item = queue.deq)
        @block.call(item)
      end
    end
  end

  @list.each { |item| queue.enq(item) }
  workers.size.times { queue.enq(done) }
  workers.each(&:join)
  true
end