Class: ThreadPool
- Inherits: Object
- Defined in: lib/threadpool.rb
Overview
This is unsurprisingly a thread pool. It can run your jobs asynchronously. It can grow and shrink depending on the load. Like any good pool it can be… closed!
Defined Under Namespace
Classes: Job
Constant Summary
- DEFAULT_CORE_WORKERS = 4
- DEFAULT_KEEP_ALIVE_TIME = 5
- @@controllers = ThreadGroup.new
Instance Method Summary
- #alive? ⇒ Boolean
  alive? => boolean.
- #close ⇒ Object
  close.
- #close! ⇒ Object
  close!.
- #initialize(*args) ⇒ ThreadPool (constructor)
  new([[core_workers[, max_workers[, keep_alive_time]],] options]) [{|pool| … }].
- #run(*args, &block) ⇒ Object
  run([arg1[, arg2[, …]]]) {|[arg1[, arg2[, …]]]| … } -> pool.
- #run?(*args, &block) ⇒ Boolean
  try_run([arg1[, arg2[, …]]]) {|[arg1[, arg2[, …]]]| … } -> pool or nil.
Constructor Details
#initialize(*args) ⇒ ThreadPool
new([[core_workers[, max_workers[, keep_alive_time]],] options]) [{|pool| … }]
Arguments
core_workers-
Number of core worker threads. The pool will never shrink below this point.
max_workers-
Maximum number of worker threads allowed in this pool. The pool will never grow beyond this limit. Default is core_workers * 2.
keep_alive_time-
Time to keep non-core workers alive. Default is 5 sec.
options-
:core => core_workers, :max => max_workers, :keep_alive => keep_alive_time, :init_core => false to defer initial setup of core workers.
When called with a block, the pool is closed on exit from the block. The graceful (non-bang) close is used.
Example:
ThreadPool.new 10, 25, 6.7, :init_core => false do |pool|
...
end
# File 'lib/threadpool.rb', line 45
def initialize(*args)
  extend MonitorMixin

  options = args.last.is_a?(Hash) ? args.pop : {}

  @core_workers = (args[0] || options[:core] || DEFAULT_CORE_WORKERS).to_i
  raise ArgumentError, "core_workers must be a positive integer" if @core_workers <= 0

  @max_workers = (args[1] || options[:max] || @core_workers * 2).to_i
  raise ArgumentError, "max_workers must be >= core_workers" if @max_workers < @core_workers

  @keep_alive_time = (args[2] || options[:keep_alive] || DEFAULT_KEEP_ALIVE_TIME).to_f
  raise ArgumentError, "keep_alive_time must be a non-negative real number" if @keep_alive_time < 0

  @workers, @jobs = [], Queue.new

  @controller = Thread.new do
    loop do
      sleep(@keep_alive_time)
      break if @dead
      synchronize do
        n = @jobs.num_waiting - @core_workers
        stop_workers([n / 2, 1].max) if n >= 0
      end
    end
  end
  @@controllers.add(@controller)

  create_workers(@core_workers) if options.fetch(:init_core, true)

  begin
    yield self
  ensure
    shutdown
  end if block_given?
end
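The argument handling above can be tried in isolation. The following is a minimal sketch, assuming the precedence shown in the constructor source (positional argument, then option, then default). The module `PoolArgsSketch` and its `resolve` method are hypothetical names introduced only for illustration; they are not part of ThreadPool.

```ruby
# Hypothetical helper for illustration only; not part of ThreadPool.
# It mirrors the precedence in the constructor source:
# positional argument, then option, then default.
module PoolArgsSketch
  DEFAULT_CORE_WORKERS = 4
  DEFAULT_KEEP_ALIVE_TIME = 5

  def self.resolve(*args)
    # A trailing Hash is treated as the options argument.
    options = args.last.is_a?(Hash) ? args.pop : {}

    core = (args[0] || options[:core] || DEFAULT_CORE_WORKERS).to_i
    raise ArgumentError, "core_workers must be a positive integer" if core <= 0

    max = (args[1] || options[:max] || core * 2).to_i
    raise ArgumentError, "max_workers must be >= core_workers" if max < core

    keep_alive = (args[2] || options[:keep_alive] || DEFAULT_KEEP_ALIVE_TIME).to_f
    raise ArgumentError, "keep_alive_time must be a non-negative real number" if keep_alive < 0

    { :core => core, :max => max, :keep_alive => keep_alive,
      :init_core => options.fetch(:init_core, true) }
  end
end

defaults   = PoolArgsSketch.resolve
positional = PoolArgsSketch.resolve(10, 25, 6.7, :init_core => false)
by_options = PoolArgsSketch.resolve(:core => 2, :keep_alive => 1.5)
```

With no arguments the sketch falls back to 4 core workers, 8 max workers, and a 5.0 second keep-alive. When both a positional value and the matching option are given, the positional value wins.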
Instance Method Details
#alive? ⇒ Boolean
alive? => boolean
The pool is alive as long as it has not been closed. Once closed, it is dead.
# File 'lib/threadpool.rb', line 86
def alive?
  synchronize { !@dead }
end
#close ⇒ Object
close
Graceful close. Waits until all the jobs are done and destroys the pool.
# File 'lib/threadpool.rb', line 121
def close
  _sync do
    @dead = true
    @controller.run
    stop_workers(@workers.size)
  end
  ThreadsWait.all_waits(@controller, *@workers)
  self
end
#close! ⇒ Object
close!
Forced close. Instantly kills the workers. Their ensure blocks will still run, though.
# File 'lib/threadpool.rb', line 134
def close!
  _sync do
    @dead = true
    @controller.run
    @workers.each {|w| w.kill }
  end
  self
end
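The claim that ensure blocks still run when a worker is killed can be checked with plain Ruby threads. This is a minimal sketch that uses only core `Thread` and `Queue`, not ThreadPool itself. The variable names are illustrative, and the sleeping thread stands in for a worker busy with a long job.

```ruby
log = Queue.new
started = Queue.new

worker = Thread.new do
  begin
    started << :ready
    sleep              # stand-in for a job that never finishes by itself
    log << :finished   # not reached when the thread is killed
  ensure
    log << :cleanup    # runs even though the thread is killed
  end
end

started.pop   # wait until the worker is inside the begin block
worker.kill   # what close! does to each worker, per the source above
worker.join   # wait for the ensure block to complete

events = []
events << log.pop until log.empty?
```

After the kill, `events` holds only the `:cleanup` marker: the job body is abandoned but its ensure clause runs. A graceful close would instead let the job finish before the worker stops.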
#run(*args, &block) ⇒ Object
run([arg1[, arg2[, …]]]) {|[arg1[, arg2[, …]]]| … } -> pool
Schedule the block to run asynchronously on a worker thread. Return immediately. Any arguments passed to this method will be passed to the block.
When there are no idle workers, the pool grows. Once the maximum pool size is reached, the job is queued until a worker becomes available.
Example:
pool.run('go to hell') do |greeting|
puts greeting
end
# File 'lib/threadpool.rb', line 102
def run(*args, &block)
  run_core(true, *args, &block)
end
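Since `run_core` is not shown on this page, here is a minimal sketch of the general idea behind `run`: the block and its arguments are pushed onto a queue, and a worker thread later calls the block with those arguments. `TinyPoolSketch` is a hypothetical stand-in built on core `Thread` and `Queue`. It is an assumption about the pattern, not the ThreadPool implementation, and it uses a fixed number of workers rather than growing and shrinking.

```ruby
# Hypothetical stand-in for illustration only; not the ThreadPool implementation.
class TinyPoolSketch
  def initialize(workers)
    @jobs = Queue.new
    @workers = Array.new(workers) do
      Thread.new do
        # Each worker pops [args, block] pairs until it sees the :stop marker.
        while (job = @jobs.pop) != :stop
          args, block = job
          block.call(*args)
        end
      end
    end
  end

  # Enqueue the block with its arguments and return the pool immediately.
  def run(*args, &block)
    @jobs << [args, block]
    self
  end

  # Let queued jobs finish, then stop and join every worker.
  def close
    @workers.size.times { @jobs << :stop }
    @workers.each(&:join)
    self
  end
end

results = Queue.new
pool = TinyPoolSketch.new(2)
pool.run('hello') { |greeting| results << greeting.upcase }
pool.run(2, 3) { |a, b| results << a + b }
pool.close

collected = []
collected << results.pop until results.empty?
```

Because `run` returns the pool, calls can be chained. The order in which the two results arrive is not guaranteed, since the jobs may execute on different workers.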
#run?(*args, &block) ⇒ Boolean
try_run([arg1[, arg2[, …]]]) {|[arg1[, arg2[, …]]]| … } -> pool or nil
Try to run the block asynchronously on a worker thread (see run). If no idle worker is immediately available and the pool has reached its maximum size, the job is not enqueued and nil is returned.
Example:
puts 'zomg' unless pool.try_run('go to hell') {|greeting| puts greeting }
# File 'lib/threadpool.rb', line 114
def run?(*args, &block)
  run_core(false, *args, &block)
end
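The "accept or return nil" contract can be illustrated without the library. The sketch below is an assumption-laden simplification: the hypothetical `TryRunSketch` class starts one thread per accepted job and counts busy slots, rather than reusing worker threads as ThreadPool does. Only the return-value behavior (pool when accepted, nil when full) follows the description above.

```ruby
# Hypothetical stand-in for illustration only; not the ThreadPool implementation.
# It spawns a thread per accepted job instead of reusing workers.
class TryRunSketch
  def initialize(max_workers)
    @max_workers = max_workers
    @busy = 0
    @lock = Mutex.new
    @threads = []
  end

  # Returns self if the job was accepted, nil if every slot is busy.
  def try_run(*args, &block)
    @lock.synchronize do
      return nil if @busy >= @max_workers
      @busy += 1
      @threads << Thread.new do
        begin
          block.call(*args)
        ensure
          # Free the slot even if the job raises.
          @lock.synchronize { @busy -= 1 }
        end
      end
    end
    self
  end

  # Wait for every accepted job to finish.
  def wait
    @lock.synchronize { @threads.dup }.each(&:join)
    self
  end
end

gate = Queue.new
pool = TryRunSketch.new(1)
first  = pool.try_run { gate.pop }      # occupies the only slot until released
second = pool.try_run { :never_runs }   # rejected: no free slot
gate << :go
pool.wait
third  = pool.try_run { :ok }           # accepted again once the slot is free
pool.wait
```

Here `first` and `third` are the pool itself, while `second` is nil because the single slot was still occupied. Callers can therefore branch on the return value, as in the example above.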