Class: Puma::ThreadPool
- Inherits:
-
Object
- Object
- Puma::ThreadPool
- Defined in:
- lib/puma/thread_pool.rb
Overview
Internal Docs for A simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First a connection to a client is made in ‘Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
Each thread in the pool has an internal loop where it pulls a request from the ‘@todo` array and proceses it.
Defined Under Namespace
Classes: AutoTrim, ForceShutdown, Reaper
Constant Summary collapse
- SHUTDOWN_GRACE_TIME =
How long, after raising the ForceShutdown of a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.
5
Instance Attribute Summary collapse
-
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
-
#spawned ⇒ Object
readonly
Returns the value of attribute spawned.
-
#trim_requested ⇒ Object
readonly
Returns the value of attribute trim_requested.
-
#waiting ⇒ Object
readonly
Returns the value of attribute waiting.
Class Method Summary collapse
Instance Method Summary collapse
-
#<<(work) ⇒ Object
Add
workto the todo list for a Thread to pickup and process. - #auto_reap!(timeout = 5) ⇒ Object
- #auto_trim!(timeout = 30) ⇒ Object
-
#backlog ⇒ Object
How many objects have yet to be processed by the pool?.
-
#initialize(min, max, *extra, &block) ⇒ ThreadPool
constructor
Maintain a minimum of
minand maximum ofmaxthreads in the pool. - #pool_capacity ⇒ Object
-
#reap ⇒ Object
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
-
#shutdown(timeout = -1)) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
-
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish go ahead and exit.
-
#wait_until_not_full ⇒ Object
This method is used by ‘Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min and maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/puma/thread_pool.rb', line 32 def initialize(min, max, *extra, &block) @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @todo = [] @spawned = 0 @waiting = 0 @min = Integer(min) @max = Integer(max) @block = block @extra = extra @shutdown = false @trim_requested = 0 @workers = [] @auto_trim = nil @reaper = nil @mutex.synchronize do @min.times { spawn_thread } end @clean_thread_locals = false end |
Instance Attribute Details
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
64 65 66 |
# File 'lib/puma/thread_pool.rb', line 64 def clean_thread_locals @clean_thread_locals end |
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
63 64 65 |
# File 'lib/puma/thread_pool.rb', line 63 def spawned @spawned end |
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
63 64 65 |
# File 'lib/puma/thread_pool.rb', line 63 def trim_requested @trim_requested end |
#waiting ⇒ Object (readonly)
Returns the value of attribute waiting.
63 64 65 |
# File 'lib/puma/thread_pool.rb', line 63 def waiting @waiting end |
Class Method Details
.clean_thread_locals ⇒ Object
66 67 68 69 70 |
# File 'lib/puma/thread_pool.rb', line 66 def self.clean_thread_locals Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods Thread.current[key] = nil unless key == :__recursive_key__ end end |
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pickup and process.
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/puma/thread_pool.rb', line 155 def <<(work) @mutex.synchronize do if @shutdown raise "Unable to add work while shutting down" end @todo << work if @waiting < @todo.size and @spawned < @max spawn_thread end @not_empty.signal end end |
#auto_reap!(timeout = 5) ⇒ Object
296 297 298 299 |
# File 'lib/puma/thread_pool.rb', line 296 def auto_reap!(timeout=5) @reaper = Reaper.new(self, timeout) @reaper.start! end |
#auto_trim!(timeout = 30) ⇒ Object
267 268 269 270 |
# File 'lib/puma/thread_pool.rb', line 267 def auto_trim!(timeout=30) @auto_trim = AutoTrim.new(self, timeout) @auto_trim.start! end |
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
74 75 76 |
# File 'lib/puma/thread_pool.rb', line 74 def backlog @mutex.synchronize { @todo.size } end |
#pool_capacity ⇒ Object
78 79 80 |
# File 'lib/puma/thread_pool.rb', line 78 def pool_capacity waiting + (@max - spawned) end |
#reap ⇒ Object
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/puma/thread_pool.rb', line 228 def reap @mutex.synchronize do dead_workers = @workers.reject(&:alive?) dead_workers.each do |worker| worker.kill @spawned -= 1 end @workers.delete_if do |w| dead_workers.include?(w) end end end |
#shutdown(timeout = -1)) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/puma/thread_pool.rb', line 303 def shutdown(timeout=-1) threads = @mutex.synchronize do @shutdown = true @not_empty.broadcast @not_full.broadcast @auto_trim.stop if @auto_trim @reaper.stop if @reaper # dup workers so that we join them all safely @workers.dup end if timeout == -1 # Wait for threads to finish without force shutdown. threads.each(&:join) else # Wait for threads to finish after n attempts (+timeout+). # If threads are still running, it will forcefully kill them. timeout.times do threads.delete_if do |t| t.join 1 end if threads.empty? break else sleep 1 end end threads.each do |t| t.raise ForceShutdown end threads.each do |t| t.join SHUTDOWN_GRACE_TIME end end @spawned = 0 @workers = [] end |
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.
217 218 219 220 221 222 223 224 |
# File 'lib/puma/thread_pool.rb', line 217 def trim(force=false) @mutex.synchronize do if (force or @waiting > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end |
#wait_until_not_full ⇒ Object
This method is used by ‘Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.
The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests then it is at capacity. If another Puma process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.
For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the ‘Puma::Server` can pull a request right away.
If there are 5 threads and all 5 of them are busy, then it will pause here, and wait until the ‘not_full` condition variable is signaled, usually this indicates that a request has been processed.
It’s important to note that even though the server might accept another request, it might not be added to the ‘@todo` array right away. For example if a slow client has only sent a header, but not a body then the `@todo` array would stay the same size as the reactor works to try to buffer the request. In tha scenario the next call to this method would not block and another request would be added into the reactor by the server. This would continue until a fully bufferend request makes it through the reactor and can then be processed by the thread pool.
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/puma/thread_pool.rb', line 197 def wait_until_not_full @mutex.synchronize do while true return if @shutdown # If we can still spin up new threads and there # is work queued that cannot be handled by waiting # threads, then accept more work until we would # spin up the max number of threads. return if @todo.size - @waiting < @max - @spawned @not_full.wait @mutex end end end |