Class: ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/threadpool.rb

Overview

This is unsurprisingly a thread pool. It can run your jobs asynchronously. It can grow and shrink depending on the load. Like any good pool it can be… closed!

Defined Under Namespace

Classes: Job

Constant Summary collapse

DEFAULT_CORE_WORKERS =
4
DEFAULT_KEEP_ALIVE_TIME =
5
@@controllers =
ThreadGroup.new

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ ThreadPool

new([[core_workers[, max_workers[, keep_alive_time]],] options]) [{|pool| … }]

Arguments

core_workers

Number of core worker threads. The pool will never shrink below this point.

max_workers

Maximum number of worker threads allowed per this pool. The pool will never expand over this limit. Default is core_workers * 2

keep_alive_time

Time to keep non-core workers alive. Default is 5 sec.

options

:core => core_workers, :max => max_workers, :keep_alive => keep_alive_time, :init_core => false to defer initial setup of core workers.

When called with a block the pool will be closed upon exit from the block. Graceful close will be used, a non-bang version.

Example:

ThreadPool.new 10, 25, 6.7, :init_core => false do |pool|
...
end

Raises:

  • (ArgumentError)


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/threadpool.rb', line 45

def initialize(*args)
  extend MonitorMixin
  
  options = args.last.is_a?(Hash) ? args.pop : {}

  @core_workers = (args[0] || options[:core] || DEFAULT_CORE_WORKERS).to_i
  raise ArgumentError, "core_workers must be a positive integer" if @core_workers <= 0
  
  @max_workers = (args[1] || options[:max] || @core_workers * 2).to_i
  raise ArgumentError, "max_workers must be >= core_workers" if @max_workers < @core_workers

  @keep_alive_time = (args[2] || options[:keep_alive] || DEFAULT_KEEP_ALIVE_TIME).to_f
  raise ArgumentError, "keep_alive_time must be a non-negative real number" if @keep_alive_time < 0

  @workers, @jobs = [], Queue.new

  @controller = Thread.new do
    loop do
      sleep(@keep_alive_time)
      break if @dead
      synchronize do
        n = @jobs.num_waiting - @core_workers
        stop_workers([n / 2, 1].max) if n >= 0
      end
    end
  end
  @@controllers.add(@controller)
  
  create_workers(@core_workers) if options.fetch(:init_core, true)
  
  begin
    yield self
  ensure
    shutdown
  end if block_given?
end

Instance Method Details

#alive?Boolean

alive? => boolean

Pool is live when it’s not dead. Pool is dead when it’s closed.

Returns:

  • (Boolean)


86
87
88
# File 'lib/threadpool.rb', line 86

def alive?
  synchronize { !@dead }
end

#closeObject

close

Rape me gently. Waits until all the jobs are done and destroys the pool.



121
122
123
124
125
126
127
128
129
# File 'lib/threadpool.rb', line 121

def close
  _sync do
    @dead = true
    @controller.run
    stop_workers(@workers.size)
  end
  ThreadsWait.all_waits(@controller, *@workers)
  self
end

#close!Object

close!

Rape me hard. Instantly kills the workers. Ensure blocks will be called though (last prayer on).



134
135
136
137
138
139
140
141
# File 'lib/threadpool.rb', line 134

def close!
  _sync do
    @dead = true
    @controller.run
    @workers.each {|w| w.kill }
  end
  self
end

#run(*args, &block) ⇒ Object

run([arg1[, arg2[, …]]]) {|[arg1[, arg2[, …]]]| … } -> pool

Schedule the block to run asynchronously on a worker thread. Return immediately. Any arguments passed to this method will be passed to the block.

When there are no idle workers the pool will grow. When max pool size is reached the job will be queued up until better times.

Example:

pool.run('go to hell') do |greeting|
  puts greeting
end


102
103
104
# File 'lib/threadpool.rb', line 102

def run(*args, &block)
  run_core(true, *args, &block)
end

#run?(*args, &block) ⇒ Boolean

try_run([arg1[, arg2[, …]]]) {|[arg1[, arg2[, …]]]| … } -> pool or nil

Try to run the block asynchronously on a worker thread (see run). If there are no idle workers immediately available and the pool reached its maximum size, then do not enqueue the job and return nil.

Example:

puts 'zomg' unless pool.try_run('go to hell') {|greeting| puts greeting }

Returns:

  • (Boolean)


114
115
116
# File 'lib/threadpool.rb', line 114

def run?(*args, &block)
  run_core(false, *args, &block)
end