Class: Rmega::Pool
Instance Method Summary
- #initialize ⇒ Pool (constructor): A new instance of Pool.
- #mutex ⇒ Object
- #process(&block) ⇒ Object
- #threads_raises_exceptions ⇒ Object
- #wait_done ⇒ Object
Methods included from Options
Constructor Details
#initialize ⇒ Pool
Returns a new instance of Pool.
# File 'lib/rmega/pool.rb', line 5

def initialize
  threads_raises_exceptions

  @queue = Queue.new
  @queue_closed = false
  @threads = []
  @cv = ConditionVariable.new
  @working_threads = 0

  options.thread_pool_size.times do
    @threads << Thread.new do
      while proc = @queue.pop
        mutex.synchronize do
          @working_threads += 1
        end

        proc.call

        mutex.synchronize do
          @working_threads -= 1

          if @queue_closed and @queue.empty? and @working_threads == 0
            @cv.signal
          end
        end
      end
    end
  end
end
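The worker loop in the constructor can be illustrated in isolation. The following is a minimal standalone sketch of the same pattern, not Rmega's own code: `run_jobs` and `pool_size` are hypothetical names, with `pool_size` standing in for the `thread_pool_size` option, and the sketch assumes Ruby 2.3 or later for `Queue#close`.

```ruby
# Standalone sketch of the worker-loop pattern; not Rmega's own code.
# `run_jobs` and `pool_size` are hypothetical names.
def run_jobs(jobs, pool_size: 3)
  queue   = Queue.new
  results = Queue.new # thread-safe collector for return values

  workers = Array.new(pool_size) do
    Thread.new do
      # Queue#pop blocks until a job arrives. Once the queue is closed and
      # empty it returns nil, which ends the loop.
      while (job = queue.pop)
        results << job.call
      end
    end
  end

  jobs.each { |job| queue << job }
  queue.close          # Ruby 2.3+: no more jobs will be pushed
  workers.each(&:join) # wait until every worker has drained the queue

  Array.new(results.size) { results.pop }
end

squares = run_jobs((0...5).map { |i| proc { i * i } })
puts squares.sort.inspect # [0, 1, 4, 9, 16]
```

Unlike the constructor above, this sketch waits for completion by joining the workers rather than by counting busy threads and signalling a condition variable.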
Instance Method Details
#mutex ⇒ Object
# File 'lib/rmega/pool.rb', line 35

def mutex
  @mutex ||= Mutex.new
end
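As an illustration of the lazily created lock, here is a small standalone sketch. `Counter` is a hypothetical class used only for this example. Because `||=` is not an atomic operation, the sketch creates the mutex in its constructor before any thread starts; that is a choice made here, not something the listing above does.

```ruby
# Standalone sketch; `Counter` is a hypothetical class, not part of Rmega.
class Counter
  attr_reader :value

  def initialize
    @value = 0
    mutex # create the lock now, before any thread can race on `||=`
  end

  # Same lazy-initialisation idiom as Pool#mutex.
  def mutex
    @mutex ||= Mutex.new
  end

  # The read-modify-write on @value is protected by the shared lock.
  def increment
    mutex.synchronize { @value += 1 }
  end
end

counter = Counter.new
threads = Array.new(4) { Thread.new { 1000.times { counter.increment } } }
threads.each(&:join)
puts counter.value # 4000
```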
#process(&block) ⇒ Object
# File 'lib/rmega/pool.rb', line 43

def process(&block)
  @queue << block
end
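The `&block` parameter turns the caller's block into a `Proc`, which is what ends up on the queue. A minimal sketch of that mechanism, using a hypothetical `enqueue` helper rather than the real `Pool`:

```ruby
# Hypothetical helper mirroring the shape of Pool#process.
def enqueue(queue, &block)
  queue << block # &block arrives as a Proc object (or nil without a block)
end

queue = Queue.new
label = "chunk"
enqueue(queue) { "#{label}-1" } # the block closes over `label`

job = queue.pop
puts job.class # Proc
puts job.call  # chunk-1
```

Calling the helper without a block pushes `nil`. In the worker loop shown in the constructor, a `nil` returned by `@queue.pop` ends that worker's `while` loop.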
#threads_raises_exceptions ⇒ Object
# File 'lib/rmega/pool.rb', line 39

def threads_raises_exceptions
  Thread.abort_on_exception = true
end
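`Thread.abort_on_exception` is a process-wide setting: when it is true, an unhandled exception in any thread is re-raised in the main thread. The sketch below demonstrates the effect from the main thread; `raised_in_main_thread?` is a hypothetical helper, and it restores the previous setting afterwards, which the method above does not do.

```ruby
# Hypothetical demo helper; meant to be called from the main thread.
def raised_in_main_thread?
  previous = Thread.abort_on_exception
  Thread.abort_on_exception = true
  begin
    Thread.new do
      Thread.current.report_on_exception = false # keep stderr quiet
      raise ArgumentError, "boom"
    end
    sleep 1 # the main thread never joins the failing thread
    false
  rescue ArgumentError
    true # the worker's exception interrupted the sleep
  ensure
    Thread.abort_on_exception = previous
  end
end

puts raised_in_main_thread?
```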
#wait_done ⇒ Object
# File 'lib/rmega/pool.rb', line 47

def wait_done
  @queue.close if @queue.respond_to?(:close)
  @queue_closed = true

  mutex.synchronize do
    @cv.wait(mutex)
  end

  @threads.each(&:kill)
end
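In the listing above, `wait_done` calls `@cv.wait` once, and the signal is sent only by a worker that finishes a job after `@queue_closed` has been set. A common, more defensive way to write such a wait is to re-check a condition in a loop. The sketch below shows that pattern in isolation; `Latch`, `count_down`, and `wait` are hypothetical names, and this is not how Rmega implements it.

```ruby
# Standalone sketch; `Latch` is a hypothetical class, not part of Rmega.
class Latch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @cv    = ConditionVariable.new
  end

  # Called by a worker when one unit of work is finished.
  def count_down
    @mutex.synchronize do
      @count -= 1
      @cv.broadcast if @count <= 0
    end
  end

  # Blocks until the count reaches zero. The loop re-checks the condition,
  # so a signal sent before the wait started, or a spurious wakeup,
  # cannot leave the caller blocked or release it too early.
  def wait
    @mutex.synchronize do
      @cv.wait(@mutex) while @count > 0
    end
  end
end

latch = Latch.new(3)
3.times { Thread.new { latch.count_down } }
latch.wait
puts "all done"
```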