Class: ThreadPool

Inherits:
Object
Defined in:
lib/bulkmail/thread_pool.rb

Overview

This class implements a pool of worker threads. The pool hands each block to an available worker thread. If no worker is available, the pool blocks until one becomes available. The maximum pool size can be specified when constructing the pool. For example:

pool = ThreadPool.new(10)
10.times {pool.process {sleep 3}}
10.times {pool.process {sleep 3}} # this will block for about 3 seconds.
pool.join # this will block for about 3 more seconds.
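
The timings in the comments above follow from the pool size: 20 three-second jobs on 10 workers run as two batches. As a rough illustration, the arithmetic can be sketched as below. The method name `estimated_seconds` is hypothetical and not part of ThreadPool, and the estimate assumes equal-length jobs while ignoring thread start-up cost and the pool's polling interval.

```ruby
# Rough wall-clock estimate for equal-length jobs on a fixed-size pool.
# Hypothetical helper for illustration only; not part of ThreadPool.
def estimated_seconds(jobs, pool_size, job_seconds)
  # Jobs run in batches of at most pool_size; the last batch may be partial.
  batches = (jobs.to_f / pool_size).ceil
  batches * job_seconds
end

total = estimated_seconds(20, 10, 3) # two batches of 3 s each => 6
```

One extra job beyond a full batch costs a whole additional batch: under the same assumptions, 21 jobs on 10 workers come to roughly 9 seconds.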

Defined Under Namespace

Classes: Worker

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(max_size = 10) ⇒ ThreadPool

Creates a new pool with a default maximum size of 10. The pool starts with no workers; they are created on demand.



# File 'lib/bulkmail/thread_pool.rb', line 59

def initialize(max_size = 10)
  @max_size = max_size
  @workers = []
  @mutex = Mutex.new
end

Instance Attribute Details

#max_size ⇒ Object

Returns the value of attribute max_size.



# File 'lib/bulkmail/thread_pool.rb', line 55

def max_size
  @max_size
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



# File 'lib/bulkmail/thread_pool.rb', line 56

def workers
  @workers
end

Instance Method Details

#busy? ⇒ Boolean

Returns true if any worker in the pool is busy.

Returns:

  • (Boolean)


# File 'lib/bulkmail/thread_pool.rb', line 71

def busy?
  @mutex.synchronize {@workers.any? {|w| w.busy?}}
end

#create_worker ⇒ Object

Creates a worker if the maximum pool size has not been reached; otherwise returns nil.



# File 'lib/bulkmail/thread_pool.rb', line 107

def create_worker
  return nil if @workers.size >= @max_size
  worker = Worker.new
  @workers << worker
  worker
end

#find_available_worker ⇒ Object

Returns an available worker, or a new worker if none is free and the maximum pool size has not been reached. Returns nil when the pool is full and every worker is busy.



# File 'lib/bulkmail/thread_pool.rb', line 97

def find_available_worker
  @mutex.synchronize {free_worker || create_worker}
end
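
The `free_worker || create_worker` rule means the pool grows only when no idle worker exists, and stops growing at the maximum size. A minimal sketch of that rule follows, without threads or the mutex. `StubWorker` and the `acquire` lambda are hypothetical stand-ins introduced for illustration; the real Worker class is not shown on this page.

```ruby
# StubWorker is a hypothetical stand-in; only busy? matters for this sketch.
StubWorker = Struct.new(:busy) do
  def busy?
    busy
  end
end

max_size = 2
workers = []

# Mirrors free_worker || create_worker, minus the mutex and real threads.
acquire = lambda do
  free = workers.find { |w| !w.busy? }
  next free if free                      # reuse an idle worker first
  next nil if workers.size >= max_size   # pool is full: nothing to hand out
  worker = StubWorker.new(false)
  workers << worker
  worker
end

first = acquire.call    # pool empty, so a worker is created
first.busy = true
second = acquire.call   # first is busy, so a second worker is created
second.busy = true
third = acquire.call    # pool full and all busy: nil
first.busy = false
reused = acquire.call   # the idle worker is handed out again
```

In this sketch `third` is nil, which is the case `wait_for_worker` retries on, and `reused` is the same object as `first`.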

#free_worker ⇒ Object

Returns the first available worker, or nil if every worker is busy.



# File 'lib/bulkmail/thread_pool.rb', line 102

def free_worker
  # Enumerable#find returns the first idle worker, or nil if all are busy.
  @workers.find {|w| !w.busy?}
end

#join ⇒ Object

Blocks until all workers have become available, polling #busy? every 10 ms.



# File 'lib/bulkmail/thread_pool.rb', line 76

def join
  sleep 0.01 while busy?
end

#process(&block) ⇒ Object

Hands the block to an available worker. If no workers are available, this method will block until one becomes available.



# File 'lib/bulkmail/thread_pool.rb', line 82

def process(&block)
  wait_for_worker.set_block(block)
end
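
The pool relies on two Worker methods, `busy?` and `set_block`, but the Worker class itself is not shown on this page. The following is a minimal sketch of a worker that would satisfy that contract. The class name `SketchWorker` and its internals (a `Queue` feeding one long-lived thread) are assumptions made for illustration, not the library's implementation.

```ruby
# Hypothetical stand-in for ThreadPool::Worker. Everything here is an
# assumption about the contract the pool needs: busy? and set_block.
class SketchWorker
  def initialize
    @mutex = Mutex.new
    @jobs = Queue.new
    @busy = false
    @thread = Thread.new do
      loop do
        block = @jobs.pop # sleeps until set_block supplies work
        begin
          block.call
        rescue StandardError
          # Swallow errors so one failing block does not kill the worker.
        ensure
          @mutex.synchronize { @busy = false }
        end
      end
    end
  end

  def busy?
    @mutex.synchronize { @busy }
  end

  # Marks the worker busy before queueing, so busy? is true immediately.
  def set_block(block)
    @mutex.synchronize { @busy = true }
    @jobs << block
    self
  end
end

worker = SketchWorker.new
results = Queue.new
worker.set_block(proc { sleep 0.05; results << :done })
busy_during = worker.busy?    # marked busy before the block was queued
outcome = results.pop         # waits for the block to finish
sleep 0.01 while worker.busy? # same polling idiom as ThreadPool#join
busy_after = worker.busy?
```

Marking the worker busy inside `set_block`, rather than when the thread picks the block up, keeps the window in which a just-assigned worker still looks idle as small as possible.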

#size ⇒ Object

Returns the number of workers created so far, which may be less than max_size.



# File 'lib/bulkmail/thread_pool.rb', line 66

def size
  @mutex.synchronize {@workers.size}
end

#wait_for_worker ⇒ Object

Waits for a worker to become available, polling every 10 ms, and returns it.



# File 'lib/bulkmail/thread_pool.rb', line 87

def wait_for_worker
  loop do
    worker = find_available_worker
    return worker if worker
    sleep 0.01
  end
end
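
Both `wait_for_worker` and `join` use the same poll-and-sleep pattern. A minimal sketch of that pattern as a reusable helper is shown below. The name `poll_until` is hypothetical and not part of ThreadPool.

```ruby
# Hypothetical helper, not part of ThreadPool: retries the block every
# `interval` seconds until it returns a truthy value, then returns that value.
def poll_until(interval = 0.01)
  loop do
    result = yield
    return result if result
    sleep interval
  end
end

attempts = 0
value = poll_until(0.001) do
  attempts += 1
  attempts >= 3 ? :worker : nil # succeeds on the third try
end
```

Polling keeps the code short at the cost of waking up every interval while waiting. Ruby's standard ConditionVariable is the usual alternative when wake-ups should happen only on a state change.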