Class: Bixby::ThreadPool
- Inherits:
-
Object
- Object
- Bixby::ThreadPool
- Includes:
- Log
- Defined in:
- lib/bixby-common/util/thread_pool.rb,
lib/bixby-common/util/thread_pool/task.rb,
lib/bixby-common/util/thread_pool/worker.rb
Defined Under Namespace
Constant Summary collapse
- DEFAULT_MIN =
1
- DEFAULT_MAX =
8
- DEFAULT_IDLE_TIMEOUT =
60
Instance Method Summary collapse
- #<<(proc) ⇒ Object
- #contract(count, &block) ⇒ Object
- #dispose ⇒ Object
- #enqueue(command, block = nil) ⇒ Object
- #expand(count) ⇒ Object
-
#initialize(options = {}) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #inspect ⇒ Object
- #join(max_wait = nil) ⇒ Object
- #num_busy ⇒ Object (also: #num_working)
- #num_idle ⇒ Object
- #num_jobs ⇒ Object
- #perform(&block) ⇒ Object
- #resize(new_size) ⇒ Object
- #shutdown(&block) ⇒ Object
- #size ⇒ Object
-
#summary ⇒ Object
For debugging.
- #to_s ⇒ Object
Methods included from Log
Constructor Details
#initialize(options = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/bixby-common/util/thread_pool.rb', line 19 def initialize( = {}) @input_queue = Queue.new @lock = Monitor.new @workers = [] @min_size = [:min_size] || DEFAULT_MIN @max_size = [:max_size] || DEFAULT_MAX @idle_timeout = [:idle_timeout] || DEFAULT_IDLE_TIMEOUT @size = 0 (@min_size) end |
Instance Method Details
#<<(proc) ⇒ Object
45 46 47 48 |
# File 'lib/bixby-common/util/thread_pool.rb', line 45 def <<(proc) enqueue(:perform, block) nil end |
#contract(count, &block) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/bixby-common/util/thread_pool.rb', line 117 def contract(count, &block) @lock.synchronize do raise 'Count is too large.' if count > @size count.times do callback = Proc.new do |worker| remove_worker(worker) block.call if block end enqueue(:shutdown, callback) end end nil end |
#dispose ⇒ Object
83 84 85 86 87 88 89 90 |
# File 'lib/bixby-common/util/thread_pool.rb', line 83 def dispose @lock.synchronize do shutdown join end nil end |
#enqueue(command, block = nil) ⇒ Object
31 32 33 34 35 36 37 38 |
# File 'lib/bixby-common/util/thread_pool.rb', line 31 def enqueue(command, block=nil) logger.debug { "enqueue new task: #{command}" } @input_queue.push(Task.new(command, block)) if command == :perform then grow end nil end |
#expand(count) ⇒ Object
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/bixby-common/util/thread_pool.rb', line 106 def (count) @lock.synchronize do logger.debug "expanding by #{count} threads (from #{@size})" count.times do create_worker end end nil end |
#inspect ⇒ Object
92 93 94 |
# File 'lib/bixby-common/util/thread_pool.rb', line 92 def inspect "#<#{self.class.to_s}:0x#{(object_id << 1).to_s(16)} threads=#{size} jobs=#{num_jobs}>" end |
#join(max_wait = nil) ⇒ Object
75 76 77 78 79 80 81 |
# File 'lib/bixby-common/util/thread_pool.rb', line 75 def join(max_wait = nil) results = @workers.map { |w| w.join(max_wait) } @workers.clear @size = 0 return results end |
#num_busy ⇒ Object Also known as: num_working
58 59 60 61 62 |
# File 'lib/bixby-common/util/thread_pool.rb', line 58 def num_busy @lock.synchronize do return @workers.find_all{ |w| w.working? }.size end end |
#num_idle ⇒ Object
54 55 56 |
# File 'lib/bixby-common/util/thread_pool.rb', line 54 def num_idle @size - num_busy end |
#num_jobs ⇒ Object
50 51 52 |
# File 'lib/bixby-common/util/thread_pool.rb', line 50 def num_jobs @input_queue.size end |
#perform(&block) ⇒ Object
40 41 42 43 |
# File 'lib/bixby-common/util/thread_pool.rb', line 40 def perform(&block) enqueue(:perform, block) nil end |
#resize(new_size) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/bixby-common/util/thread_pool.rb', line 134 def resize(new_size) @lock.synchronize do if new_size > @size (new_size - @size) elsif new_size < @size contract(@size - new_size) end end nil end |
#shutdown(&block) ⇒ Object
65 66 67 68 69 70 71 72 73 |
# File 'lib/bixby-common/util/thread_pool.rb', line 65 def shutdown(&block) @lock.synchronize do @size.times do enqueue(:shutdown, block) end end nil end |
#size ⇒ Object
100 101 102 103 104 |
# File 'lib/bixby-common/util/thread_pool.rb', line 100 def size @lock.synchronize do return @size end end |
#summary ⇒ Object
For debugging
147 148 149 150 151 152 153 154 155 |
# File 'lib/bixby-common/util/thread_pool.rb', line 147 def summary @lock.synchronize do puts "jobs: #{@input_queue.size}" puts "workers: #{@workers.size}" @workers.each do |w| puts " " + w.thread.inspect end end end |
#to_s ⇒ Object
96 97 98 |
# File 'lib/bixby-common/util/thread_pool.rb', line 96 def to_s inspect end |