Class: Puma::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/puma/thread_pool.rb

Overview

Internal Docs for A simple thread pool management object.

Each Puma “worker” has a thread pool to process requests.

First a connection to a client is made in Puma::Server. It is wrapped in a Puma::Client instance and then passed to the Puma::Reactor to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the ‘Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.

Each thread in the pool has an internal loop where it pulls a request from the ‘@todo` array and processes it.

Defined Under Namespace

Classes: Automaton, ForceShutdown

Constant Summary collapse

SHUTDOWN_GRACE_TIME =

How long, after raising the ForceShutdown of a thread during forced shutdown mode, to wait for the thread to try and finish up its work before leaving the thread to die on the vine.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = {}, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/puma/thread_pool.rb', line 36

def initialize(name, options = {}, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new
  @todo = Queue.new

  @backlog_max = 0
  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @block = block
  @out_of_band = options[:out_of_band]
  @out_of_band_running = false
  @out_of_band_condvar = ConditionVariable.new
  @before_thread_start = options[:before_thread_start]
  @before_thread_exit = options[:before_thread_exit]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end

Instance Attribute Details

#busy_threadsObject (readonly)

Version:

  • 5.0.0



123
124
125
# File 'lib/puma/thread_pool.rb', line 123

def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end

#out_of_band_runningObject (readonly)

seconds



28
29
30
# File 'lib/puma/thread_pool.rb', line 28

def out_of_band_running
  @out_of_band_running
end

#pool_capacityObject (readonly)



117
118
119
# File 'lib/puma/thread_pool.rb', line 117

def pool_capacity
  waiting + (@max - spawned)
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



83
84
85
# File 'lib/puma/thread_pool.rb', line 83

def spawned
  @spawned
end

#trim_requestedObject (readonly)

Returns the value of attribute trim_requested.



83
84
85
# File 'lib/puma/thread_pool.rb', line 83

def trim_requested
  @trim_requested
end

#waitingObject (readonly)

Returns the value of attribute waiting.



83
84
85
# File 'lib/puma/thread_pool.rb', line 83

def waiting
  @waiting
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the todo list for a Thread to pickup and process.



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/puma/thread_pool.rb', line 252

def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work
    t = @todo.size
    @backlog_max = t if t > @backlog_max

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end

#auto_reap!(timeout = @reaping_time) ⇒ Object



333
334
335
336
# File 'lib/puma/thread_pool.rb', line 333

def auto_reap!(timeout=@reaping_time)
  @reaper = Automaton.new(self, timeout, "#{@name} tp reap", :reap)
  @reaper.start!
end

#auto_trim!(timeout = @auto_trim_time) ⇒ Object



328
329
330
331
# File 'lib/puma/thread_pool.rb', line 328

def auto_trim!(timeout=@auto_trim_time)
  @auto_trim = Automaton.new(self, timeout, "#{@name} tp trim", :trim)
  @auto_trim.start!
end

#backlogObject

How many objects have yet to be processed by the pool?



106
107
108
# File 'lib/puma/thread_pool.rb', line 106

def backlog
  with_mutex { @todo.size }
end

#backlog_maxObject

The maximum size of the backlog



112
113
114
# File 'lib/puma/thread_pool.rb', line 112

def backlog_max
  with_mutex { @backlog_max }
end

#reapObject

If there are dead threads in the pool make them go away while decreasing spawned counter so that new healthy threads could be created again.



286
287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/puma/thread_pool.rb', line 286

def reap
  with_mutex do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end

#reset_maxObject



100
101
102
# File 'lib/puma/thread_pool.rb', line 100

def reset_max
  with_mutex { @backlog_max = 0 }
end

#shutdown(timeout = -1)) ⇒ Object

Tell all threads in the pool to exit and wait for them to finish. Wait timeout seconds then raise ForceShutdown in remaining threads. Next, wait an extra @shutdown_grace_time seconds then force-kill remaining threads. Finally, wait 1 second for remaining threads to exit.



356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/puma/thread_pool.rb', line 356

def shutdown(timeout=-1)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim&.stop
    @reaper&.stop
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
        t.join inner_timeout - elapsed
      end
    end

    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @workers = []
end

#statsHash

generate stats hash so as not to perform multiple locks

Returns:

  • (Hash)

    hash containing stat info from ThreadPool



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/puma/thread_pool.rb', line 87

def stats
  with_mutex do
    temp = @backlog_max
    @backlog_max = 0
    { backlog: @todo.size,
      running: @spawned,
      pool_capacity: @waiting + (@max - @spawned),
      busy_threads: @spawned - @waiting + @todo.size,
      backlog_max: temp
    }
  end
end

#trim(force = false) ⇒ Object

If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim request is requested even if all threads are being utilized.



274
275
276
277
278
279
280
281
282
# File 'lib/puma/thread_pool.rb', line 274

def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

#wait_while_out_of_band_runningObject



236
237
238
239
240
241
242
# File 'lib/puma/thread_pool.rb', line 236

def wait_while_out_of_band_running
  return unless @out_of_band_running

  with_mutex do
    @out_of_band_condvar.wait(@mutex) while @out_of_band_running
  end
end

#with_force_shutdownObject

Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.



340
341
342
343
344
345
346
347
348
349
# File 'lib/puma/thread_pool.rb', line 340

def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end

#with_mutex(&block) ⇒ Object

Version:

  • 5.0.0



245
246
247
248
249
# File 'lib/puma/thread_pool.rb', line 245

def with_mutex(&block)
  @mutex.owned? ?
    yield :
    @mutex.synchronize(&block)
end