Class: Puma::ThreadPool
- Inherits: Object
  - Object
    - Puma::ThreadPool
- Defined in: lib/puma/thread_pool.rb
Overview
A simple thread pool management object.
Defined Under Namespace
Classes: AutoTrim, ForceShutdown, Reaper
Constant Summary
- SHUTDOWN_GRACE_TIME = 5
  How long (in seconds), after raising ForceShutdown in a thread during forced shutdown mode, to wait for the thread to finish up its work before leaving the thread to die on the vine.
Instance Attribute Summary
- #clean_thread_locals ⇒ Object
  Returns the value of attribute clean_thread_locals.
- #spawned ⇒ Object (readonly)
  Returns the value of attribute spawned.
- #trim_requested ⇒ Object (readonly)
  Returns the value of attribute trim_requested.
Class Method Summary
- .clean_thread_locals ⇒ Object
Instance Method Summary
- #<<(work) ⇒ Object
  Add work to the todo list for a Thread to pick up and process.
- #auto_reap!(timeout = 5) ⇒ Object
- #auto_trim!(timeout = 30) ⇒ Object
- #backlog ⇒ Object
  How many objects have yet to be processed by the pool?
- #initialize(min, max, *extra, &block) ⇒ ThreadPool (constructor)
  Maintain a minimum of min and a maximum of max threads in the pool.
- #reap ⇒ Object
  If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
- #shutdown(timeout = -1) ⇒ Object
  Tell all threads in the pool to exit and wait for them to finish.
- #trim(force = false) ⇒ Object
  If too many threads are in the pool, tell one to finish up and exit.
- #wait_until_not_full ⇒ Object
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min and a maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 21

def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times { spawn_thread }
  end

  @clean_thread_locals = false
end
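To make the constructor's contract concrete, here is a minimal, self-contained sketch of the same general pattern: a set of threads, each running the supplied block on queued work, coordinated by a Mutex and a ConditionVariable. `TinyPool` is a hypothetical class written for illustration only. It is not Puma's implementation: it uses a fixed thread count and leaves out the min/max scaling, trimming, and reaping described on this page.

```ruby
# Hypothetical, simplified pool for illustration only (not Puma's code).
class TinyPool
  def initialize(size, &block)
    @mutex     = Mutex.new
    @not_empty = ConditionVariable.new
    @todo      = []
    @shutdown  = false
    @block     = block
    @workers   = Array.new(size) { Thread.new { run } }
  end

  # Queue one unit of work and wake a sleeping worker.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      @not_empty.signal
    end
  end

  # Let the workers drain the queue, then wait for them to exit.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @workers.each(&:join)
  end

  private

  def run
    loop do
      work = @mutex.synchronize do
        # Sleep until there is work or the pool is shutting down.
        @not_empty.wait(@mutex) while @todo.empty? && !@shutdown
        return if @todo.empty? # shutting down and nothing left to do
        @todo.shift
      end
      @block.call(work) # run the block outside the lock
    end
  end
end

results = Queue.new
pool = TinyPool.new(2) { |n| results << n * 2 }
[1, 2, 3].each { |n| pool << n }
pool.shutdown
doubled = Array.new(results.size) { results.pop }.sort
```

The block runs outside the mutex so that one slow work item does not stop other threads from picking up queued work.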
Instance Attribute Details
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
# File 'lib/puma/thread_pool.rb', line 53

def clean_thread_locals
  @clean_thread_locals
end
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
# File 'lib/puma/thread_pool.rb', line 52

def spawned
  @spawned
end
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
# File 'lib/puma/thread_pool.rb', line 52

def trim_requested
  @trim_requested
end
Class Method Details
.clean_thread_locals ⇒ Object
# File 'lib/puma/thread_pool.rb', line 55

def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
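The loop above resets values stored with `Thread#[]=`, so that state left behind by one unit of work is not visible to the next one handled on the same thread. The sketch below shows the technique in isolation. `clear_thread_locals` is a hypothetical helper name, and the `:request_id` and `:user` keys are invented for the example; the `:__recursive_key__` entry is skipped as in the source, since Ruby uses it internally for recursion tracking.

```ruby
# Hypothetical helper mirroring the loop above; not part of Puma's API.
def clear_thread_locals
  Thread.current.keys.each do |key|
    # Assigning nil removes the entry for that key.
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

# Run in a separate thread so the main thread's locals are untouched.
had_key_before, keys_after = Thread.new do
  Thread.current[:request_id] = 42
  Thread.current[:user] = "alice"
  present = Thread.current.key?(:request_id)
  clear_thread_locals
  [present, Thread.current.keys - [:__recursive_key__]]
end.value
```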
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
# File 'lib/puma/thread_pool.rb', line 140

def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
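The spawn decision in `#<<` can be read as a pure predicate: grow the pool only when there are fewer idle threads than queued items and the pool is still below its maximum. The sketch below restates that check with plain integers; `spawn_needed?` and its keyword names are hypothetical and exist only for this example.

```ruby
# Hypothetical restatement of the check in #<<, using plain integers.
# waiting:   threads currently idle
# todo_size: queued items, including the one just added
# spawned:   threads currently in the pool
# max:       upper bound on pool size
def spawn_needed?(waiting:, todo_size:, spawned:, max:)
  waiting < todo_size && spawned < max
end

no_idle_room_to_grow = spawn_needed?(waiting: 0, todo_size: 1, spawned: 2, max: 4)
idle_thread_available = spawn_needed?(waiting: 1, todo_size: 1, spawned: 2, max: 4)
already_at_max = spawn_needed?(waiting: 0, todo_size: 3, spawned: 4, max: 4)
```

Only the first case spawns a thread; in the second an idle thread takes the work, and in the third the work waits in the queue.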
#auto_reap!(timeout = 5) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 255

def auto_reap!(timeout=5)
  @reaper = Reaper.new(self, timeout)
  @reaper.start!
end
#auto_trim!(timeout = 30) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 226

def auto_trim!(timeout=30)
  @auto_trim = AutoTrim.new(self, timeout)
  @auto_trim.start!
end
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 63

def backlog
  @mutex.synchronize { @todo.size }
end
#reap ⇒ Object
If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
# File 'lib/puma/thread_pool.rb', line 187

def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end
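The bookkeeping in `#reap` can be tried outside the pool with two plain threads, one that has already finished and one that is still running. This is a sketch under simplifying assumptions: the local variables `workers` and `spawned` stand in for the pool's instance variables, and no mutex is used because only one thread touches them here.

```ruby
finished = Thread.new { :done }
finished.join                  # this thread is now dead
blocked = Thread.new { sleep } # stays alive until killed

workers = [finished, blocked]
spawned = workers.size

# Same steps as #reap: find dead threads, adjust the counter, drop them.
dead_workers = workers.reject(&:alive?)
dead_workers.each { spawned -= 1 }
workers.delete_if { |w| dead_workers.include?(w) }

# Clean up the example's long-running thread.
blocked.kill
blocked.join
```

After the reap step, `spawned` is 1 and `workers` holds only the thread that was still alive at that point.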
#shutdown(timeout = -1) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
# File 'lib/puma/thread_pool.rb', line 262

def shutdown(timeout=-1)
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    # Wait for threads to finish after n attempts (+timeout+).
    # If threads are still running, it will forcefully kill them.
    timeout.times do
      threads.delete_if do |t|
        t.join 1
      end

      if threads.empty?
        break
      else
        sleep 1
      end
    end

    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join SHUTDOWN_GRACE_TIME
    end
  end

  @spawned = 0
  @workers = []
end
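The timed branch of `#shutdown` relies on two facts about Ruby threads: `Thread#join(limit)` returns the thread when it has finished and `nil` when the limit expires, and `Thread#raise` delivers an exception into a running thread. The sketch below isolates that sequence. `ForceStop` and `stop_threads` are hypothetical names, the waits are shortened so the example runs quickly, and the `sleep 1` between attempts is left out; it is an illustration of the technique, not Puma's code.

```ruby
# Hypothetical stand-in for Puma::ThreadPool::ForceShutdown.
class ForceStop < RuntimeError; end

# Sketch of the timed branch of #shutdown with short, configurable waits.
# Returns the threads that had to be interrupted.
def stop_threads(threads, attempts:, wait: 0.05, grace: 1)
  pending = threads.dup
  attempts.times do
    # join(limit) is truthy for finished threads, so delete_if keeps
    # only the threads that are still running.
    pending.delete_if { |t| t.join(wait) }
    break if pending.empty?
  end
  pending.each { |t| t.raise ForceStop } # interrupt the stragglers
  pending.each { |t| t.join(grace) }     # give them time to clean up
  pending
end

ready  = Queue.new
events = Queue.new
quick = Thread.new { :done }
stuck = Thread.new do
  begin
    ready << true
    sleep # simulates work that never finishes
  rescue ForceStop
    events << :interrupted # cleanup would go here
  end
end

ready.pop # make sure the rescue is in place before stopping
forced = stop_threads([quick, stuck], attempts: 2)
```

The worker rescues the exception itself. If it did not, the thread would terminate with the exception and the final `join` would re-raise it in the caller.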
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish up and exit. If force is true, a trim is requested even if all threads are being utilized.
# File 'lib/puma/thread_pool.rb', line 176

def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
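Because the guard subtracts pending trim requests from the spawned count, repeated calls cannot push the pool below its minimum. The following sketch replays that arithmetic with plain integers; `trim_allowed?` and the numbers are hypothetical and chosen only to show the effect.

```ruby
# Hypothetical restatement of the guard in #trim, using plain integers.
def trim_allowed?(force:, waiting:, spawned:, trim_requested:, min:)
  (force || waiting > 0) && spawned - trim_requested > min
end

# Six forced trims against a pool of 5 threads with a minimum of 2.
spawned = 5
min = 2
trim_requested = 0
accepted = 0
6.times do
  if trim_allowed?(force: true, waiting: 0, spawned: spawned,
                   trim_requested: trim_requested, min: min)
    trim_requested += 1
    accepted += 1
  end
end

# Without force, a trim needs at least one idle thread.
busy_pool = trim_allowed?(force: false, waiting: 0, spawned: 5,
                          trim_requested: 0, min: 2)
```

Only three of the six requests are accepted, which would leave exactly the minimum of two threads once the trimmed threads exit.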
#wait_until_not_full ⇒ Object
# File 'lib/puma/thread_pool.rb', line 156

def wait_until_not_full
  @mutex.synchronize do
    while true
      return if @shutdown
      return if @waiting > 0

      # If we can still spin up new threads and there
      # is work queued, then accept more work until we would
      # spin up the max number of threads.
      return if @todo.size < @max - @spawned

      @not_full.wait @mutex
    end
  end
end
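The three `return` conditions decide when a caller stops waiting. Restated as a predicate over plain integers, they look like the sketch below; `accepts_work?` and its keyword names are hypothetical, and the sketch leaves out the actual blocking on the condition variable.

```ruby
# Hypothetical restatement of the return conditions in #wait_until_not_full.
def accepts_work?(shutdown:, waiting:, todo_size:, spawned:, max:)
  return true if shutdown    # never block callers during shutdown
  return true if waiting > 0 # an idle thread is available
  todo_size < max - spawned  # queued work still fits in unspawned capacity
end

room_to_grow = accepts_work?(shutdown: false, waiting: 0, todo_size: 1,
                             spawned: 2, max: 4)
queue_covers_growth = accepts_work?(shutdown: false, waiting: 0, todo_size: 2,
                                    spawned: 2, max: 4)
saturated = accepts_work?(shutdown: false, waiting: 0, todo_size: 0,
                          spawned: 4, max: 4)
```

With two threads spawned out of four, one queued item still leaves room (1 < 2), two queued items do not (2 < 2 is false), and a pool at its maximum with no idle threads always waits.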