Class: Workers::Pool

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/workers/pool.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
20

Instance Method Summary collapse

Methods included from Helpers

#concat_e, #log_debug, #log_error, #log_info, #log_warn

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/workers/pool.rb', line 7

# Sets up the pool's internal state and then grows it to its starting size.
#
# @param options [Hash] settings looked up by key:
#   :logger       - passed to Workers::LogProxy.new
#   :worker_class - class instantiated per worker; Workers::Worker when absent
#   :size         - number of workers to create; DEFAULT_POOL_SIZE when absent
def initialize(options = {})
  @logger       = Workers::LogProxy.new(options[:logger])
  @worker_class = options[:worker_class] || Workers::Worker
  @input_queue  = Queue.new
  @lock         = Monitor.new
  @workers      = Set.new
  @size         = 0

  # The pool starts empty; expand creates the workers and updates @size.
  initial_size = options[:size] || Workers::Pool::DEFAULT_POOL_SIZE
  expand(initial_size)

  nil
end

Instance Method Details

#contract(count, &block) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/workers/pool.rb', line 80

# Shrinks the pool by +count+ workers.
#
# While holding @lock, one :shutdown event is enqueued per retired worker and
# @size is decremented immediately after each enqueue. Every event carries its
# own callback which, when called with a worker, passes it to remove_worker and
# then invokes the optional block (with no arguments).
# NOTE(review): the callback is presumably run by the worker that consumes the
# event - confirm against the worker implementation.
#
# @param count [Integer] how many workers to retire
# @return [nil]
# @raise [RuntimeError] 'Count is too large.' if count is greater than @size
def contract(count, &block)
  @lock.synchronize do
    if count > @size
      raise 'Count is too large.'
    end

    count.times do
      on_shutdown = Proc.new { |worker|
        remove_worker(worker)
        block && block.call
      }

      enqueue(:shutdown, on_shutdown)
      @size -= 1
    end
  end

  nil
end

#dispose ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/workers/pool.rb', line 50

# Shuts the pool down and waits for its workers.
#
# Calls shutdown and then join (with no timeout argument), both while
# holding @lock. The value returned by join is discarded.
#
# @return [nil]
def dispose
  @lock.synchronize do
    shutdown
    join
    nil
  end
end

#enqueue(command, data = nil) ⇒ Object



20
21
22
23
24
# File 'lib/workers/pool.rb', line 20

# Wraps +command+ and +data+ in an Event and appends it to the input queue.
#
# @param command [Object] first argument given to Event.new (other methods here pass
#   :perform or :shutdown)
# @param data [Object, nil] second argument given to Event.new
# @return [nil]
def enqueue(command, data = nil)
  event = Event.new(command, data)
  @input_queue << event

  nil
end

#expand(count) ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/workers/pool.rb', line 69

# Grows the pool by +count+ workers.
#
# While holding @lock, instantiates @worker_class +count+ times, handing each
# instance the shared input queue under the :input_queue key, stores it in
# @workers and increments @size.
#
# @param count [Integer] number of workers to add
# @return [nil]
def expand(count)
  @lock.synchronize do
    count.times do
      worker = @worker_class.new(:input_queue => @input_queue)
      @workers << worker
      @size += 1
    end
  end

  nil
end

#inspect ⇒ Object



59
60
61
# File 'lib/workers/pool.rb', line 59

# Builds a short description of the pool: class name, the object_id shifted
# left by one bit and rendered in hexadecimal, and the current size.
#
# @return [String] e.g. "#<ClassName:0x... size=N>"
def inspect
  hex_id = (object_id << 1).to_s(16)

  "#<#{self.class}:0x#{hex_id} size=#{size}>"
end

#join(max_wait = nil) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/workers/pool.rb', line 42

# Joins every worker, then forgets them all.
#
# +max_wait+ is forwarded unchanged to each worker's join call. Afterwards
# @workers is emptied and @size is reset to 0. This method does not take
# @lock itself.
#
# @param max_wait [Object, nil] argument handed to each worker's join
# @return [Array] the value returned by each worker's join, in iteration order
def join(max_wait = nil)
  results = []
  @workers.each { |worker| results << worker.join(max_wait) }

  @workers.clear
  @size = 0

  results
end

#perform(&block) ⇒ Object



26
27
28
29
30
# File 'lib/workers/pool.rb', line 26

# Queues a unit of work: the given block is enqueued as the data of a
# :perform event.
#
# @yield the block to enqueue (nil is enqueued when no block is given)
# @return [nil]
def perform(&block)
  enqueue(:perform, block)
  nil
end

#resize(new_size) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/workers/pool.rb', line 98

# Brings the pool to +new_size+ workers.
#
# While holding @lock, compares +new_size+ with @size and delegates the
# difference to expand (when growing) or contract (when shrinking). Nothing
# is called when the sizes already match.
#
# @param new_size [Integer] desired worker count
# @return [nil]
def resize(new_size)
  @lock.synchronize do
    delta = new_size - @size

    if delta > 0
      expand(delta)
    elsif delta < 0
      contract(-delta)
    end
  end

  nil
end

#shutdown(&block) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/workers/pool.rb', line 32

# Enqueues one :shutdown event per counted worker (@size of them), while
# holding @lock. The same block, if any, is attached to every event as its
# data. @size itself is left untouched here.
#
# @yield optional block passed along as each event's data
# @return [nil]
def shutdown(&block)
  @lock.synchronize do
    @size.times { enqueue(:shutdown, block) }
  end

  nil
end

#size ⇒ Object



63
64
65
66
67
# File 'lib/workers/pool.rb', line 63

# Reads the pool's worker count while holding @lock.
#
# @return [Integer] the current value of @size
def size
  @lock.synchronize { @size }
end