Class: Concurrent::CachedThreadPool
- Inherits:
-
Object
- Object
- Concurrent::CachedThreadPool
- Defined in:
- lib/concurrent/cached_thread_pool.rb,
lib/concurrent/cached_thread_pool/worker.rb
Defined Under Namespace
Classes: Worker
Constant Summary collapse
- MIN_POOL_SIZE =
1- MAX_POOL_SIZE =
256- DEFAULT_THREAD_IDLETIME =
60
Instance Attribute Summary collapse
-
#max_threads ⇒ Object
Returns the value of attribute max_threads.
Instance Method Summary collapse
- #<<(block) ⇒ Object
-
#initialize(opts = {}) ⇒ CachedThreadPool
constructor
A new instance of CachedThreadPool.
- #kill ⇒ Object
- #length ⇒ Object
- #on_end_task(worker) ⇒ Object
- #on_worker_exit(worker) ⇒ Object
- #post(*args, &block) ⇒ Object
- #running? ⇒ Boolean
- #shutdown ⇒ Object
- #wait_for_termination(timeout = nil) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ CachedThreadPool
Returns a new instance of CachedThreadPool.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/concurrent/cached_thread_pool.rb', line 17 def initialize(opts = {}) @idletime = (opts[:idletime] || DEFAULT_THREAD_IDLETIME).to_i raise ArgumentError.new('idletime must be greater than zero') if @idletime <= 0 @max_threads = opts[:max_threads] || opts[:max] || MAX_POOL_SIZE if @max_threads < MIN_POOL_SIZE || @max_threads > MAX_POOL_SIZE raise ArgumentError.new("size must be from #{MIN_POOL_SIZE} to #{MAX_POOL_SIZE}") end @state = :running @pool = [] @terminator = Event.new @mutex = Mutex.new @busy = [] @idle = [] end |
Instance Attribute Details
#max_threads ⇒ Object
Returns the value of attribute max_threads.
15 16 17 |
# File 'lib/concurrent/cached_thread_pool.rb', line 15 def max_threads @max_threads end |
Instance Method Details
#<<(block) ⇒ Object
35 36 37 38 |
# File 'lib/concurrent/cached_thread_pool.rb', line 35 def <<(block) self.post(&block) return self end |
#kill ⇒ Object
85 86 87 88 89 90 91 92 93 |
# File 'lib/concurrent/cached_thread_pool.rb', line 85 def kill @mutex.synchronize do break if @state == :shutdown @state = :shutdown @idle.each{|worker| worker.kill } @busy.each{|worker| worker.kill } @terminator.set end end |
#length ⇒ Object
95 96 97 98 99 |
# File 'lib/concurrent/cached_thread_pool.rb', line 95 def length @mutex.synchronize do @state == :running ? @busy.length + @idle.length : 0 end end |
#on_end_task(worker) ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/concurrent/cached_thread_pool.rb', line 112 def on_end_task(worker) @mutex.synchronize do break unless @state == :running @busy.delete(worker) @idle.push(worker) end end |
#on_worker_exit(worker) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/concurrent/cached_thread_pool.rb', line 101 def on_worker_exit(worker) @mutex.synchronize do @idle.delete(worker) @busy.delete(worker) if @idle.empty? && @busy.empty? && @state != :running @state = :shutdown @terminator.set end end end |
#post(*args, &block) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/concurrent/cached_thread_pool.rb', line 40 def post(*args, &block) raise ArgumentError.new('no block given') if block.nil? @mutex.synchronize do break false unless @state == :running if @idle.empty? if @idle.length + @busy.length < @max_threads worker = create_worker_thread else worker = @busy.shift end else worker = @idle.pop end @busy.push(worker) worker.signal(*args, &block) prune_stale_workers true end end |
#running? ⇒ Boolean
63 64 65 |
# File 'lib/concurrent/cached_thread_pool.rb', line 63 def running? return @state == :running end |
#shutdown ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/concurrent/cached_thread_pool.rb', line 71 def shutdown @mutex.synchronize do break unless @state == :running if @idle.empty? && @busy.empty? @state = :shutdown @terminator.set else @state = :shuttingdown @idle.each{|worker| worker.stop } @busy.each{|worker| worker.stop } end end end |
#wait_for_termination(timeout = nil) ⇒ Object
67 68 69 |
# File 'lib/concurrent/cached_thread_pool.rb', line 67 def wait_for_termination(timeout = nil) return @terminator.wait(timeout) end |