Class: Concurrent::FixedThreadPool
- Inherits:
-
Object
- Object
- Concurrent::FixedThreadPool
- Defined in:
- lib/concurrent/fixed_thread_pool.rb,
lib/concurrent/fixed_thread_pool/worker.rb
Defined Under Namespace
Classes: Worker
Constant Summary collapse
- MIN_POOL_SIZE =
1
- MAX_POOL_SIZE =
256
Instance Attribute Summary collapse
-
#max_threads ⇒ Object
Returns the value of attribute max_threads.
Instance Method Summary collapse
- #<<(block) ⇒ Object
- #clean_pool ⇒ Object
- #create_worker_thread ⇒ Object
- #drain_pool ⇒ Object
- #fill_pool ⇒ Object
-
#initialize(size, opts = {}) ⇒ FixedThreadPool
constructor
A new instance of FixedThreadPool.
- #kill ⇒ Object
- #length ⇒ Object
- #on_end_task(worker) ⇒ Object
- #on_start_task(worker) ⇒ Object
- #on_worker_exit(worker) ⇒ Object
- #post(*args, &block) ⇒ Object
- #running? ⇒ Boolean
- #shutdown ⇒ Object
- #wait_for_termination(timeout = nil) ⇒ Object
Constructor Details
#initialize(size, opts = {}) ⇒ FixedThreadPool
Returns a new instance of FixedThreadPool.
15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 15
# Builds a pool in the :running state with an empty worker list; no worker
# threads are started here.
#
# @param size [Integer, nil] maximum number of worker threads; a nil/false
#   value falls back to MAX_POOL_SIZE
# @param opts [Hash] accepted but not read by this method
# @raise [ArgumentError] if the resolved size lies outside
#   MIN_POOL_SIZE..MAX_POOL_SIZE
def initialize(size, opts = {})
  @max_threads = size || MAX_POOL_SIZE

  if @max_threads < MIN_POOL_SIZE || @max_threads > MAX_POOL_SIZE
    raise ArgumentError, "size must be from #{MIN_POOL_SIZE} to #{MAX_POOL_SIZE}"
  end

  @state = :running
  @pool = []
  @terminator = Event.new # set once the pool reaches :shutdown
  @queue = Queue.new
  @mutex = Mutex.new
end
Instance Attribute Details
#max_threads ⇒ Object
Returns the value of attribute max_threads.
13 14 15 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 13 def max_threads @max_threads end |
Instance Method Details
#<<(block) ⇒ Object
47 48 49 50 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 47
# Forwards the given callable to #post as its block and returns the receiver,
# so calls can be chained. The result of #post is discarded.
#
# @param block [Proc] the task to hand to #post
# @return [self]
def <<(block)
  post(&block)
  self
end
#clean_pool ⇒ Object
98 99 100 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 98
# Removes every worker that reports itself dead from @pool, in place.
#
# @return [Array, nil] @pool if anything was removed, nil otherwise
#   (the Array#reject! contract)
def clean_pool
  @pool.reject!(&:dead?)
end
#create_worker_thread ⇒ Object
81 82 83 84 85 86 87 88 89 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 81
# Creates a Worker bound to this pool's queue and starts a thread that runs
# it. After Worker#run returns, the thread calls #on_worker_exit on this pool.
#
# @return [Worker] the new worker (not the Thread that runs it)
def create_worker_thread
  worker = Worker.new(@queue, self)

  Thread.new(worker, self) do |running_worker, owner|
    # Per-thread setting: an exception in this thread must not abort the process.
    Thread.current.abort_on_exception = false
    running_worker.run
    owner.on_worker_exit(running_worker)
  end

  worker
end
#drain_pool ⇒ Object
102 103 104 105 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 102
# Calls #kill on every worker currently in @pool, then empties the pool.
#
# @return [Array] the (now empty) @pool
def drain_pool
  @pool.each(&:kill)
  @pool.clear
end
#fill_pool ⇒ Object
91 92 93 94 95 96 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 91
# Tops @pool up to @max_threads workers by calling #create_worker_thread.
# Does nothing unless the pool is in the :running state.
#
# @return [nil]
def fill_pool
  return unless @state == :running

  until @pool.length >= @max_threads
    @pool << create_worker_thread
  end
end
#kill ⇒ Object
65 66 67 68 69 70 71 72 73 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 65
# Shuts the pool down immediately: under the mutex it marks the pool
# :shutdown, discards all queued tasks, kills and removes every worker via
# #drain_pool, and sets the terminator event. A pool that is already
# :shutdown is left untouched.
#
# @return [Object, nil] the result of @terminator.set, or nil when the pool
#   was already :shutdown
def kill
  @mutex.synchronize do
    unless @state == :shutdown
      @state = :shutdown
      @queue.clear
      drain_pool
      @terminator.set
    end
  end
end
#length ⇒ Object
75 76 77 78 79 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 75
# Reports the number of workers in the pool, read under the mutex.
#
# @return [Integer] @pool.length while the pool is :running, otherwise 0
def length
  @mutex.synchronize do
    if @state == :running
      @pool.length
    else
      0
    end
  end
end
#on_end_task(worker) ⇒ Object
110 111 112 113 114 115 116 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 110
# Callback taking the worker that finished a task (the argument itself is
# not used). While the pool is :running it prunes dead workers and refills
# the pool, all under the mutex; in any other state it does nothing.
#
# @param worker [Object] unused here
# @return [nil, Object] nil when not running, otherwise the result of #fill_pool
def on_end_task(worker)
  @mutex.synchronize do
    if @state == :running
      clean_pool
      fill_pool
    end
  end
end
#on_start_task(worker) ⇒ Object
107 108 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 107 def on_start_task(worker) end |
#on_worker_exit(worker) ⇒ Object
118 119 120 121 122 123 124 125 126 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 118
# Callback run when a worker's thread finishes (see #create_worker_thread).
# Removes the worker from @pool; if that leaves the pool empty while it is
# not :running, the pool becomes :shutdown and the terminator event is set.
#
# @param worker [Object] the worker to remove from the pool
# @return [Object, nil] the result of @terminator.set when the pool just
#   finished shutting down, nil otherwise
def on_worker_exit(worker)
  @mutex.synchronize do
    @pool.delete(worker)
    # Still running, or other workers remain: nothing more to do.
    next if @state == :running || !@pool.empty?

    @state = :shutdown
    @terminator.set
  end
end
#post(*args, &block) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 36
# Enqueues a task: the block plus the arguments given here are pushed onto
# the queue as an [args, block] pair. After enqueueing, dead workers are
# pruned and the pool is refilled. All of this happens under the mutex.
#
# @param args [Array] arguments stored alongside the block
# @param block [Proc] the task body
# @return [Boolean] true if the task was queued, false if the pool is not
#   :running
# @raise [ArgumentError] if no block is given
def post(*args, &block)
  raise ArgumentError, 'no block given' unless block

  @mutex.synchronize do
    next false unless @state == :running

    @queue << [args, block]
    clean_pool
    fill_pool
    true
  end
end
#running? ⇒ Boolean
28 29 30 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 28
# Tells whether the pool is in the :running state. Reads @state without
# taking the mutex.
#
# @return [Boolean]
def running?
  @state == :running
end
#shutdown ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 52
# Begins an orderly shutdown, under the mutex. Only acts on a :running pool:
# * with no workers, the pool goes straight to :shutdown and the terminator
#   event is set;
# * otherwise the pool becomes :shuttingdown and one :stop marker per worker
#   is appended to the queue, behind any tasks already queued.
#
# @return [nil, Integer, Object] nil if the pool was not running, the worker
#   count when :stop markers were queued, or the result of @terminator.set
def shutdown
  @mutex.synchronize do
    next unless @state == :running

    unless @pool.empty?
      @state = :shuttingdown
      next @pool.length.times { @queue << :stop }
    end

    @state = :shutdown
    @terminator.set
  end
end
#wait_for_termination(timeout = nil) ⇒ Object
32 33 34 |
# File 'lib/concurrent/fixed_thread_pool.rb', line 32
# Delegates to @terminator.wait, passing the timeout through unchanged.
#
# @param timeout [Object, nil] forwarded as-is to the terminator's #wait
# @return [Object] whatever @terminator.wait returns
def wait_for_termination(timeout = nil)
  @terminator.wait(timeout)
end