Class: Puma::ThreadPool
- Inherits: Object
- Defined in: lib/puma/thread_pool.rb
Overview
A simple thread pool management object.
Defined Under Namespace
Classes: AutoTrim, ForceShutdown, Reaper
Constant Summary
- SHUTDOWN_GRACE_TIME = 5
  How long, in seconds, to wait after raising ForceShutdown in a thread during forced shutdown, so the thread can try to finish its work before it is left to die on the vine.
Instance Attribute Summary
- #clean_thread_locals ⇒ Object
  Returns the value of attribute clean_thread_locals.
- #spawned ⇒ Object (readonly)
  Returns the value of attribute spawned.
- #trim_requested ⇒ Object (readonly)
  Returns the value of attribute trim_requested.
Class Method Summary
- .clean_thread_locals ⇒ Object
Instance Method Summary
- #<<(work) ⇒ Object
  Add work to the todo list for a thread to pick up and process.
- #auto_reap!(timeout = 5) ⇒ Object
- #auto_trim!(timeout = 30) ⇒ Object
- #backlog ⇒ Object
  How many objects have yet to be processed by the pool?
- #initialize(min, max, *extra, &block) ⇒ ThreadPool (constructor)
  Maintain a minimum of min and a maximum of max threads in the pool.
- #reap ⇒ Object
  Remove dead threads from the pool and decrease the spawned counter so that new, healthy threads can be created.
- #shutdown(timeout = -1) ⇒ Object
  Tell all threads in the pool to exit and wait for them to finish.
- #trim(force = false) ⇒ Object
  If too many threads are in the pool, tell one to finish up and exit.
- #wait_until_not_full ⇒ Object
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min and maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 21
def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times { spawn_thread }
  end

  @clean_thread_locals = false
end
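To see how the state set up in the constructor fits together, here is a minimal, self-contained sketch of the same general pattern: a mutex-guarded todo list, a condition variable that wakes idle workers, and a block that each worker calls for every item. TinyPool and all of its methods are hypothetical names used for illustration. It is not Puma's implementation, and it leaves out the min/max sizing, trimming and reaping.

```ruby
# A simplified stand-in for the pattern, NOT Puma's actual ThreadPool.
class TinyPool
  def initialize(size, &block)
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @todo = []
    @shutdown = false
    @block = block
    @workers = Array.new(size) { spawn_worker }
  end

  # Queue one unit of work and wake one idle worker.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      @not_empty.signal
    end
  end

  # Let the workers drain the list, then wait for them to exit.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @workers.each(&:join)
  end

  private

  def spawn_worker
    Thread.new do
      loop do
        work = @mutex.synchronize do
          # Sleep until there is work or the pool is shutting down.
          @not_empty.wait(@mutex) while @todo.empty? && !@shutdown
          @todo.shift
        end
        break if work.nil? # empty list during shutdown: exit the thread
        @block.call(work)
      end
    end
  end
end

results = Queue.new
pool = TinyPool.new(2) { |n| results << n * n }
[1, 2, 3, 4].each { |n| pool << n }
pool.shutdown
squares = Array.new(results.size) { results.pop }.sort
```

The block is called outside the mutex, so slow work in one thread does not stop others from picking up items. This sketch uses nil as the "no more work" signal, so nil cannot be queued as a work item.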
Instance Attribute Details
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
# File 'lib/puma/thread_pool.rb', line 53
def clean_thread_locals
  @clean_thread_locals
end
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
# File 'lib/puma/thread_pool.rb', line 52
def spawned
  @spawned
end
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
# File 'lib/puma/thread_pool.rb', line 52
def trim_requested
  @trim_requested
end
Class Method Details
.clean_thread_locals ⇒ Object
# File 'lib/puma/thread_pool.rb', line 55
def self.clean_thread_locals
  Thread.current.keys.each do |key|
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
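The effect of this method is easiest to see in a short run. The sketch below copies the same loop into a hypothetical top-level helper, clear_thread_locals, so that it runs without Puma loaded, and checks that a value stored with Thread.current[]= is gone afterwards. The :request_id key is made up for the demonstration.

```ruby
# Same loop as the class method, wrapped in a hypothetical helper so the
# example is self-contained.
def clear_thread_locals
  Thread.current.keys.each do |key|
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

leftover = Thread.new do
  Thread.current[:request_id] = "abc123" # state left behind by one unit of work
  clear_thread_locals                    # wipe it before the next unit runs
  Thread.current[:request_id]            # read it back
end.value
# leftover is nil: the value no longer leaks into later work on this thread
```

Thread#keys returns a fresh array, so assigning to Thread.current inside the loop does not disturb the iteration.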
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a thread to pick up and process.
# File 'lib/puma/thread_pool.rb', line 140
def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
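The spawn decision in #<< is a two-part test: there must be more queued items than idle threads, and the pool must still be below max. The sketch below lifts that condition into a hypothetical pure function, spawn_needed?, so that a few cases can be checked by hand. The keyword names mirror the instance variables and are otherwise an assumption.

```ruby
# Hypothetical helper mirroring the condition used in #<<.
def spawn_needed?(waiting:, todo_size:, spawned:, max:)
  waiting < todo_size && spawned < max
end

# An idle thread is available for the single queued item: no new thread.
idle_available = spawn_needed?(waiting: 1, todo_size: 1, spawned: 2, max: 4)

# Nobody is idle and the pool can still grow: spawn one.
can_grow = spawn_needed?(waiting: 0, todo_size: 1, spawned: 2, max: 4)

# Nobody is idle but the pool is already at max: the item waits in the backlog.
at_capacity = spawn_needed?(waiting: 0, todo_size: 3, spawned: 4, max: 4)
```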
#auto_reap!(timeout = 5) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 247
def auto_reap!(timeout=5)
  @reaper = Reaper.new(self, timeout)
  @reaper.start!
end
#auto_trim!(timeout = 30) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 218
def auto_trim!(timeout=30)
  @auto_trim = AutoTrim.new(self, timeout)
  @auto_trim.start!
end
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 63
def backlog
  @mutex.synchronize { @todo.size }
end
#reap ⇒ Object
If there are dead threads in the pool, remove them and decrease the spawned counter so that new, healthy threads can be created.
# File 'lib/puma/thread_pool.rb', line 179
def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end
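The core of #reap is selecting the threads for which Thread#alive? is false and dropping them from the worker list. The sketch below shows that step on two plain threads, outside any pool. The helper reap_dead! is a hypothetical name, and the local spawned counter stands in for the pool's @spawned.

```ruby
# Hypothetical helper: removes finished threads from +workers+ and returns
# how many were removed, so the caller can lower its spawned counter.
def reap_dead!(workers)
  dead = workers.reject(&:alive?)
  workers.delete_if { |w| dead.include?(w) }
  dead.size
end

finished = Thread.new { :done }
finished.join                  # guarantee this thread is dead
sleeper = Thread.new { sleep } # stays alive until killed

workers = [finished, sleeper]
spawned = workers.size
spawned -= reap_dead!(workers) # workers is now [sleeper], spawned is 1

sleeper.kill                   # clean up the demo thread
sleeper.join
```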
#shutdown(timeout = -1) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
# File 'lib/puma/thread_pool.rb', line 254
def shutdown(timeout=-1)
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    # Wait for threads to finish after n attempts (+timeout+).
    # If threads are still running, it will forcefully kill them.
    timeout.times do
      threads.delete_if do |t|
        t.join 1
      end

      if threads.empty?
        break
      else
        sleep 1
      end
    end

    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join SHUTDOWN_GRACE_TIME
    end
  end

  @spawned = 0
  @workers = []
end
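The forced branch of #shutdown relies on two behaviours of Ruby's Thread: join with a limit returns nil when the thread is still running, and Thread#raise delivers an exception into another thread. The sketch below follows the same sequence on plain threads, with the one-second waits replaced by a short step so that it finishes quickly. ForceStop and stop_threads are hypothetical stand-ins, not part of Puma.

```ruby
class ForceStop < RuntimeError; end # hypothetical stand-in for ForceShutdown

# Hypothetical helper following the forced branch of #shutdown.
def stop_threads(threads, attempts:, step:, grace:)
  threads = threads.dup
  attempts.times do
    # Thread#join(limit) returns the thread when it has finished and nil on
    # timeout, so delete_if drops only the threads that exited in time.
    threads.delete_if { |t| t.join(step) }
    break if threads.empty?
    sleep step
  end
  threads.each { |t| t.raise ForceStop } # interrupt whatever is left
  threads.each { |t| t.join(grace) }     # grace period to clean up
  threads                                # the threads that had to be forced
end

log = Queue.new
quick = Thread.new { log << :quick_done }
stuck = Thread.new do
  begin
    sleep                      # simulates work that never finishes
  rescue ForceStop
    log << :stuck_interrupted  # the thread gets a chance to clean up
  end
end

forced = stop_threads([quick, stuck], attempts: 2, step: 0.1, grace: 1)
```

Here only the stuck thread ends up in forced. A thread that ignores the exception would still be running after the grace period, which is the "left to die on the vine" case described for SHUTDOWN_GRACE_TIME.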
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish up and exit. If force is true, a trim is requested even if all threads are busy.
# File 'lib/puma/thread_pool.rb', line 168
def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
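The condition in #trim combines two checks: whether a trim is permitted at all (forced, or at least one thread is idle), and whether the pool would stay above min once the trims already requested are counted. As a rough illustration, the hypothetical function trim_allowed? below restates that condition with keyword arguments named after the instance variables.

```ruby
# Hypothetical helper mirroring the condition used in #trim.
def trim_allowed?(force:, waiting:, spawned:, trim_requested:, min:)
  (force || waiting > 0) && spawned - trim_requested > min
end

# Two idle threads and the pool is above min: a trim is requested.
idle = trim_allowed?(force: false, waiting: 2, spawned: 5, trim_requested: 0, min: 3)

# Every thread is busy and force is off: nothing happens.
busy = trim_allowed?(force: false, waiting: 0, spawned: 5, trim_requested: 0, min: 3)

# Every thread is busy but force is on: a trim is requested anyway.
forced = trim_allowed?(force: true, waiting: 0, spawned: 5, trim_requested: 0, min: 3)

# Two trims are already pending, so 5 - 2 is not above min: even force is refused.
pending = trim_allowed?(force: true, waiting: 0, spawned: 5, trim_requested: 2, min: 3)
```

Subtracting trim_requested keeps repeated calls from pushing the pool below min while earlier trims are still pending.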
#wait_until_not_full ⇒ Object
# File 'lib/puma/thread_pool.rb', line 156
def wait_until_not_full
  @mutex.synchronize do
    until @todo.size - @waiting < @max - @spawned or @shutdown
      @not_full.wait @mutex
    end
  end
end
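The loop in #wait_until_not_full blocks the caller until the number of queued items not covered by an idle thread is smaller than the number of threads the pool could still spawn. The sketch below restates that inequality as a hypothetical pure function, room_for_work?, to make the boundary cases easier to read. It ignores the @shutdown escape and the condition variable wait.

```ruby
# Hypothetical helper mirroring the inequality in #wait_until_not_full.
def room_for_work?(todo_size:, waiting:, spawned:, max:)
  todo_size - waiting < max - spawned
end

# Pool at max and every thread busy: 0 < 0 is false, so the caller blocks.
saturated = room_for_work?(todo_size: 0, waiting: 0, spawned: 4, max: 4)

# Pool at max but one thread idle: -1 < 0 is true, so the caller proceeds.
one_idle = room_for_work?(todo_size: 0, waiting: 1, spawned: 4, max: 4)

# Two slots left to spawn but two items already queued: 2 < 2 is false.
backlog_full = room_for_work?(todo_size: 2, waiting: 0, spawned: 2, max: 4)
```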