Class: Puma::ThreadPool
- Inherits:
-
Object
- Object
- Puma::ThreadPool
- Defined in:
- lib/puma/thread_pool.rb
Overview
Internal Docs for A simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First a connection to a client is made in ‘Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
Each thread in the pool has an internal loop where it pulls a request from the ‘@todo` array and processes it.
Defined Under Namespace
Classes: Automaton, ForceShutdown
Constant Summary collapse
- SHUTDOWN_GRACE_TIME =
How long, after raising the ForceShutdown of a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.
5
Instance Attribute Summary collapse
- #busy_threads ⇒ Object readonly
-
#out_of_band_running ⇒ Object
readonly
seconds.
- #pool_capacity ⇒ Object readonly
-
#spawned ⇒ Object
readonly
Returns the value of attribute spawned.
-
#trim_requested ⇒ Object
readonly
Returns the value of attribute trim_requested.
-
#waiting ⇒ Object
readonly
Returns the value of attribute waiting.
Instance Method Summary collapse
-
#<<(work) ⇒ Object
Add
workto the todo list for a Thread to pickup and process. - #auto_reap!(timeout = @reaping_time) ⇒ Object
- #auto_trim!(timeout = @auto_trim_time) ⇒ Object
-
#backlog ⇒ Object
How many objects have yet to be processed by the pool?.
-
#backlog_max ⇒ Object
The maximum size of the backlog.
-
#initialize(name, options = {}, server: nil, &block) ⇒ ThreadPool
constructor
Maintain a minimum of
minand maximum ofmaxthreads in the pool. -
#reap ⇒ Object
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
- #reset_max ⇒ Object
-
#shutdown(timeout = -1)) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
-
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks.
-
#trim(force = false) ⇒ Object
If there are any free threads in the pool, tell one to go ahead and exit.
- #wait_while_out_of_band_running ⇒ Object
-
#with_force_shutdown ⇒ Object
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.
- #with_mutex(&block) ⇒ Object
Constructor Details
#initialize(name, options = {}, server: nil, &block) ⇒ ThreadPool
Maintain a minimum of min and maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/puma/thread_pool.rb', line 40 def initialize(name, = {}, server: nil, &block) @server = server @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @todo = Queue.new @backlog_max = 0 @spawned = 0 @waiting = 0 @name = name @min = Integer([:min_threads]) @max = Integer([:max_threads]) # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI # makes stubbing constants difficult. @shutdown_grace_time = Float([:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME) @block = block @out_of_band = [:out_of_band] @out_of_band_running = false @out_of_band_condvar = ConditionVariable.new @before_thread_start = [:before_thread_start] @before_thread_exit = [:before_thread_exit] @reaping_time = [:reaping_time] @auto_trim_time = [:auto_trim_time] @shutdown = false @trim_requested = 0 @out_of_band_pending = false @workers = [] @auto_trim = nil @reaper = nil @mutex.synchronize do @min.times do spawn_thread @not_full.wait(@mutex) end end @force_shutdown = false @shutdown_mutex = Mutex.new end |
Instance Attribute Details
#busy_threads ⇒ Object (readonly)
129 130 131 |
# File 'lib/puma/thread_pool.rb', line 129 def busy_threads with_mutex { @spawned - @waiting + @todo.size } end |
#out_of_band_running ⇒ Object (readonly)
seconds
32 33 34 |
# File 'lib/puma/thread_pool.rb', line 32 def out_of_band_running @out_of_band_running end |
#pool_capacity ⇒ Object (readonly)
123 124 125 |
# File 'lib/puma/thread_pool.rb', line 123 def pool_capacity waiting + (@max - spawned) end |
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
89 90 91 |
# File 'lib/puma/thread_pool.rb', line 89 def spawned @spawned end |
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
89 90 91 |
# File 'lib/puma/thread_pool.rb', line 89 def trim_requested @trim_requested end |
#waiting ⇒ Object (readonly)
Returns the value of attribute waiting.
89 90 91 |
# File 'lib/puma/thread_pool.rb', line 89 def waiting @waiting end |
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pickup and process.
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/puma/thread_pool.rb', line 261 def <<(work) with_mutex do if @shutdown raise "Unable to add work while shutting down" end @todo << work t = @todo.size @backlog_max = t if t > @backlog_max if @waiting < @todo.size and @spawned < @max spawn_thread end @not_empty.signal end end |
#auto_reap!(timeout = @reaping_time) ⇒ Object
342 343 344 345 |
# File 'lib/puma/thread_pool.rb', line 342 def auto_reap!(timeout=@reaping_time) @reaper = Automaton.new(self, timeout, "#{@name} tp reap", :reap) @reaper.start! end |
#auto_trim!(timeout = @auto_trim_time) ⇒ Object
337 338 339 340 |
# File 'lib/puma/thread_pool.rb', line 337 def auto_trim!(timeout=@auto_trim_time) @auto_trim = Automaton.new(self, timeout, "#{@name} tp trim", :trim) @auto_trim.start! end |
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
112 113 114 |
# File 'lib/puma/thread_pool.rb', line 112 def backlog with_mutex { @todo.size } end |
#backlog_max ⇒ Object
The maximum size of the backlog
118 119 120 |
# File 'lib/puma/thread_pool.rb', line 118 def backlog_max with_mutex { @backlog_max } end |
#reap ⇒ Object
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/puma/thread_pool.rb', line 295 def reap with_mutex do dead_workers = @workers.reject(&:alive?) dead_workers.each do |worker| worker.kill @spawned -= 1 end @workers.delete_if do |w| dead_workers.include?(w) end end end |
#reset_max ⇒ Object
106 107 108 |
# File 'lib/puma/thread_pool.rb', line 106 def reset_max with_mutex { @backlog_max = 0 } end |
#shutdown(timeout = -1)) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish. Wait timeout seconds then raise ForceShutdown in remaining threads. Next, wait an extra @shutdown_grace_time seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/puma/thread_pool.rb', line 365 def shutdown(timeout=-1) threads = with_mutex do @shutdown = true @trim_requested = @spawned @not_empty.broadcast @not_full.broadcast @auto_trim&.stop @reaper&.stop # dup workers so that we join them all safely @workers.dup end if timeout == -1 # Wait for threads to finish without force shutdown. threads.each(&:join) else join = ->(inner_timeout) do start = Process.clock_gettime(Process::CLOCK_MONOTONIC) threads.reject! do |t| elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start t.join inner_timeout - elapsed end end # Wait +timeout+ seconds for threads to finish. join.call(timeout) # If threads are still running, raise ForceShutdown and wait to finish. @shutdown_mutex.synchronize do @force_shutdown = true threads.each do |t| t.raise ForceShutdown if t[:with_force_shutdown] end end join.call(@shutdown_grace_time) # If threads are _still_ running, forcefully kill them and wait to finish. threads.each(&:kill) join.call(1) end @spawned = 0 @workers = [] end |
#stats ⇒ Hash
generate stats hash so as not to perform multiple locks
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/puma/thread_pool.rb', line 93 def stats with_mutex do temp = @backlog_max @backlog_max = 0 { backlog: @todo.size, running: @spawned, pool_capacity: @waiting + (@max - @spawned), busy_threads: @spawned - @waiting + @todo.size, backlog_max: temp } end end |
#trim(force = false) ⇒ Object
If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.
283 284 285 286 287 288 289 290 291 |
# File 'lib/puma/thread_pool.rb', line 283 def trim(force=false) with_mutex do free = @waiting - @todo.size if (force or free > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end |
#wait_while_out_of_band_running ⇒ Object
245 246 247 248 249 250 251 |
# File 'lib/puma/thread_pool.rb', line 245 def wait_while_out_of_band_running return unless @out_of_band_running with_mutex do @out_of_band_condvar.wait(@mutex) while @out_of_band_running end end |
#with_force_shutdown ⇒ Object
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.
349 350 351 352 353 354 355 356 357 358 |
# File 'lib/puma/thread_pool.rb', line 349 def with_force_shutdown t = Thread.current @shutdown_mutex.synchronize do raise ForceShutdown if @force_shutdown t[:with_force_shutdown] = true end yield ensure t[:with_force_shutdown] = false end |
#with_mutex(&block) ⇒ Object
254 255 256 257 258 |
# File 'lib/puma/thread_pool.rb', line 254 def with_mutex(&block) @mutex.owned? ? yield : @mutex.synchronize(&block) end |