Class: XPool

Inherits:
Object
  • Object
show all
Defined in:
lib/xpool.rb,
lib/xpool/version.rb

Defined Under Namespace

Classes: Process

Constant Summary collapse

VERSION =
"0.10.1"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = number_of_cpu_cores) ⇒ XPool

Parameters:

  • size (Fixnum) (defaults to: number_of_cpu_cores)

    The number of subprocesses to spawn. Defaults to the number of cores on your CPU.



40
41
42
# File 'lib/xpool.rb', line 40

# Builds the pool by creating one Process object per requested slot.
#
# @param size [Fixnum]
#   How many Process objects to create. When omitted, the value returned by
#   number_of_cpu_cores is used.
def initialize(size = number_of_cpu_cores)
  @pool = Array.new(size) { |_slot| Process.new }
end

Class Method Details

.debug ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/xpool.rb', line 9

# Reads the debug flag, or enables debugging for the duration of a block.
#
# Without a block, returns the current value of the debug flag.
#
# With a block, sets the flag to true, runs the block, and then puts the flag
# back to the value it had before the call (even if the block raises). Restoring
# the previous value, rather than forcing false, keeps debugging enabled when it
# was switched on beforehand and lets calls with a block be nested safely.
#
# @yield Runs the given block with the debug flag set to true.
# @return [Object]
#   The block's return value when a block is given, otherwise the debug flag.
def self.debug
  return @debug unless block_given?

  previous = @debug
  begin
    @debug = true
    yield
  ensure
    # Put back the caller's setting instead of unconditionally disabling it.
    @debug = previous
  end
end

.debug=(boolean) ⇒ Object



22
23
24
# File 'lib/xpool.rb', line 22

# Stores the given value as the debug flag.
#
# @param boolean [Boolean]
#   The new value of the flag; log only emits messages while it is truthy.
# @return [Object] The assigned value.
def self.debug=(boolean)
  @debug = boolean
end

.log(msg, type = :info) ⇒ Object



26
27
28
29
30
31
# File 'lib/xpool.rb', line 26

# Writes a message through the shared logger, but only while the debug flag is set.
#
# The logger is created on first use and writes to STDOUT.
#
# @param msg [Object] The message handed to the logger.
# @param type [Symbol]
#   The name of the logger method to invoke (for example :info or :warn).
# @return [Object, nil]
#   Whatever the logger method returns, or nil when debugging is off.
def self.log(msg, type = :info)
  @logger ||= Logger.new(STDOUT)
  @logger.public_send(type, msg) if @debug
end

Instance Method Details

#broadcast(unit, *args) ⇒ Array<XPool::Process>

Broadcasts unit to be run across all subprocesses in the pool.

Examples:

pool = XPool.new 5
pool.broadcast unit
pool.shutdown

Returns:

  • (Array<XPool::Process>)

    Returns an array of XPool::Process objects

Raises:

  • (RuntimeError)

    When a subprocess in the pool is dead.



115
116
117
118
119
# File 'lib/xpool.rb', line 115

# Schedules the same unit of work on every process in the pool.
#
# @param unit [#run] The unit of work passed to each process's schedule method.
# @param args [Array<Object>] Extra arguments forwarded unchanged to schedule.
# @return [Array]
#   One entry per process in the pool, holding what that process's schedule
#   call returned.
def broadcast(unit, *args)
  @pool.map { |subprocess| subprocess.schedule(unit, *args) }
end

#dry? ⇒ Boolean

Returns true when all subprocesses in the pool are busy.

Returns:

  • (Boolean)

    Returns true when all subprocesses in the pool are busy.



217
218
219
# File 'lib/xpool.rb', line 217

# Tells whether every process in the pool reports itself as busy.
#
# @return [Boolean]
#   true when busy? is truthy for all processes in the pool (also true for an
#   empty pool), false otherwise.
def dry?
  @pool.all? { |subprocess| subprocess.busy? }
end

#expand(number) ⇒ void

This method returns an undefined value.

Parameters:

  • number (Fixnum)

    The number of subprocesses to add to the pool.



51
52
53
# File 'lib/xpool.rb', line 51

# Grows the pool by the given number of subprocesses.
#
# Delegates to resize! with the current size plus number.
#
# @param number [Fixnum] The number of subprocesses to add to the pool.
# @return [void]
def expand(number)
  resize!(size + number)
end

#failed_processes ⇒ Array<XPool::Process>

Returns an Array of failed processes.

Returns:

  • (Array<XPool::Process>)

    Returns an Array of failed processes.



97
98
99
# File 'lib/xpool.rb', line 97

# Collects the processes in the pool that report themselves as failed.
#
# @return [Array]
#   The pool members whose failed? method returns a truthy value, in pool order.
def failed_processes
  @pool.select { |subprocess| subprocess.failed? }
end

#resize(new_size) ⇒ void

This method returns an undefined value.

Resize the pool (gracefully, if necessary).

Parameters:

  • new_size (Fixnum)

    The new size of the pool.



163
164
165
# File 'lib/xpool.rb', line 163

# Resizes the pool, passing false as the second argument to _resize.
#
# NOTE(review): false presumably selects a graceful (non-forced) resize; the
# helper _resize is not visible here, so confirm against its definition.
#
# @param new_size [Fixnum] The new size of the pool.
# @return [void]
def resize(new_size)
  _resize(new_size, false)
end

#resize!(new_size) ⇒ void

This method returns an undefined value.

Resize the pool (with force, if necessary).

Examples:

pool = XPool.new 5
pool.resize! 3
pool.shutdown

Parameters:

  • new_size (Fixnum)

    The new size of the pool.



180
181
182
# File 'lib/xpool.rb', line 180

# Resizes the pool, passing true as the second argument to _resize.
#
# NOTE(review): true presumably selects a forced resize; the helper _resize is
# not visible here, so confirm against its definition.
#
# @example
#   pool = XPool.new 5
#   pool.resize! 3
#   pool.shutdown
#
# @param new_size [Fixnum] The new size of the pool.
# @return [void]
def resize!(new_size)
  _resize(new_size, true)
end

#schedule(unit, *args) ⇒ XPool::Process

Dispatch a unit of work in a subprocess.

Parameters:

  • unit (#run)

    The unit of work

  • *args (Object)

    A variable number of arguments to be passed to #run

Returns:

  • (XPool::Process)

Raises:

  • (RuntimeError)

    When the pool is dead (no subprocesses are left running)



196
197
198
199
200
201
202
203
# File 'lib/xpool.rb', line 196

# Dispatches a unit of work to a single process in the pool.
#
# Among the processes that are not dead, the one with the lowest frequency
# value is chosen and the work is handed to its schedule method.
#
# @param unit [#run] The unit of work.
# @param args [Array<Object>] Arguments forwarded unchanged to the process.
# @return [Object] Whatever the chosen process's schedule method returns.
# @raise [RuntimeError] When size reports that no processes are alive.
def schedule(unit, *args)
  # A size of zero means the pool is dead.
  raise RuntimeError, "cannot schedule unit of work on a dead pool" if size.zero?

  candidates = @pool.reject(&:dead?)
  least_used = candidates.min_by(&:frequency)
  least_used.schedule(unit, *args)
end

#shrink(number) ⇒ Object

Parameters:

  • number (Fixnum)

    The number of subprocesses to remove from the pool. A graceful shutdown is performed.

Raises:

  • (ArgumentError)

    When number is greater than #size.



66
67
68
69
70
71
72
# File 'lib/xpool.rb', line 66

# Removes the given number of subprocesses from the pool through resize.
#
# The current size is read once, handed to raise_if together with the
# "number exceeds size" condition, and then used to compute the new size.
#
# @param number [Fixnum] The number of subprocesses to remove from the pool.
# @return [Object] Whatever resize returns.
# @raise [ArgumentError]
#   When number is greater than the current size (raised via the raise_if helper).
def shrink(number)
  current_size = size
  too_many = number > current_size
  raise_if(
    too_many,
    ArgumentError,
    "cannot shrink pool by #{number}. pool is only #{current_size} in size."
  )
  resize(current_size - number)
end

#shrink!(number) ⇒ void

This method returns an undefined value.

Parameters:

  • number (Fixnum)

    The number of subprocesses to remove from the pool. A forceful shutdown is performed.

Raises:

  • (ArgumentError)

    When number is greater than #size.



85
86
87
88
89
90
91
# File 'lib/xpool.rb', line 85

# Removes the given number of subprocesses from the pool through resize!.
#
# The current size is read once, handed to raise_if together with the
# "number exceeds size" condition, and then used to compute the new size.
#
# @param number [Fixnum] The number of subprocesses to remove from the pool.
# @return [void]
# @raise [ArgumentError]
#   When number is greater than the current size (raised via the raise_if helper).
def shrink!(number)
  current_size = size
  too_many = number > current_size
  raise_if(
    too_many,
    ArgumentError,
    "cannot shrink pool by #{number}. pool is only #{current_size} in size."
  )
  resize!(current_size - number)
end

#shutdown(timeout = nil) ⇒ void

This method returns an undefined value.

A graceful shutdown of the pool. Each subprocess in the pool empties its queue and exits normally.

Parameters:

  • timeout (Fixnum) (defaults to: nil)

    An optional amount of seconds to wait before forcing a shutdown through #shutdown!.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/xpool.rb', line 131

# Shuts down every process in the pool by calling its shutdown method.
#
# With a timeout, the shutdown calls run inside Timeout.timeout; if that raises
# Timeout::Error, a message is logged through XPool.log and the pool falls back
# to shutdown!. Without a timeout, the calls run unguarded.
#
# @param timeout [Fixnum, nil]
#   An optional number of seconds to wait before switching to shutdown!.
# @return [void]
def shutdown(timeout = nil)
  return @pool.each(&:shutdown) unless timeout

  # Only the timed path is rescued; errors on the untimed path propagate.
  begin
    Timeout.timeout(timeout) { @pool.each(&:shutdown) }
  rescue Timeout::Error
    XPool.log "'#{timeout}' seconds elapsed, switching to hard shutdown."
    shutdown!
  end
end

#shutdown! ⇒ void

This method returns an undefined value.

A forceful shutdown of the pool (through SIGKILL).



151
152
153
# File 'lib/xpool.rb', line 151

# Calls shutdown! on every process in the pool, in pool order.
#
# @return [void]
def shutdown!
  @pool.each { |subprocess| subprocess.shutdown! }
end

#size ⇒ Fixnum

Returns the number of alive subprocesses in the pool.

Returns:

  • (Fixnum)

    Returns the number of alive subprocesses in the pool.



209
210
211
# File 'lib/xpool.rb', line 209

# Counts the processes in the pool that report themselves as alive.
#
# @return [Fixnum] The number of pool members whose alive? method is truthy.
def size
  @pool.count { |subprocess| subprocess.alive? }
end