Class: Workers::Pool

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/workers/pool.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
20

Instance Method Summary collapse

Methods included from Helpers

#concat_e, #log_debug, #log_error, #log_info, #log_warn

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/workers/pool.rb', line 7

def initialize(options = {})
  @logger = Workers::LogProxy.new(options[:logger])
  @worker_class = options[:worker_class] || Workers::Worker
  @input_queue = Queue.new
  @lock = Monitor.new
  @workers = Set.new
  @size = 0
  @exception_callback = options[:on_exception]

  expand(options[:size] || Workers::Pool::DEFAULT_POOL_SIZE)

  nil
end

Instance Method Details

#contract(count, &block) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/workers/pool.rb', line 79

def contract(count, &block)
  @lock.synchronize do
    raise Workers::PoolSizeError, 'Count is too large.' if count > @size

    count.times do
      callback = Proc.new do |worker|
        remove_worker(worker)
        block.call if block
      end

      enqueue(:shutdown, callback)
      @size -= 1
    end
  end

  nil
end

#dispose(max_wait = nil) ⇒ Object



51
52
53
54
# File 'lib/workers/pool.rb', line 51

def dispose(max_wait = nil)
  shutdown  
  join(max_wait)
end

#enqueue(command, data = nil) ⇒ Object



21
22
23
24
25
# File 'lib/workers/pool.rb', line 21

def enqueue(command, data = nil)
  @input_queue.push(Event.new(command, data))

  nil
end

#expand(count) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/workers/pool.rb', line 66

def expand(count)
  @lock.synchronize do
    count.times do
      worker = @worker_class.new(:input_queue => @input_queue, :die_on_exception => false,
                                 :on_exception => @exception_callback, :logger => @logger)
      @workers << worker
      @size += 1
    end
  end

  nil
end

#inspectObject



56
57
58
# File 'lib/workers/pool.rb', line 56

def inspect
  "#<#{self.class.to_s}:0x#{(object_id << 1).to_s(16)} size=#{size}>"
end

#join(max_wait = nil) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/workers/pool.rb', line 43

def join(max_wait = nil)
  results = @workers.map { |w| w.join(max_wait) }
  @workers.clear
  @size = 0

  results
end

#perform(&block) ⇒ Object



27
28
29
30
31
# File 'lib/workers/pool.rb', line 27

def perform(&block)
  enqueue(:perform, block)

  nil
end

#resize(new_size) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/workers/pool.rb', line 97

def resize(new_size)
  @lock.synchronize do
    if new_size > @size
      expand(new_size - @size)
    elsif new_size < @size
      contract(@size - new_size)
    end
  end

  nil
end

#shutdown(&block) ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/workers/pool.rb', line 33

def shutdown(&block)
  @lock.synchronize do
    @size.times do
      enqueue(:shutdown, block)
    end
  end

  nil
end

#sizeObject



60
61
62
63
64
# File 'lib/workers/pool.rb', line 60

def size
  @lock.synchronize do
    @size
  end
end