Class: ThreadPool
- Inherits:
- Object
- Defined in:
- lib/bulkmail/thread_pool.rb
Overview
This class implements a pool of threads. The pool hands blocks to any available worker thread for processing. If no worker thread is available, the pool blocks until one becomes available. The pool size can be specified when constructing the pool. For example:
pool = ThreadPool.new(10)
10.times {pool.process {sleep 3}}
10.times {pool.process {sleep 3}} # this will block for about 3 seconds.
pool.join # this will block for about 3 more seconds.
Defined Under Namespace
Classes: Worker
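The Worker class is not documented on this page, so the following is only a minimal sketch of how a worker and the pool could fit together. SketchWorker and SketchPool are hypothetical names; the real ThreadPool::Worker may be implemented differently. The sketch assumes only what the pool code on this page relies on: a worker responds to busy? and set_block.

```ruby
# Hypothetical stand-in for ThreadPool::Worker (not shown on this page).
# It provides only the two methods the pool calls: busy? and set_block.
class SketchWorker
  def initialize
    @mutex = Mutex.new
    @block = nil
    @thread = Thread.new do
      loop do
        job = @mutex.synchronize { @block }
        if job
          begin
            job.call
          rescue StandardError => e
            warn "job failed: #{e.message}" # keep the worker thread alive
          ensure
            @mutex.synchronize { @block = nil } # mark the worker free again
          end
        else
          sleep 0.01 # idle poll, in the same style as the pool's own loops
        end
      end
    end
  end

  # A worker counts as busy from the moment a block is assigned.
  def busy?
    @mutex.synchronize { !@block.nil? }
  end

  def set_block(block)
    @mutex.synchronize { @block = block }
  end
end

# Mirrors the methods documented on this page, wired to SketchWorker.
class SketchPool
  attr_accessor :max_size
  attr_reader :workers

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
  end

  def size
    @mutex.synchronize { @workers.size }
  end

  def busy?
    @mutex.synchronize { @workers.any? { |w| w.busy? } }
  end

  def join
    sleep 0.01 while busy?
  end

  def process(&block)
    wait_for_worker.set_block(block)
  end

  def wait_for_worker
    loop do
      worker = find_available_worker
      return worker if worker
      sleep 0.01
    end
  end

  def find_available_worker
    @mutex.synchronize { free_worker || create_worker }
  end

  def free_worker
    @workers.find { |w| !w.busy? }
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = SketchWorker.new
    @workers << worker
    worker
  end
end

# Five jobs on a pool capped at two workers; a single thread submits them.
results = Queue.new
pool = SketchPool.new(2)
5.times { |i| pool.process { sleep 0.05; results << i } }
pool.join
```

In this sketch the pool creates workers lazily and never grows past max_size, so the third call to process has to wait until one of the two workers finishes. Jobs are submitted from a single thread here; the worker is selected under the mutex but the block is assigned afterwards, so concurrent callers of process would need extra care.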
Instance Attribute Summary
- #max_size ⇒ Object
  Returns the value of attribute max_size.
- #workers ⇒ Object (readonly)
  Returns the value of attribute workers.
Instance Method Summary
- #busy? ⇒ Boolean
  Returns true if any of the workers in the pool are busy.
- #create_worker ⇒ Object
  Creates a worker if the maximum pool size has not been reached.
- #find_available_worker ⇒ Object
  Returns an available worker or a new worker if the maximum pool size has not been reached.
- #free_worker ⇒ Object
  Returns the first available worker.
- #initialize(max_size = 10) ⇒ ThreadPool (constructor)
  Creates a new pool with a default maximum size of 10.
- #join ⇒ Object
  Blocks until all workers have become available.
- #process(&block) ⇒ Object
  Hands the block to an available worker.
- #size ⇒ Object
  Returns the number of workers in the pool.
- #wait_for_worker ⇒ Object
  Waits for a worker to become available.
Constructor Details
#initialize(max_size = 10) ⇒ ThreadPool
Creates a new pool with a default maximum size of 10.
# File 'lib/bulkmail/thread_pool.rb', line 59
def initialize(max_size = 10)
  @max_size = max_size
  @workers = []
  @mutex = Mutex.new
end
Instance Attribute Details
#max_size ⇒ Object
Returns the value of attribute max_size.
# File 'lib/bulkmail/thread_pool.rb', line 55
def max_size
  @max_size
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
# File 'lib/bulkmail/thread_pool.rb', line 56
def workers
  @workers
end
Instance Method Details
#busy? ⇒ Boolean
Returns true if any of the workers in the pool are busy.
# File 'lib/bulkmail/thread_pool.rb', line 71
def busy?
  @mutex.synchronize {@workers.any? {|w| w.busy?}}
end
#create_worker ⇒ Object
Creates a worker if the maximum pool size has not been reached.
# File 'lib/bulkmail/thread_pool.rb', line 107
def create_worker
  return nil if @workers.size >= @max_size
  worker = Worker.new
  @workers << worker
  worker
end
#find_available_worker ⇒ Object
Returns an available worker or a new worker if the maximum pool size has not been reached.
# File 'lib/bulkmail/thread_pool.rb', line 97
def find_available_worker
  @mutex.synchronize {free_worker || create_worker}
end
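The expression free_worker || create_worker means an idle worker is always reused before the pool grows, and nil comes back only when every worker is busy and the pool is full. The thread-free sketch below illustrates that order. Slot and SlotPool are hypothetical stand-ins that replace real workers with a plain busy flag so the behavior is deterministic.

```ruby
# Hypothetical stand-in for a worker: just a busy flag, no thread.
Slot = Struct.new(:busy) do
  def busy?
    busy
  end
end

class SlotPool
  attr_reader :slots

  def initialize(max_size)
    @max_size = max_size
    @slots = []
    @mutex = Mutex.new
  end

  # Same shape as find_available_worker: reuse first, grow second, else nil.
  def find_available
    @mutex.synchronize { free_slot || create_slot }
  end

  # Returns the first idle slot, or nil if all are busy.
  def free_slot
    @slots.each { |s| return s unless s.busy? }
    nil
  end

  # Adds a slot unless the pool has already reached its maximum size.
  def create_slot
    return nil if @slots.size >= @max_size
    slot = Slot.new(false)
    @slots << slot
    slot
  end
end

pool = SlotPool.new(2)
a = pool.find_available # pool is empty, so a slot is created
a.busy = true
b = pool.find_available # a is busy, so a second slot is created
b.busy = true
c = pool.find_available # both busy and the pool is full: nil
a.busy = false
d = pool.find_available # a is idle again and is reused
```

The nil result in the full-pool case is what a caller such as wait_for_worker uses as the signal to sleep and retry.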
#free_worker ⇒ Object
Returns the first available worker.
# File 'lib/bulkmail/thread_pool.rb', line 102
def free_worker
  @workers.each {|w| return w unless w.busy?}; nil
end
#join ⇒ Object
Blocks until all workers have become available.
# File 'lib/bulkmail/thread_pool.rb', line 76
def join
  sleep 0.01 while busy?
end
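Note that join polls rather than waiting on a signal: it rechecks busy? every 10 ms, so it may return slightly after the last worker has finished. The same modifier-loop idiom can be tried in isolation. The sketch below is not part of the library; it uses a plain flag and a single background thread as assumptions for illustration.

```ruby
done = false

# Background work that finishes after roughly 50 ms.
background = Thread.new do
  sleep 0.05
  done = true
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
sleep 0.01 until done # same polling shape as `sleep 0.01 while busy?`
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

background.join # the thread has already finished its work at this point
```

Polling keeps the code short at the cost of up to one polling interval of extra latency per wait.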
#process(&block) ⇒ Object
Hands the block to an available worker. If no workers are available, this method will block until one becomes available.
# File 'lib/bulkmail/thread_pool.rb', line 82
def process(&block)
  wait_for_worker.set_block(block)
end
#size ⇒ Object
Returns the number of workers in the pool.
# File 'lib/bulkmail/thread_pool.rb', line 66
def size
  @mutex.synchronize {@workers.size}
end
#wait_for_worker ⇒ Object
Waits for a worker to become available.
# File 'lib/bulkmail/thread_pool.rb', line 87
def wait_for_worker
  while true
    worker = find_available_worker
    return worker if worker
    sleep 0.01
  end
end