Class: TheForce::ThreadPool

Inherits:
Object show all
Defined in:
lib/the_force/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(max_size) ⇒ ThreadPool

Returns a new instance of ThreadPool.



10
11
12
13
14
15
# File 'lib/the_force/thread_pool.rb', line 10

# Sets up an empty pool together with the lock and condition variable
# that guard it.
#
# @param max_size [Object] stored as-is in @max_size; presumably an Integer
#   cap on the number of pooled threads (confirm against callers)
def initialize(max_size)
  @max_size = max_size
  # Bookkeeping list of pooled threads; starts out empty.
  @pool = []
  # Lock paired with the condition variable below.
  @pool_mutex = Mutex.new
  @pool_cv = ConditionVariable.new
end

Instance Method Details

#busy? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/the_force/thread_pool.rb', line 47

# Reports whether the pool currently holds at least one entry.
#
# Reads @pool without taking @pool_mutex, so the answer is only a
# snapshot of the pool at the moment of the call.
#
# @return [Boolean] true when @pool is non-empty, false otherwise
def busy?
  !@pool.empty?
end

#dispatch(*args) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/the_force/thread_pool.rb', line 17

# Runs the given block on a new thread once the pool has room for it.
#
# The new thread waits until fewer than @max_size threads are registered,
# registers itself, yields +args+ to the block, and finally unregisters
# itself and wakes every thread waiting on the pool's condition variable.
#
# @param args [Array] arguments yielded to the block
# @yield [*args] the work to perform on the pooled thread
# @return [Thread] the thread that was started
def dispatch(*args)
  Thread.new do
    @pool_mutex.synchronize do
      # Sleep until some other thread announces a change in the pool.
      @pool_cv.wait(@pool_mutex) while @pool.size >= @max_size
      # Claim the slot while still holding the lock; otherwise several
      # waiters could pass the size check before any of them registers
      # and the pool would grow past @max_size.
      @pool << Thread.current
    end

    begin
      yield(*args)
    rescue => e
      # Hand the failing thread (not the pool) to the handler, matching
      # the handler's +thread+ parameter.
      exception(Thread.current, e, *args)
    ensure
      @pool_mutex.synchronize do
        # Remove the thread from the pool.
        @pool.delete(Thread.current)
        # Wake all waiters: both queued workers and #shutdown wait on this
        # condition variable, and a single signal could be consumed by a
        # waiter that cannot proceed, stranding the others.
        @pool_cv.broadcast
      end
    end
  end
end

#exception(thread, exception, *original_args) ⇒ Object



51
52
53
54
# File 'lib/the_force/thread_pool.rb', line 51

# Default error hook: prints a one-line report to standard output.
# Override in a subclass to handle an exception raised within a thread.
#
# @param thread [Object] interpolated into the report as the failure's origin
# @param exception [Object] the error being reported; interpolated into the report
# @param original_args [Array] extra arguments; unused by this default implementation
# @return [nil]
def exception(thread, exception, *original_args)
  report = "Exception in thread #{thread}: #{exception}"
  $stdout.puts(report)
end

#shutdown ⇒ Object



43
44
45
# File 'lib/the_force/thread_pool.rb', line 43

# Blocks the caller until @pool is empty.
#
# Holds @pool_mutex while checking and sleeps on @pool_cv between checks,
# so it relies on other threads signalling that condition variable after
# they change the pool.
#
# @return [nil]
def shutdown
  @pool_mutex.synchronize do
    until @pool.empty?
      # Releases the mutex while asleep and re-acquires it on wake-up.
      @pool_cv.wait(@pool_mutex)
    end
  end
end