Class: Puma::ThreadPool

Inherits:
Object
Defined in:
lib/puma/thread_pool.rb

Overview

Internal documentation for a simple thread pool management object.

Each Puma “worker” has a thread pool to process requests.

First, a connection to a client is made in `Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator, where it is stored in a `@todo` array.

Each thread in the pool has an internal loop where it pulls a request from the `@todo` array and processes it.
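The flow described above can be sketched with nothing but Ruby's core `Thread`, `Mutex`, and `ConditionVariable`. This is a minimal illustration, not Puma's implementation: `TinyPool` and `work_loop` are hypothetical names, the pool size is fixed, and trimming, reaping, and forced shutdown are left out.

```ruby
# Hypothetical, simplified pool for illustration only (not Puma's code).
class TinyPool
  def initialize(size, &block)
    @todo = []                          # pending work, like @todo above
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @shutdown = false
    @block = block                      # the work performed for each item
    @threads = Array.new(size) { Thread.new { work_loop } }
  end

  # Queue a unit of work and wake one sleeping thread.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      @not_empty.signal
    end
  end

  # Ask every thread to exit once the queue is drained, then wait for them.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @threads.each(&:join)
  end

  private

  # Each thread loops: sleep until there is work, take one item, process it.
  def work_loop
    loop do
      work = @mutex.synchronize do
        @not_empty.wait(@mutex) while @todo.empty? && !@shutdown
        @todo.shift
      end
      break if work.nil?                # queue empty and pool shutting down
      @block.call(work)
    end
  end
end

results = Queue.new
pool = TinyPool.new(2) { |n| results << n * 2 }
[1, 2, 3].each { |n| pool << n }
pool.shutdown
doubled = Array.new(results.size) { results.pop }.sort
```

The condition variable is always waited on while holding the mutex, which is the same pairing the real pool uses for `@not_empty` and `@mutex`.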

Defined Under Namespace

Classes: AutoTrim, ForceShutdown, Reaper

Constant Summary

SHUTDOWN_GRACE_TIME = 5

How long, in seconds, to wait after raising ForceShutdown in a thread during forced shutdown mode, so the thread can try to finish up its work before it is left to die on the vine.

Constructor Details

#initialize(min, max, *extra, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.



# File 'lib/puma/thread_pool.rb', line 32

def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times { spawn_thread }
  end

  @clean_thread_locals = false
end

Instance Attribute Details

#clean_thread_locals ⇒ Object

Returns the value of attribute clean_thread_locals.



# File 'lib/puma/thread_pool.rb', line 64

def clean_thread_locals
  @clean_thread_locals
end

#spawned ⇒ Object (readonly)

Returns the value of attribute spawned.



# File 'lib/puma/thread_pool.rb', line 63

def spawned
  @spawned
end

#trim_requested ⇒ Object (readonly)

Returns the value of attribute trim_requested.



# File 'lib/puma/thread_pool.rb', line 63

def trim_requested
  @trim_requested
end

#waiting ⇒ Object (readonly)

Returns the value of attribute waiting.



# File 'lib/puma/thread_pool.rb', line 63

def waiting
  @waiting
end

Class Method Details

.clean_thread_locals ⇒ Object



# File 'lib/puma/thread_pool.rb', line 66

def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
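As a rough illustration of what this class method does, the sketch below sets two thread-local values inside a fresh thread and then clears them with the same loop. The helper name `clear_thread_locals` and the keys `:request_id` and `:user` are made up for the example.

```ruby
# Same loop as the method above, under a hypothetical helper name.
def clear_thread_locals
  Thread.current.keys.each do |key|
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

leftover = Thread.new do
  # Pretend an earlier request left state behind on this thread.
  Thread.current[:request_id] = "abc123"
  Thread.current[:user] = "alice"

  clear_thread_locals

  [Thread.current[:request_id], Thread.current[:user]]
end.value
```

Because pool threads are reused across requests, clearing these values is one way to keep state from one request from being visible to the next.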

Instance Method Details

#<<(work) ⇒ Object

Add work to the todo list for a thread to pick up and process.



# File 'lib/puma/thread_pool.rb', line 155

def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
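The spawn condition in `#<<` can be read in isolation: a new thread is started only when there are more queued items than idle threads and the pool is still below its maximum. A small sketch, with `spawn_needed?` as a hypothetical helper name and the counters passed in as plain integers:

```ruby
# Hypothetical helper mirroring the condition in #<<:
#   @waiting < @todo.size and @spawned < @max
def spawn_needed?(waiting:, todo_size:, spawned:, max:)
  waiting < todo_size && spawned < max
end

# No idle thread, one queued item, room to grow: spawn.
grow = spawn_needed?(waiting: 0, todo_size: 1, spawned: 2, max: 5)

# An idle thread can take the single queued item: no spawn.
idle_takes_it = spawn_needed?(waiting: 1, todo_size: 1, spawned: 2, max: 5)

# Already at the maximum: never spawn, the item waits in the queue.
at_max = spawn_needed?(waiting: 0, todo_size: 3, spawned: 5, max: 5)
```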

#auto_reap!(timeout = 5) ⇒ Object



# File 'lib/puma/thread_pool.rb', line 296

def auto_reap!(timeout=5)
  @reaper = Reaper.new(self, timeout)
  @reaper.start!
end

#auto_trim!(timeout = 30) ⇒ Object



# File 'lib/puma/thread_pool.rb', line 267

def auto_trim!(timeout=30)
  @auto_trim = AutoTrim.new(self, timeout)
  @auto_trim.start!
end

#backlog ⇒ Object

How many objects have yet to be processed by the pool?



# File 'lib/puma/thread_pool.rb', line 74

def backlog
  @mutex.synchronize { @todo.size }
end

#pool_capacity ⇒ Object



# File 'lib/puma/thread_pool.rb', line 78

def pool_capacity
  waiting + (@max - spawned)
end
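This method has no description. Reading its body, it appears to count how many requests the pool could start on right away: threads that are currently idle plus threads that could still be spawned. A worked example of the arithmetic, using a standalone function with made-up numbers:

```ruby
# Standalone version of the formula above; arguments are plain integers.
def pool_capacity(waiting:, spawned:, max:)
  waiting + (max - spawned)
end

# 3 threads spawned out of a maximum of 5, one of them idle:
# 1 idle thread + 2 threads not yet spawned.
capacity = pool_capacity(waiting: 1, spawned: 3, max: 5)

# Fully spawned and fully busy: nothing can be started immediately.
saturated = pool_capacity(waiting: 0, spawned: 5, max: 5)
```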

#reap ⇒ Object

If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created again.



# File 'lib/puma/thread_pool.rb', line 228

def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end
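The bookkeeping in `#reap` relies on `Thread#alive?`. The sketch below applies the same steps to two throwaway threads, one that keeps sleeping and one that exits immediately. The local variables stand in for `@workers` and `@spawned`, and the mutex is omitted because nothing else touches them here.

```ruby
workers = [
  Thread.new { sleep },   # sleeps until killed, so it stays alive
  Thread.new { :done }    # finishes immediately
]
workers.last.join         # make sure the second thread has exited

spawned = workers.size

# Same steps as #reap: find dead threads, lower the counter, drop them.
dead_workers = workers.reject(&:alive?)
spawned -= dead_workers.size
workers.delete_if { |w| dead_workers.include?(w) }

# Clean up the demo's sleeping thread.
workers.each(&:kill).each(&:join)
```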

#shutdown(timeout = -1) ⇒ Object

Tell all threads in the pool to exit and wait for them to finish.



# File 'lib/puma/thread_pool.rb', line 303

def shutdown(timeout=-1)
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    # Wait for threads to finish after n attempts (+timeout+).
    # If threads are still running, it will forcefully kill them.
    timeout.times do
      threads.delete_if do |t|
        t.join 1
      end

      if threads.empty?
        break
      else
        sleep 1
      end
    end

    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join SHUTDOWN_GRACE_TIME
    end
  end

  @spawned = 0
  @workers = []
end
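The forced path depends on two core behaviours: `Thread#join(limit)` returns `nil` when the thread is still running after `limit` seconds, and `Thread#raise` injects an exception into another thread. A compressed sketch of that sequence follows. The `ForceShutdown` class defined here is a local stand-in for the real one, and the timings are shortened for the example.

```ruby
# Local stand-in for Puma::ThreadPool::ForceShutdown.
class ForceShutdown < RuntimeError; end

log = Queue.new

threads = [
  Thread.new { :quick },          # finishes on its own
  Thread.new do                   # would never finish without help
    begin
      sleep
    rescue ForceShutdown
      log << :interrupted         # last chance to clean up
    end
  end
]

# One bounded attempt: join returns the thread if it finished (truthy, so
# it is deleted) or nil if the limit expired (so it stays in the list).
threads.delete_if { |t| t.join(0.2) }
remaining = threads.size

# Anything still running is interrupted, then given a grace period.
threads.each { |t| t.raise ForceShutdown }
threads.each { |t| t.join(1) }
```

In the real method the grace period is `SHUTDOWN_GRACE_TIME`, and the bounded attempt is repeated up to `timeout` times.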

#trim(force = false) ⇒ Object

If too many threads are in the pool, tell one to finish up and exit. If force is true, a trim is requested even if all threads are being utilized.



# File 'lib/puma/thread_pool.rb', line 217

def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
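The trim condition combines two checks: either the caller forces the trim or at least one thread is idle, and the pool must stay above its minimum after accounting for trims already requested. A sketch with a hypothetical `trim_allowed?` helper and plain integer arguments:

```ruby
# Hypothetical helper mirroring the condition in #trim:
#   (force or @waiting > 0) and @spawned - @trim_requested > @min
def trim_allowed?(force:, waiting:, spawned:, trim_requested:, min:)
  (force || waiting > 0) && spawned - trim_requested > min
end

# Two idle threads and the pool is above its minimum: trim one.
idle = trim_allowed?(force: false, waiting: 2, spawned: 5, trim_requested: 0, min: 2)

# Every thread is busy and the trim is not forced: leave the pool alone.
busy = trim_allowed?(force: false, waiting: 0, spawned: 5, trim_requested: 0, min: 2)

# Forced trim goes ahead even though every thread is busy.
forced = trim_allowed?(force: true, waiting: 0, spawned: 5, trim_requested: 0, min: 2)

# A pending trim already brings the pool down to min: no further trim.
floor = trim_allowed?(force: true, waiting: 1, spawned: 3, trim_requested: 1, min: 2)
```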

#wait_until_not_full ⇒ Object

This method is used by `Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass them to the reactor.

The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests, then it is at capacity. If another Puma process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.

For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the `Puma::Server` can pull a request right away.

If there are 5 threads and all 5 of them are busy, then it will pause here and wait until the `not_full` condition variable is signaled. Usually this indicates that a request has been processed.

It’s important to note that even though the server might accept another request, it might not be added to the `@todo` array right away. For example, if a slow client has only sent a header but not a body, then the `@todo` array would stay the same size as the reactor works to try to buffer the request. In that scenario the next call to this method would not block, and another request would be added into the reactor by the server. This would continue until a fully buffered request makes it through the reactor and can then be processed by the thread pool.



# File 'lib/puma/thread_pool.rb', line 197

def wait_until_not_full
  @mutex.synchronize do
    while true
      return if @shutdown

      # If we can still spin up new threads and there
      # is work queued that cannot be handled by waiting
      # threads, then accept more work until we would
      # spin up the max number of threads.
      return if @todo.size - @waiting < @max - @spawned

      @not_full.wait @mutex
    end
  end
end
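The two examples from the description can be checked against the early-return condition. In this sketch `room_for_more?` is a hypothetical helper, the counters are plain integers, and the maximum is assumed to be 5 with all 5 threads already spawned.

```ruby
# Hypothetical helper mirroring the early-return condition:
#   @todo.size - @waiting < @max - @spawned
def room_for_more?(todo_size:, waiting:, spawned:, max:)
  todo_size - waiting < max - spawned
end

# 5 threads, 4 busy, 1 idle, nothing queued: the method returns at once.
one_idle = room_for_more?(todo_size: 0, waiting: 1, spawned: 5, max: 5)

# 5 threads, all busy, nothing queued: the caller waits on not_full.
all_busy = room_for_more?(todo_size: 0, waiting: 0, spawned: 5, max: 5)

# Only 3 of 5 threads spawned, all busy, one item queued: two more
# threads could still be started, so the method returns at once.
can_grow = room_for_more?(todo_size: 1, waiting: 0, spawned: 3, max: 5)
```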