Class: Rspider::ThreadPool
- Inherits: Object
- Defined in: lib/rspider/ThreadPool.rb
Overview
This class manages worker threads: it runs them in a controlled way and limits how many are active at the same time, so that the CPU load does not become too heavy.
Examples
    tp = ThreadPool.new(3)

    1.upto(30) { |j|
      t = tp.dispatch(j) { |i|
        puts "thread ##{i} start"
        sleep rand
        puts "thread ##{i} end"
      }
    }
    tp.shutdown
Instance Method Summary
-
#dispatch(*args) ⇒ Object
Adds a new thread to the pool; the thread waits for a free slot before running the given block.
-
#exception(thread, exception, *original_args) ⇒ Object
Called when a dispatched block raises an error; prints the exception.
-
#initialize(max_size) ⇒ ThreadPool
constructor
Initializes the pool. Param: max_size: the maximum number of threads that can be active at the same time.
-
#shutdown ⇒ Object
Waits for all threads in the pool to exit.
Constructor Details
#initialize(max_size) ⇒ ThreadPool
Initializes the pool. Param: max_size: the maximum number of threads that can be active at the same time.
    # File 'lib/rspider/ThreadPool.rb', line 29
    def initialize(max_size)
      @pool = []
      @max_size = max_size
      @pool_mutex = Mutex.new
      @pool_cv = ConditionVariable.new
    end
Instance Method Details
#dispatch(*args) ⇒ Object
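Adds a new thread to the pool. The thread is created immediately, waits on the condition variable while the pool is full, then runs the given block with args and removes itself from the pool when the block finishes.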
    # File 'lib/rspider/ThreadPool.rb', line 37
    def dispatch(*args)
      Thread.new do
        @pool_mutex.synchronize do
          while @pool.size >= @max_size
            print "pool full;waiting run #{args.join(',')}...\n" if $DEBUG
            @pool_cv.wait(@pool_mutex)
          end
        end
        @pool << Thread.current
        begin
          yield(*args)
        rescue => e
          exception(self, e, *args)
        ensure
          @pool_mutex.synchronize do
            @pool.delete(Thread.current)
            @pool_cv.signal
          end
        end
      end
    end
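In the listing above, the capacity check runs under the mutex, but the thread registers itself in @pool after the lock has been released. The following is a minimal sketch of a variant that keeps both steps under one lock, not the Rspider implementation. The class name BoundedPool, the peak counter, and the use of broadcast instead of signal are assumptions made for illustration, and the rescue/exception path is left out for brevity.

```ruby
# Hypothetical stand-in, not the Rspider source: the capacity check and the
# registration of the thread happen under the same lock.
class BoundedPool
  attr_reader :peak

  def initialize(max_size)
    @max_size = max_size
    @pool = []
    @peak = 0                       # highest number of threads registered at once
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def dispatch(*args)
    Thread.new do
      @mutex.synchronize do
        @cv.wait(@mutex) while @pool.size >= @max_size
        @pool << Thread.current     # registered before the lock is released
        @peak = [@peak, @pool.size].max
      end
      begin
        yield(*args)
      ensure
        @mutex.synchronize do
          @pool.delete(Thread.current)
          @cv.broadcast             # wake every waiter; each one rechecks its condition
        end
      end
    end
  end
end

pool = BoundedPool.new(3)
results = Queue.new
threads = (1..12).map { |j| pool.dispatch(j) { |i| sleep 0.01; results << i * 2 } }
threads.each(&:join)
```

Because dispatch returns the Thread object, the caller here simply joins the returned threads; pool.peak then reports how many were registered at the same moment.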
#exception(thread, exception, *original_args) ⇒ Object
Called by dispatch when the block raises an error. Prints the exception to standard output.
    # File 'lib/rspider/ThreadPool.rb', line 65
    def exception(thread, exception, *original_args)
      puts "Exception in thread #{thread}:#{exception}"
    end
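Since exception is an ordinary instance method, a subclass can replace the default puts with its own handling. The sketch below shows that idea on a stripped-down stand-in rather than on Rspider::ThreadPool itself: MiniPool and CollectingPool are hypothetical names, and only the rescue-and-report path from dispatch is modelled. As in the source, the first argument passed to the hook is the pool (self), and a bare rescue only catches StandardError and its subclasses.

```ruby
# Hypothetical stand-in for the pool: only the error-reporting path is modelled.
class MiniPool
  def dispatch(*args)
    Thread.new do
      begin
        yield(*args)
      rescue => e                   # StandardError and subclasses only
        exception(self, e, *args)
      end
    end
  end

  # Default hook: print the error, as the documented method does.
  def exception(thread, exception, *original_args)
    puts "Exception in thread #{thread}:#{exception}"
  end
end

# Subclass that records errors instead of printing them.
class CollectingPool < MiniPool
  attr_reader :errors

  def initialize
    @errors = Queue.new             # thread-safe container for reported errors
  end

  def exception(thread, exception, *original_args)
    @errors << [exception.class, exception.message, original_args]
  end
end

pool = CollectingPool.new
workers = [1, 0].map { |d| pool.dispatch(d) { |n| 10 / n } }
workers.each(&:join)

collected = []
collected << pool.errors.pop until pool.errors.empty?
```

After the workers are joined, collected holds one entry for the block that divided by zero, together with the arguments it was dispatched with.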
#shutdown ⇒ Object
Waits for all threads in the pool to exit. Blocks on the condition variable until the pool is empty.
    # File 'lib/rspider/ThreadPool.rb', line 59
    def shutdown
      @pool_mutex.synchronize { @pool_cv.wait(@pool_mutex) until @pool.empty? }
    end
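The shutdown method relies on the usual "wait until the condition holds" idiom for condition variables. The following standalone sketch illustrates that idiom outside the class, under simplifying assumptions: the worker ids in active are hypothetical and registered up front, and there is only one waiter, so a single signal per finished worker is enough.

```ruby
mutex  = Mutex.new
cv     = ConditionVariable.new
active = [0, 1, 2]                  # hypothetical worker ids, registered up front

workers = active.dup.map do |id|
  Thread.new do
    sleep 0.01 * (id + 1)           # simulated work
    mutex.synchronize do
      active.delete(id)             # worker leaves the pool
      cv.signal                     # let the waiter recheck the condition
    end
  end
end

# Same shape as shutdown: the condition is rechecked after every wake-up,
# and the wait is skipped entirely if the pool is already empty.
mutex.synchronize do
  cv.wait(mutex) until active.empty?
end
remaining = active.size

workers.each(&:join)
```

The until loop matters because a wake-up only means the state may have changed; the waiter returns only once active is actually empty.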