Class: Rspider::ThreadPool

Inherits:
Object
Defined in:
lib/rspider/ThreadPool.rb

Overview

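This class manages a pool of threads. It limits how many threads run at the same time, so that a large batch of tasks does not put a heavy load on the CPU; threads beyond the limit wait until a slot frees up.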

Examples

tp = ThreadPool.new(3)

1.upto(30) { |j|
	t = tp.dispatch(j) { |i|
		puts "thread ##{i} start"
		sleep rand
		puts "thread ##{i} end"
	}
}
tp.shutdown

Constructor Details

#initialize(max_size) ⇒ ThreadPool

Creates a new thread pool. Param: max_size: the maximum number of threads that can be active at the same time.



# File 'lib/rspider/ThreadPool.rb', line 29

def initialize(max_size)
	@pool=[]
	@max_size=max_size
	@pool_mutex=Mutex.new
	@pool_cv=ConditionVariable.new
end

Instance Method Details

#dispatch(*args) ⇒ Object

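Starts a new thread that runs the given block with args and adds it to the pool. If max_size threads are already active, the new thread waits until one of them finishes. Returns the new Thread.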



# File 'lib/rspider/ThreadPool.rb', line 37

def dispatch(*args)	
	Thread.new do
		@pool_mutex.synchronize do
			while @pool.size >=@max_size
				print "pool full;waiting run #{args.join(',')}...\n" if $DEBUG
				@pool_cv.wait(@pool_mutex)
			end
		end
		@pool << Thread.current
		begin 
			yield(* args)
		rescue => e
			exception(self,e,*args)
		ensure
			@pool_mutex.synchronize do
				@pool.delete(Thread.current)
				@pool_cv.signal
			end	
		end
	end
end
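
In the listing above, `@pool << Thread.current` runs after the mutex has been released, so two waiting threads could in principle both see a free slot and both claim it. The following is a minimal sketch of one way to close that gap, assuming a hypothetical stand-in class `SketchPool` (not part of Rspider) that claims the slot while still holding the lock and uses `broadcast` so that every waiter rechecks its own condition. The `active` and `peak` counters are illustration only.

```ruby
# Hypothetical stand-in for illustration; not the library's class.
class SketchPool
  def initialize(max_size)
    @pool = []
    @max_size = max_size
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def dispatch(*args)
    Thread.new do
      @mutex.synchronize do
        # Wait for a free slot, then claim it before releasing the lock.
        @cv.wait(@mutex) while @pool.size >= @max_size
        @pool << Thread.current
      end
      begin
        yield(*args)
      ensure
        @mutex.synchronize do
          @pool.delete(Thread.current)
          @cv.broadcast # wake every waiter; each one rechecks its condition
        end
      end
    end
  end
end

pool = SketchPool.new(3)
active = 0
peak = 0
counter_lock = Mutex.new

threads = (1..12).map do |j|
  pool.dispatch(j) do |_i|
    counter_lock.synchronize do
      active += 1
      peak = [peak, active].max
    end
    sleep 0.01
    counter_lock.synchronize { active -= 1 }
  end
end

threads.each(&:join)
puts "peak concurrency: #{peak}"
```

Because the slot is claimed under the lock, the reported peak should never exceed the limit passed to `SketchPool.new`.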

#exception(thread, exception, *original_args) ⇒ Object

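Called when a dispatched block raises an error. The default implementation prints the exception. Note that dispatch passes the pool itself (self) as the thread argument.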



# File 'lib/rspider/ThreadPool.rb', line 65

def exception (thread,exception,*original_args)
	puts "Exception in thread #{thread}:#{exception}"
end
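
Since `dispatch` routes errors through this method, a subclass can override it to handle failures differently. The sketch below illustrates the idea with hypothetical classes `MiniPool` and `RecordingPool` (neither is part of Rspider); `MiniPool` reproduces only the error-handling path of `dispatch`, and the subclass collects failures in a `Queue` instead of printing them.

```ruby
# Hypothetical minimal pool, reduced to the error-handling path only.
class MiniPool
  def dispatch(*args)
    Thread.new do
      begin
        yield(*args)
      rescue => e
        # Same call shape as in the listing: the pool is passed as "thread".
        exception(self, e, *args)
      end
    end
  end

  def exception(thread, exception, *original_args)
    puts "Exception in thread #{thread}:#{exception}"
  end
end

# Hypothetical subclass that records failures instead of printing them.
class RecordingPool < MiniPool
  attr_reader :failures

  def initialize
    @failures = Queue.new # thread-safe, so worker threads can push safely
  end

  def exception(_thread, exception, *original_args)
    @failures << [exception.message, original_args]
  end
end

pool = RecordingPool.new
pool.dispatch(42) { |n| raise ArgumentError, "bad input #{n}" }.join

message, args = pool.failures.pop
puts "#{message} (args: #{args.inspect})"
# prints: bad input 42 (args: [42])
```

A bare `rescue => e` catches only `StandardError` and its subclasses, so other exceptions would not reach the hook in this sketch.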

#shutdown ⇒ Object

Waits for all the threads in the pool to exit.



# File 'lib/rspider/ThreadPool.rb', line 59

def shutdown
	@pool_mutex.synchronize {
		@pool_cv.wait(@pool_mutex) until @pool.empty?
	}
end
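
The `wait ... until` loop in `shutdown` rechecks its condition after every wakeup, which is the usual way to use a condition variable. Dispatched threads add themselves to `@pool` only once they start running, so a `shutdown` issued right after `dispatch` might find the pool still empty. The following sketch shows the same waiting idiom in isolation, assuming a hypothetical `pending` counter that is incremented before each worker starts so the waiter cannot observe an empty state too early. None of these names come from Rspider.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
pending = 0 # hypothetical counter of workers that have not finished yet

workers = (1..3).map do |n|
  # Register the worker before its thread starts.
  mutex.synchronize { pending += 1 }
  Thread.new do
    sleep 0.01 * n
    mutex.synchronize do
      pending -= 1
      cv.broadcast # let the waiter recheck its condition
    end
  end
end

mutex.synchronize do
  # Loop form: go back to waiting unless the condition really holds.
  cv.wait(mutex) until pending.zero?
end

puts "all workers finished"
```

Since `dispatch` returns the `Thread` it creates, collecting those threads and calling `join` on each is another way to wait for specific tasks.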