Class: MedularisDaemonsCommon::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/threadpool.rb

Overview

Launches a specified number of threads on instantiation, assigning work to them as it arrives

Defined Under Namespace

Classes: ThreadPoolJobRunner

Instance Method Summary collapse

Constructor Details

#initialize(num_threads, args = {}) ⇒ ThreadPool

Create a thread pool with a specified number of threads



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/threadpool.rb', line 64

# Create a thread pool and start its job runners.
#
# @param num_threads [Integer] number of job runners to create and start
# @param args [Hash] optional settings
# @option args [Numeric] :timeout when given, a background thread wakes up
#   every +timeout+ seconds and calls check_timeout(timeout) on every runner;
#   when omitted, no such thread is started
def initialize(num_threads, args = {})
	@num_threads = num_threads
	@timeout     = args[:timeout]
	@job_runners = []
	@job_runners_lock = Mutex.new
	@terminate = false
	@terminate_lock = Mutex.new

	@work_queue  = Queue.new

	# Integer#times yields exactly num_threads times. The earlier
	# 0.upto(num_threads) range was inclusive and started one runner too many.
	@num_threads.times {
		runner = ThreadPoolJobRunner.new(self)
		@job_runners << runner
		runner.run
	}

	# optional timeout thread
	unless @timeout.nil?
		@timeout_thread = Thread.new {
			# loop until the pool's terminate flag is raised
			until terminate
				sleep @timeout
				# hold the runners lock while iterating the runner list
				@job_runners_lock.synchronize {
					@job_runners.each { |jr|
						jr.check_timeout(@timeout)
					}
				}
			end
		}
	end
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the pool



106
107
108
# File 'lib/threadpool.rb', line 106

# Append a unit of work to the end of the pool's work queue.
#
# @param work [Object] the item to enqueue
# @return [Object] whatever the underlying queue returns from enqueueing
def <<(work)
	@work_queue.enq(work)
end

#next_job ⇒ Object

Return the next job queued up



111
112
113
# File 'lib/threadpool.rb', line 111

# Remove and return the job at the head of the work queue.
# Uses the queue's blocking dequeue, so the caller waits while the queue is empty.
#
# @return [Object] the oldest queued work item
def next_job
	@work_queue.deq
end

#stop ⇒ Object

Terminate the thread pool



116
117
118
119
120
121
# File 'lib/threadpool.rb', line 116

# Terminate the thread pool.
#
# Raises the terminate flag, waits for the optional timeout thread to finish,
# discards any queued work and calls stop on every job runner.
# The join blocks until the timeout thread exits on its own.
def stop
	# `self.` is required here: a bare `terminate = true` would only create a
	# local variable and never call the terminate= setter.
	self.terminate = true
	# @timeout_thread is nil when no timeout thread was started
	@timeout_thread.join unless @timeout_thread.nil?
	@work_queue.clear
	@job_runners_lock.synchronize { @job_runners.each { |jr| jr.stop } }
end

#terminate ⇒ Object

terminate reader



96
97
98
# File 'lib/threadpool.rb', line 96

# Read the terminate flag while holding the terminate lock.
#
# @return [Object] the current value of the terminate flag
def terminate
	flag = nil
	@terminate_lock.synchronize do
		flag = @terminate
	end
	flag
end

#terminate=(val) ⇒ Object

terminate setter



101
102
103
# File 'lib/threadpool.rb', line 101

# Write the terminate flag while holding the terminate lock.
#
# @param val [Object] the new value of the terminate flag
# @return [Object] the value that was assigned
def terminate=(val)
	@terminate_lock.synchronize do
		@terminate = val
	end
end