Class: Thpool

Inherits:
Object
  • Object
show all
Defined in:
lib/thpool.rb

Defined Under Namespace

Classes: Worker

Constant Summary collapse

OPTIONS_DEFAULT =

Constants ============================================================

{
  :worker_class => Thpool::Worker,
  :workers_min => 0,
  :workers_max => 20,
  :count_per_worker => 1,
  :args => [ ]
}

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Thpool

Creates a new instance of Pool with an optional set of options:

  • :worker_class - The class of worker to spawn when tasks arrive.

  • :workers_min - The minimum number of workers to have running.

  • :workers_max - The maximum number of workers to spawn.

  • :count_per_worker - Ratio of items in queue to workers.

  • :args - An array of arguments to be passed through to the workers.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/thpool.rb', line 30

def initialize(options = nil)
  @queue = Queue.new
  @workers = [ ]

  options = options ? OPTIONS_DEFAULT.merge(options) : OPTIONS_DEFAULT

  @worker_class = options[:worker_class]
  @workers_min = options[:workers_min]
  @workers_max = options[:workers_max]
  @count_per_worker = options[:count_per_worker]
  @args = options[:args]

  @workers_min.times do
    self.worker_create!
  end
end

Instance Method Details

#block_popObject

Makes a blocking call to pop an item from the queue, returning that item. If the queue is empty, also has the effect of sleeping the calling thread until something is pushed into the queue.



68
69
70
# File 'lib/thpool.rb', line 68

def block_pop
  @queue.pop
end

#busy?Boolean

Returns true if there is some outstanding work to be performed, false otherwise.

Returns:

  • (Boolean)


99
100
101
# File 'lib/thpool.rb', line 99

def busy?
  @queue.num_waiting < @workers.length
end

#performObject

Schedules a block to be acted upon by the workers.



126
127
128
129
130
131
132
133
134
# File 'lib/thpool.rb', line 126

def perform
  @queue << Proc.new

  if (self.workers_count < self.workers_needed)
    self.worker_create!
  end

  true
end

#queue?Boolean

Returns true if anything is queued, false otherwise. Note that this does not count anything that might be active within a worker.

Returns:

  • (Boolean)


110
111
112
# File 'lib/thpool.rb', line 110

def queue?
  @queue.length > 0
end

#queue_sizeObject

Returns the number of items in the queue. Note that this does not count anything that might be active within a worker.



116
117
118
# File 'lib/thpool.rb', line 116

def queue_size
  @queue.size
end

#report_exception!(worker, exception, block = nil) ⇒ Object

Receives reports of exceptions from workers. Default behavior is to re-raise.



121
122
123
# File 'lib/thpool.rb', line 121

def report_exception!(worker, exception, block = nil)
  raise(exception)
end

#waitingObject

Returns the number of workers that are waiting for something to do.



104
105
106
# File 'lib/thpool.rb', line 104

def waiting
  @queue.num_waiting
end

#worker_finished!(worker) ⇒ Object

Called by a worker when it’s finished. Should not be called otherwise.



137
138
139
140
# File 'lib/thpool.rb', line 137

def worker_finished!(worker)
  @workers.delete(worker)
  worker.join
end

#worker_needed?(worker) ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/thpool.rb', line 78

def worker_needed?(worker)
  @queue.length > 0 or @workers.length <= self.workers_needed
end

#workersObject

Returns an array of the current workers.



83
84
85
# File 'lib/thpool.rb', line 83

def workers
  @workers.dup
end

#workers?Boolean

Returns true if there are any workers, false otherwise.

Returns:

  • (Boolean)


93
94
95
# File 'lib/thpool.rb', line 93

def workers?
  @workers.any?
end

#workers_countObject

Returns the current number of workers.



48
49
50
# File 'lib/thpool.rb', line 48

def workers_count
  @workers.length
end

#workers_neededObject

Returns the number of workers required for the current loading.



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/thpool.rb', line 53

def workers_needed
  n = ((@queue.length + @workers.length - @queue.num_waiting) / @count_per_worker)

  if (n > @workers_max)
    @workers_max
  elsif (n < @workers_min)
    @workers_min
  else
    n
  end  
end

#workers_needed?Boolean

Returns true if more workers are needed to satisfy the current backlog, or false otherwise.

Returns:

  • (Boolean)


74
75
76
# File 'lib/thpool.rb', line 74

def workers_needed?
  @workers.length < self.workers_needed
end