Class: Bixby::ThreadPool

Inherits:
Object
  • Object
show all
Includes:
Log
Defined in:
lib/bixby-common/util/thread_pool.rb,
lib/bixby-common/util/thread_pool/task.rb,
lib/bixby-common/util/thread_pool/worker.rb

Defined Under Namespace

Classes: Task, Worker

Constant Summary collapse

DEFAULT_MIN =
1
DEFAULT_MAX =
8
DEFAULT_IDLE_TIMEOUT =
60

Instance Method Summary collapse

Methods included from Log

bin_regex, clean_ex, clean_ex_for_console, console_appender?, gems_regex, #log, ruby_regex, setup_logger

Constructor Details

#initialize(options = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/bixby-common/util/thread_pool.rb', line 19

# Sets up the pool's internal state and expands it by the minimum size.
#
# @param options [Hash] optional settings; a missing (nil/false) value
#   falls back to the matching DEFAULT_* constant
# @option options [Integer] :min_size count handed to #expand at construction
# @option options [Integer] :max_size stored in @max_size (not read in this method)
# @option options [Integer] :idle_timeout stored in @idle_timeout
#   (presumably seconds -- confirm against the worker implementation)
def initialize(options = {})
  # Sizing configuration.
  @idle_timeout = options[:idle_timeout] || DEFAULT_IDLE_TIMEOUT
  @max_size = options[:max_size] || DEFAULT_MAX
  @min_size = options[:min_size] || DEFAULT_MIN

  # Bookkeeping: pending tasks, the guard for pool state, and the workers.
  @lock = Monitor.new
  @input_queue = Queue.new
  @workers = []
  @size = 0

  expand(@min_size)
end

Instance Method Details

#<<(proc) ⇒ Object



45
46
47
48
# File 'lib/bixby-common/util/thread_pool.rb', line 45

# Queues a callable as a :perform task, mirroring #perform for callers
# that already hold a Proc.
#
# @param proc [Proc] the job to hand to #enqueue
# @return [nil]
def <<(proc)
  # Forward the argument itself; this method has no local named `block`.
  enqueue(:perform, proc)
  nil
end

#contract(count, &block) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/bixby-common/util/thread_pool.rb', line 117

# Enqueues `count` :shutdown tasks, each carrying a callback that removes
# the worker it is invoked with and then calls the optional block.
#
# @param count [Integer] number of :shutdown tasks to enqueue
# @yield invoked (with no arguments) after each remove_worker call
# @raise [RuntimeError] when count is greater than the current @size
# @return [nil]
def contract(count, &block)
  @lock.synchronize do
    raise 'Count is too large.' if count > @size

    count.times do
      # A fresh callback per task; whoever runs it passes in the worker.
      on_shutdown = proc do |worker|
        remove_worker(worker)
        block.call unless block.nil?
      end
      enqueue(:shutdown, on_shutdown)
    end
  end
  nil
end

#dispose ⇒ Object



83
84
85
86
87
88
89
90
# File 'lib/bixby-common/util/thread_pool.rb', line 83

# Requests shutdown of the pool and then joins the workers, all while
# holding @lock.
#
# NOTE(review): #join runs with the lock held; if a worker needs the same
# lock to finish, this could block -- confirm against the worker code.
#
# @return [nil]
def dispose
  @lock.synchronize do
    shutdown # enqueue the :shutdown tasks
    join     # then wait on the workers; its results are discarded
    nil
  end
end

#enqueue(command, block = nil) ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/bixby-common/util/thread_pool.rb', line 31

# Wraps the command and optional block in a Task and pushes it onto the
# input queue. A :perform command additionally triggers #grow.
#
# @param command [Symbol] task command (:perform and :shutdown are the
#   values used by the callers visible in this class)
# @param block [Proc, nil] payload stored on the Task
# @return [nil]
def enqueue(command, block=nil)
  logger.debug { "enqueue new task: #{command}" }

  task = Task.new(command, block)
  @input_queue << task

  # Only real work asks the pool to grow (grow is defined elsewhere).
  grow if command == :perform
  nil
end

#expand(count) ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/bixby-common/util/thread_pool.rb', line 106

# Calls create_worker `count` times while holding @lock, logging the
# request at debug level first.
#
# create_worker is defined elsewhere; presumably it starts a worker and
# updates @size -- confirm there.
#
# @param count [Integer] number of create_worker calls to make
# @return [nil]
def expand(count)
  @lock.synchronize do
    logger.debug "expanding by #{count} threads (from #{@size})"
    count.times { create_worker }
  end
  nil
end

#inspect ⇒ Object



92
93
94
# File 'lib/bixby-common/util/thread_pool.rb', line 92

# Short description of the pool: class name, a hex id derived from
# object_id, the thread count from #size and the pending job count
# from #num_jobs.
#
# @return [String]
def inspect
  hex_id = (object_id << 1).to_s(16)
  "#<#{self.class}:0x#{hex_id} threads=#{size} jobs=#{num_jobs}>"
end

#join(max_wait = nil) ⇒ Object



75
76
77
78
79
80
81
# File 'lib/bixby-common/util/thread_pool.rb', line 75

# Joins every worker in @workers, then empties the list and resets @size.
#
# NOTE(review): this method does not take @lock itself; #dispose calls it
# with the lock held -- confirm other callers do the same if needed.
#
# @param max_wait [Object, nil] passed unchanged to each worker's join
# @return [Array] the value returned by each worker's join, in order
def join(max_wait = nil)
  outcomes = @workers.map { |worker| worker.join(max_wait) }

  # After joining, the pool tracks no workers.
  @size = 0
  @workers.clear

  outcomes
end

#num_busy ⇒ Object Also known as: num_working



58
59
60
61
62
# File 'lib/bixby-common/util/thread_pool.rb', line 58

# Counts the workers whose working? is truthy, while holding @lock.
#
# @return [Integer]
def num_busy
  @lock.synchronize { @workers.count { |worker| worker.working? } }
end

#num_idle ⇒ Object



54
55
56
# File 'lib/bixby-common/util/thread_pool.rb', line 54

# Number of workers that are not currently busy: @size minus #num_busy.
#
# Both values are read under @lock so they describe the same pool state;
# the lock is a Monitor, so the nested acquisition inside #num_busy is
# re-entrant.
#
# @return [Integer]
def num_idle
  @lock.synchronize { @size - num_busy }
end

#num_jobs ⇒ Object



50
51
52
# File 'lib/bixby-common/util/thread_pool.rb', line 50

# Number of tasks currently sitting in the input queue.
#
# @return [Integer]
def num_jobs
  @input_queue.length
end

#perform(&block) ⇒ Object



40
41
42
43
# File 'lib/bixby-common/util/thread_pool.rb', line 40

# Queues the given block as a :perform task via #enqueue.
#
# NOTE(review): when called without a block, `block` is nil and a task
# with a nil payload is still enqueued -- confirm the worker tolerates it.
#
# @yield the work to hand to #enqueue
# @return [nil]
def perform(&block)
  enqueue(:perform, block)
  nil
end

#resize(new_size) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
# File 'lib/bixby-common/util/thread_pool.rb', line 134

# Moves the pool toward `new_size` by delegating the difference from the
# current @size to #expand or #contract; does nothing when they are equal.
# The comparison and the delegated call both run under @lock.
#
# @param new_size [Integer] desired pool size
# @return [nil]
def resize(new_size)
  @lock.synchronize do
    delta = new_size - @size
    if delta > 0
      expand(delta)
    elsif delta < 0
      contract(-delta)
    end
  end
  nil
end

#shutdown(&block) ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/bixby-common/util/thread_pool.rb', line 65

# Enqueues one :shutdown task per unit of @size while holding @lock.
# The optional block is attached to every task as its payload.
#
# @yield stored on each :shutdown task (how it is invoked is decided by
#   the worker code, which is not visible here)
# @return [nil]
def shutdown(&block)
  @lock.synchronize do
    @size.times { enqueue(:shutdown, block) }
  end
  nil
end

#size ⇒ Object



100
101
102
103
104
# File 'lib/bixby-common/util/thread_pool.rb', line 100

# Current value of @size, read while holding @lock.
#
# @return [Integer]
def size
  @lock.synchronize { @size }
end

#summary ⇒ Object

For debugging



147
148
149
150
151
152
153
154
155
# File 'lib/bixby-common/util/thread_pool.rb', line 147

# Debugging aid: prints the queue length, the worker count and one
# indented line per worker thread to standard output, under @lock.
#
# @return [Array] the @workers array (the result of iterating it)
def summary
  @lock.synchronize do
    puts "jobs: #{@input_queue.size}"
    puts "workers: #{@workers.size}"
    @workers.each { |worker| puts "  #{worker.thread.inspect}" }
  end
end

#to_s ⇒ Object



96
97
98
# File 'lib/bixby-common/util/thread_pool.rb', line 96

# String form of the pool; delegates to #inspect so both render the same.
#
# @return [String]
def to_s
  inspect
end