Class: Bosh::ThreadPool
- Inherits:
- Object (inheritance chain: Object → Bosh::ThreadPool)
- Defined in:
- lib/common/thread_pool.rb
Instance Method Summary collapse
- #create_thread ⇒ Object
- #initialize(options = {}) ⇒ ThreadPool (constructor): A new instance of ThreadPool.
- #pause ⇒ Object
- #process(&block) ⇒ Object
- #raise_worker_exception(exception) ⇒ Object
- #resume ⇒ Object
- #shutdown ⇒ Object
- #wait ⇒ Object
- #working? ⇒ Boolean
- #wrap ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
5 6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/common/thread_pool.rb', line 5
#
# Builds an empty pool in the :open state.
#
# @param options [Hash] pool configuration
# @option options [Integer] :max_threads maximum number of worker threads
#   that may run at once; defaults to 1 when the key is absent
# @option options [Object] :logger receiver of the `debug`/`error` calls made
#   by the other methods; stored exactly as given (nil when omitted)
def initialize(options = {})
  @actions = []                      # queued blocks waiting for a worker
  @lock = Mutex.new                  # other methods wrap state changes in @lock.synchronize
  @cv = ConditionVariable.new        # signalled by workers, waited on in #wait
  @max_threads = options[:max_threads] || 1
  @available_threads = @max_threads  # worker slots not currently in use
  @logger = options[:logger]
  @boom = nil                        # first exception recorded from a worker, if any
  @original_thread = Thread.current
  @threads = []
  @state = :open
end
Instance Method Details
#create_thread ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/common/thread_pool.rb', line 60
#
# Starts one worker thread and appends it to @threads.
#
# The worker repeatedly takes the next queued action and calls it. It stops
# when the queue is empty or a worker exception has been recorded in @boom.
# On stopping it returns its slot to @available_threads and, while the pool
# is :open, removes itself from @threads. Finally it signals @cv if the pool
# is no longer working.
#
# NOTE(review): the callers visible in this file (#process, #resume) invoke
# this while holding @lock, so the worker cannot enter its first critical
# section before `worker` is assigned and appended below.
#
# @return [Array<Thread>] the @threads array after the new worker was added
def create_thread
  worker = Thread.new do
    loop do
      job = @lock.synchronize do
        # Once a failure is recorded no further actions are dequeued.
        next_job = @boom ? nil : @actions.shift
        unless next_job
          @logger.debug('Thread is no longer needed, cleaning up')
          @available_threads += 1
          @threads.delete(worker) if @state == :open
        end
        next_job
      end
      break unless job

      begin
        job.call
      rescue Exception => error
        # Exception (not only StandardError) is rescued so every failure is
        # handed to raise_worker_exception instead of killing the worker
        # before it gives its slot back.
        raise_worker_exception(error)
      end
    end

    @lock.synchronize do
      @cv.signal unless working?
    end
  end
  @threads << worker
end
#pause ⇒ Object
27 28 29 30 31 |
# File 'lib/common/thread_pool.rb', line 27
#
# Switches the pool to the :paused state while holding the lock. While
# paused, #process only queues actions and does not start workers.
#
# @return [Symbol] :paused
def pause
  @lock.synchronize { @state = :paused }
end
#process(&block) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/common/thread_pool.rb', line 43
#
# Queues the given block as an action. When the pool is :open and a worker
# slot is free, a new worker thread is started for it. Otherwise the action
# just stays in the queue.
#
# The block is appended to the queue in every state; in states other than
# :open and :paused nothing else happens.
#
# @yield the work to run on a worker thread
# @return [Object] the value of the branch that ran (nil when the state is
#   neither :open nor :paused)
def process(&block)
  @lock.synchronize do
    @actions << block

    case @state
    when :open
      if @available_threads.positive?
        @logger.debug('Creating new thread')
        @available_threads -= 1
        create_thread
      else
        @logger.debug('All threads are currently busy, queuing action')
      end
    when :paused
      @logger.debug('Pool is paused, queueing action')
    end
  end
end
#raise_worker_exception(exception) ⇒ Object
88 89 90 91 92 93 94 95 96 97 |
# File 'lib/common/thread_pool.rb', line 88
#
# Logs an exception raised by a worker and records it in @boom so that #wait
# can re-raise it. Only the first exception is kept; later ones are logged
# but do not overwrite it.
#
# @param exception [Exception, Object] the failure to report
# @return [Object, nil] the exception when it was stored, nil when an
#   earlier one was already recorded
def raise_worker_exception(exception)
  # An exception that was never raised has a nil backtrace, so the backtrace
  # is only appended when one is actually present.
  backtrace = exception.backtrace if exception.respond_to?(:backtrace)

  if backtrace
    @logger.error("Worker thread raised exception: #{exception} - #{backtrace.join("\n")}")
  else
    @logger.error("Worker thread raised exception: #{exception}")
  end

  @lock.synchronize do
    @boom = exception if @boom.nil?
  end
end
#resume ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/common/thread_pool.rb', line 33
#
# Reopens the pool and starts workers for queued actions: one worker per
# queued action, limited by the number of free worker slots.
#
# @return [Integer] how many workers were started
def resume
  @lock.synchronize do
    @state = :open
    # Never start more workers than there are free slots or queued actions.
    workers_to_start = [@available_threads, @actions.size].min
    workers_to_start.times do
      @available_threads -= 1
      create_thread
    end
  end
end
#shutdown ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/common/thread_pool.rb', line 111
#
# Closes the pool: marks it :closed, discards all queued actions and joins
# every thread still listed in @threads. Calling it on an already closed
# pool does nothing.
#
# @return [Array<Thread>, nil] the joined threads, or nil when the pool was
#   already closed
def shutdown
  # Cheap check before logging or taking the lock.
  return if @state == :closed

  @logger.debug('Shutting down pool')
  @lock.synchronize do
    # Checked again under the lock in case the state changed in between.
    return if @state == :closed

    @state = :closed
    @actions.clear
  end
  @threads.each(&:join)
end
#wait ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/common/thread_pool.rb', line 103
#
# Blocks the caller until the pool stops working, then re-raises the
# exception recorded from a worker, if there is one.
#
# @return [nil] when no worker exception was recorded
# @raise [Object] whatever is stored in @boom
def wait
  @logger.debug('Waiting for tasks to complete')
  @lock.synchronize do
    # Re-check after every wake-up; the wait releases @lock while sleeping.
    while working?
      @cv.wait(@lock)
    end

    if @boom
      raise @boom
    end
  end
end
#working? ⇒ Boolean
99 100 101 |
# File 'lib/common/thread_pool.rb', line 99
#
# Tells whether the pool still has work in flight or queued. Always false
# once a worker exception has been recorded in @boom.
#
# @return [Boolean]
def working?
  return false unless @boom.nil?

  slots_in_use = @available_threads != @max_threads
  slots_in_use || !@actions.empty?
end
#wrap ⇒ Object
18 19 20 21 22 23 24 25 |
# File 'lib/common/thread_pool.rb', line 18
#
# Yields the pool to the block, waits for the pool to finish, and always
# shuts it down afterwards, even when the block or #wait raises.
#
# @yieldparam pool [ThreadPool] this pool
# @return [Object] the result of #wait
def wrap
  yield self
  wait
ensure
  shutdown
end