Class: Bixby::ThreadPool
- Inherits:
-
Object
show all
- Includes:
- Log
- Defined in:
- lib/bixby-common/util/thread_pool.rb,
lib/bixby-common/util/thread_pool/task.rb,
lib/bixby-common/util/thread_pool/worker.rb
Defined Under Namespace
Classes: Task, Worker
Constant Summary
collapse
- DEFAULT_MIN =
1
- DEFAULT_MAX =
8
- DEFAULT_IDLE_TIMEOUT =
60
Instance Method Summary
collapse
Methods included from Log
bin_regex, clean_ex, clean_ex_for_console, console_appender?, gems_regex, #log, ruby_regex, setup_logger
Constructor Details
#initialize(options = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/bixby-common/util/thread_pool.rb', line 19
# Build a new pool and start its initial set of workers.
#
# @param options [Hash] optional settings
# @option options [Integer] :min_size     number of workers started up front
#   (falls back to DEFAULT_MIN)
# @option options [Integer] :max_size     stored upper bound
#   (falls back to DEFAULT_MAX)
# @option options [Integer] :idle_timeout stored idle timeout
#   (falls back to DEFAULT_IDLE_TIMEOUT)
def initialize(options = {})
  # Tunables: a missing (nil) option falls back to the class default.
  @min_size     = options[:min_size]     || DEFAULT_MIN
  @max_size     = options[:max_size]     || DEFAULT_MAX
  @idle_timeout = options[:idle_timeout] || DEFAULT_IDLE_TIMEOUT

  # Internal state: pending tasks, the lock guarding pool state, and the
  # worker bookkeeping.
  @input_queue = Queue.new
  @lock        = Monitor.new
  @workers     = []
  @size        = 0

  # Start the minimum number of workers right away.
  expand(@min_size)
end
|
Instance Method Details
#<<(proc) ⇒ Object
45
46
47
48
|
# File 'lib/bixby-common/util/thread_pool.rb', line 45
# Enqueue a callable for execution by the pool (operator form of #perform).
#
# The previous body passed an undefined local named `block` to #enqueue,
# which raised NameError on every call; the parameter is `proc`.
#
# @param proc [Proc] the callable to hand to #enqueue as a :perform task
# @return [nil]
def <<(proc)
  enqueue(:perform, proc)
  nil
end
|
#contract(count, &block) ⇒ Object
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/bixby-common/util/thread_pool.rb', line 117
# Ask +count+ workers to shut down by enqueueing one :shutdown task each.
#
# Every shutdown task carries a callback that receives the worker, removes
# it through remove_worker and then invokes the optional block.
#
# @param count [Integer] number of workers to retire
# @yield called (without arguments) from each shutdown callback
# @raise [RuntimeError] when +count+ is greater than the current @size
# @return [nil]
def contract(count, &block)
  @lock.synchronize do
    raise 'Count is too large.' if count > @size

    count.times do
      on_shutdown = proc do |worker|
        remove_worker(worker)
        block.call unless block.nil?
      end
      enqueue(:shutdown, on_shutdown)
    end
  end
  nil
end
|
#dispose ⇒ Object
83
84
85
86
87
88
89
90
|
# File 'lib/bixby-common/util/thread_pool.rb', line 83
# Tear the pool down: calls #shutdown and then #join while holding the
# pool lock.
#
# @return [nil]
def dispose
  @lock.synchronize do
    shutdown # enqueue the shutdown tasks first ...
    join     # ... then join the workers
  end

  nil
end
|
#enqueue(command, block = nil) ⇒ Object
31
32
33
34
35
36
37
38
|
# File 'lib/bixby-common/util/thread_pool.rb', line 31
# Push a new Task onto the input queue.
#
# @param command [Symbol] task command; this file uses :perform and :shutdown
# @param block [Proc, nil] callable attached to the task
# @return [nil]
def enqueue(command, block=nil)
  logger.debug { "enqueue new task: #{command}" }

  task = Task.new(command, block)
  @input_queue.push(task)

  # Only :perform tasks trigger a call to grow.
  grow if command == :perform

  nil
end
|
#expand(count) ⇒ Object
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/bixby-common/util/thread_pool.rb', line 106
# Add +count+ workers to the pool by calling create_worker that many times
# while holding the pool lock.
#
# @param count [Integer] number of workers to create
# @return [nil]
def expand(count)
  @lock.synchronize do
    logger.debug "expanding by #{count} threads (from #{@size})"
    count.times { create_worker }
  end

  nil
end
|
#inspect ⇒ Object
92
93
94
|
# File 'lib/bixby-common/util/thread_pool.rb', line 92
# Short description of the pool: class name, a hex id derived from
# object_id shifted left by one bit, the thread count (#size) and the
# number of queued jobs (#num_jobs).
#
# @return [String]
def inspect
  hex_id = (object_id << 1).to_s(16)
  "#<#{self.class}:0x#{hex_id} threads=#{size} jobs=#{num_jobs}>"
end
|
#join(max_wait = nil) ⇒ Object
75
76
77
78
79
80
81
|
# File 'lib/bixby-common/util/thread_pool.rb', line 75
# Join every worker, then forget them all and reset the size counter to 0.
#
# @param max_wait [Numeric, nil] forwarded unchanged to each worker's #join
# @return [Array] the value returned by each worker's #join, in worker order
def join(max_wait = nil)
  joined = @workers.map { |worker| worker.join(max_wait) }

  @workers.clear
  @size = 0

  joined
end
|
#num_busy ⇒ Object
Also known as:
num_working
58
59
60
61
62
|
# File 'lib/bixby-common/util/thread_pool.rb', line 58
# Number of workers whose #working? is truthy, counted while holding the
# pool lock.
#
# @return [Integer]
def num_busy
  @lock.synchronize do
    @workers.count { |worker| worker.working? }
  end
end
|
#num_idle ⇒ Object
54
55
56
|
# File 'lib/bixby-common/util/thread_pool.rb', line 54
# Number of workers not currently busy: @size minus #num_busy.
#
# @return [Integer]
def num_idle
  busy = num_busy
  @size - busy
end
|
#num_jobs ⇒ Object
50
51
52
|
# File 'lib/bixby-common/util/thread_pool.rb', line 50
# Number of tasks currently sitting in the input queue.
#
# @return [Integer]
def num_jobs
  @input_queue.length
end
|
#perform(&block) ⇒ Object
40
41
42
43
|
# File 'lib/bixby-common/util/thread_pool.rb', line 40
# Schedule the given block for execution by enqueueing it as a :perform
# task.
#
# @yield the work to run
# @return [nil]
def perform(&block)
  enqueue(:perform, block)

  nil
end
|
#resize(new_size) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/bixby-common/util/thread_pool.rb', line 134
# Bring the pool to +new_size+ workers: calls #expand with the difference
# when growing, #contract with the difference when shrinking, and does
# nothing when the size already matches.
#
# @param new_size [Integer] desired number of workers
# @return [nil]
def resize(new_size)
  @lock.synchronize do
    current = @size

    if new_size > current
      expand(new_size - current)
    elsif new_size < current
      contract(current - new_size)
    end
  end

  nil
end
|
#shutdown(&block) ⇒ Object
65
66
67
68
69
70
71
72
73
|
# File 'lib/bixby-common/util/thread_pool.rb', line 65
# Enqueue one :shutdown task per counted worker (@size of them), each
# carrying the optional block.
#
# @yield passed along with every shutdown task
# @return [nil]
def shutdown(&block)
  @lock.synchronize do
    @size.times { enqueue(:shutdown, block) }
  end

  nil
end
|
#size ⇒ Object
100
101
102
103
104
|
# File 'lib/bixby-common/util/thread_pool.rb', line 100
# Current value of the pool's worker counter, read while holding the pool
# lock.
#
# @return [Integer]
def size
  @lock.synchronize { @size }
end
|
#summary ⇒ Object
147
148
149
150
151
152
153
154
155
|
# File 'lib/bixby-common/util/thread_pool.rb', line 147
# Print a short report to standard output: the number of queued jobs, the
# number of workers, and one line per worker showing its thread's #inspect.
def summary
  @lock.synchronize do
    puts "jobs: #{@input_queue.size}"
    puts "workers: #{@workers.size}"
    @workers.each { |worker| puts " " + worker.thread.inspect }
  end
end
|
#to_s ⇒ Object
96
97
98
|
# File 'lib/bixby-common/util/thread_pool.rb', line 96
# String form of the pool; delegates to #inspect.
#
# @return [String]
def to_s
  inspect
end
|