Class: Puma::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/puma/thread_pool.rb

Overview

A simple thread pool management object.

Defined Under Namespace

Classes: AutoTrim, ForceShutdown, Reaper

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max, *extra, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/puma/thread_pool.rb', line 17

def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times { spawn_thread }
  end

  @clean_thread_locals = false
end

Instance Attribute Details

#clean_thread_localsObject

Returns the value of attribute clean_thread_locals.



49
50
51
# File 'lib/puma/thread_pool.rb', line 49

def clean_thread_locals
  @clean_thread_locals
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



48
49
50
# File 'lib/puma/thread_pool.rb', line 48

def spawned
  @spawned
end

#trim_requestedObject (readonly)

Returns the value of attribute trim_requested.



48
49
50
# File 'lib/puma/thread_pool.rb', line 48

def trim_requested
  @trim_requested
end

Class Method Details

.clean_thread_localsObject



51
52
53
54
55
# File 'lib/puma/thread_pool.rb', line 51

def self.clean_thread_locals
  Thread.current.keys.each do |key|
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the todo list for a Thread to pickup and process.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/puma/thread_pool.rb', line 133

def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end

#auto_reap!(timeout = 5) ⇒ Object



238
239
240
241
# File 'lib/puma/thread_pool.rb', line 238

def auto_reap!(timeout=5)
  @reaper = Reaper.new(self, timeout)
  @reaper.start!
end

#auto_trim!(timeout = 5) ⇒ Object



209
210
211
212
# File 'lib/puma/thread_pool.rb', line 209

def auto_trim!(timeout=5)
  @auto_trim = AutoTrim.new(self, timeout)
  @auto_trim.start!
end

#backlogObject

How many objects have yet to be processed by the pool?



59
60
61
# File 'lib/puma/thread_pool.rb', line 59

def backlog
  @mutex.synchronize { @todo.size }
end

#reapObject

If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/puma/thread_pool.rb', line 172

def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers -= dead_workers
  end
end

#shutdown(timeout = -1)) ⇒ Object

Tell all threads in the pool to exit and wait for them to finish.



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/puma/thread_pool.rb', line 245

def shutdown(timeout=-1)
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  case timeout
  when -1
    threads.each(&:join)
  when 0
    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join Const::SHUTDOWN_GRACE_TIME
    end
  else
    timeout.times do
      threads.delete_if do |t|
        t.join 1
      end

      if threads.empty?
        break
      else
        sleep 1
      end
    end

    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join Const::SHUTDOWN_GRACE_TIME
    end
  end

  @spawned = 0
  @workers = []
end

#trim(force = false) ⇒ Object

If too many threads are in the pool, tell one to finish go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.



161
162
163
164
165
166
167
168
# File 'lib/puma/thread_pool.rb', line 161

def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

#wait_until_not_fullObject



149
150
151
152
153
154
155
# File 'lib/puma/thread_pool.rb', line 149

def wait_until_not_full
  @mutex.synchronize do
    until @todo.size - @waiting < @max - @spawned or @shutdown
      @not_full.wait @mutex
    end
  end
end