Class: Puma::ThreadPool
- Inherits:
-
Object
- Object
- Puma::ThreadPool
- Defined in:
- lib/puma/thread_pool.rb
Overview
A simple thread pool management object.
Defined Under Namespace
Instance Attribute Summary collapse
-
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
-
#spawned ⇒ Object
readonly
Returns the value of attribute spawned.
-
#trim_requested ⇒ Object
readonly
Returns the value of attribute trim_requested.
Class Method Summary collapse
Instance Method Summary collapse
-
#<<(work) ⇒ Object
Add
work
to the todo list for a Thread to pickup and process. - #auto_reap!(timeout = 5) ⇒ Object
- #auto_trim!(timeout = 5) ⇒ Object
-
#backlog ⇒ Object
How many objects have yet to be processed by the pool?.
-
#initialize(min, max, *extra, &block) ⇒ ThreadPool
constructor
Maintain a minimum of
min
and maximum ofmax
threads in the pool. -
#reap ⇒ Object
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
-
#shutdown ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
-
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish go ahead and exit.
- #wait_until_not_full ⇒ Object
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min
and maximum of max
threads in the pool.
The block passed is the work that will be performed in each thread.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/puma/thread_pool.rb', line 14 def initialize(min, max, *extra, &block) @not_empty = ConditionVariable.new @not_full = ConditionVariable.new @mutex = Mutex.new @todo = [] @spawned = 0 @waiting = 0 @min = Integer(min) @max = Integer(max) @block = block @extra = extra @shutdown = false @trim_requested = 0 @workers = [] @auto_trim = nil @reaper = nil @mutex.synchronize do @min.times { spawn_thread } end @clean_thread_locals = false end |
Instance Attribute Details
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
46 47 48 |
# File 'lib/puma/thread_pool.rb', line 46 def clean_thread_locals @clean_thread_locals end |
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
45 46 47 |
# File 'lib/puma/thread_pool.rb', line 45 def spawned @spawned end |
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
45 46 47 |
# File 'lib/puma/thread_pool.rb', line 45 def trim_requested @trim_requested end |
Class Method Details
.clean_thread_locals ⇒ Object
48 49 50 51 52 |
# File 'lib/puma/thread_pool.rb', line 48 def self.clean_thread_locals Thread.current.keys.each do |key| Thread.current[key] = nil unless key == :__recursive_key__ end end |
Instance Method Details
#<<(work) ⇒ Object
Add work
to the todo list for a Thread to pickup and process.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/puma/thread_pool.rb', line 130 def <<(work) @mutex.synchronize do if @shutdown raise "Unable to add work while shutting down" end @todo << work if @waiting < @todo.size and @spawned < @max spawn_thread end @not_empty.signal end end |
#auto_reap!(timeout = 5) ⇒ Object
235 236 237 238 |
# File 'lib/puma/thread_pool.rb', line 235 def auto_reap!(timeout=5) @reaper = Reaper.new(self, timeout) @reaper.start! end |
#auto_trim!(timeout = 5) ⇒ Object
206 207 208 209 |
# File 'lib/puma/thread_pool.rb', line 206 def auto_trim!(timeout=5) @auto_trim = AutoTrim.new(self, timeout) @auto_trim.start! end |
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
56 57 58 |
# File 'lib/puma/thread_pool.rb', line 56 def backlog @mutex.synchronize { @todo.size } end |
#reap ⇒ Object
If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.
169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/puma/thread_pool.rb', line 169 def reap @mutex.synchronize do dead_workers = @workers.reject(&:alive?) dead_workers.each do |worker| worker.kill @spawned -= 1 end @workers -= dead_workers end end |
#shutdown ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/puma/thread_pool.rb', line 242 def shutdown threads = @mutex.synchronize do @shutdown = true @not_empty.broadcast @not_full.broadcast @auto_trim.stop if @auto_trim @reaper.stop if @reaper # dup workers so that we join them all safely @workers.dup end threads.each(&:join) @spawned = 0 @workers = [] end |
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish go ahead and exit. If force
is true, then a trim request is requested even if all threads are being utilized.
158 159 160 161 162 163 164 165 |
# File 'lib/puma/thread_pool.rb', line 158 def trim(force=false) @mutex.synchronize do if (force or @waiting > 0) and @spawned - @trim_requested > @min @trim_requested += 1 @not_empty.signal end end end |
#wait_until_not_full ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/puma/thread_pool.rb', line 146 def wait_until_not_full @mutex.synchronize do until @todo.size - @waiting < @max - @spawned or @shutdown @not_full.wait @mutex end end end |