Class: XPool
- Inherits:
-
Object
- Object
- XPool
- Defined in:
- lib/xpool.rb,
lib/xpool/version.rb
Defined Under Namespace
Classes: Process
Constant Summary collapse
- VERSION =
"0.10.0"
Class Method Summary collapse
Instance Method Summary collapse
-
#broadcast(unit, *args) ⇒ Array<XPool::Process>
Broadcasts unit to be run across all subprocesses in the pool.
-
#dry? ⇒ Boolean
Returns true when all subprocesses in the pool are busy.
- #expand(number) ⇒ void
-
#failed_processes ⇒ Array<XPool::Process>
Returns an Array of failed processes.
- #initialize(size = number_of_cpu_cores) ⇒ XPool constructor
-
#resize(new_size) ⇒ void
Resize the pool (gracefully, if neccesary).
-
#resize!(new_size) ⇒ void
Resize the pool (with force, if neccesary).
-
#schedule(unit, *args) ⇒ XPool::Process
Dispatch a unit of work in a subprocess.
- #shrink(number) ⇒ Object
- #shrink!(number) ⇒ void
-
#shutdown(timeout = nil) ⇒ void
A graceful shutdown of the pool.
-
#shutdown! ⇒ void
A forceful shutdown of the pool (through SIGKILL).
-
#size ⇒ Fixnum
Returns the number of alive subprocesses in the pool.
Constructor Details
Class Method Details
.debug ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/xpool.rb', line 9 def self.debug if block_given? begin @debug = true yield ensure @debug = false end else @debug end end |
.debug=(boolean) ⇒ Object
22 23 24 |
# File 'lib/xpool.rb', line 22 def self.debug=(boolean) @debug = boolean end |
.log(msg, type = :info) ⇒ Object
26 27 28 29 30 31 |
# File 'lib/xpool.rb', line 26 def self.log(msg, type = :info) @logger = @logger || Logger.new(STDOUT) if @debug @logger.public_send type, msg end end |
Instance Method Details
#broadcast(unit, *args) ⇒ Array<XPool::Process>
Broadcasts unit to be run across all subprocesses in the pool.
115 116 117 118 119 |
# File 'lib/xpool.rb', line 115 def broadcast(unit, *args) @pool.map do |process| process.schedule unit, *args end end |
#dry? ⇒ Boolean
Returns true when all subprocesses in the pool are busy.
217 218 219 |
# File 'lib/xpool.rb', line 217 def dry? @pool.all?(&:busy?) end |
#expand(number) ⇒ void
This method returns an undefined value.
51 52 53 |
# File 'lib/xpool.rb', line 51 def (number) resize! size + number end |
#failed_processes ⇒ Array<XPool::Process>
Returns an Array of failed processes.
97 98 99 |
# File 'lib/xpool.rb', line 97 def failed_processes @pool.select(&:failed?) end |
#resize(new_size) ⇒ void
This method returns an undefined value.
Resize the pool (gracefully, if neccesary)
163 164 165 |
# File 'lib/xpool.rb', line 163 def resize(new_size) _resize new_size, false end |
#resize!(new_size) ⇒ void
This method returns an undefined value.
Resize the pool (with force, if neccesary).
180 181 182 |
# File 'lib/xpool.rb', line 180 def resize!(new_size) _resize new_size, true end |
#schedule(unit, *args) ⇒ XPool::Process
Dispatch a unit of work in a subprocess.
196 197 198 199 200 201 202 203 |
# File 'lib/xpool.rb', line 196 def schedule(unit,*args) if size == 0 # dead pool raise RuntimeError, "cannot schedule unit of work on a dead pool" end process = @pool.reject(&:dead?).min_by { |p| p.frequency } process.schedule unit, *args end |
#shrink(number) ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/xpool.rb', line 66 def shrink(number) present_size = size raise_if number > present_size, ArgumentError, "cannot shrink pool by #{number}. pool is only #{present_size} in size." resize present_size - number end |
#shrink!(number) ⇒ void
This method returns an undefined value.
85 86 87 88 89 90 91 |
# File 'lib/xpool.rb', line 85 def shrink!(number) present_size = size raise_if number > present_size, ArgumentError, "cannot shrink pool by #{number}. pool is only #{present_size} in size." resize! present_size - number end |
#shutdown(timeout = nil) ⇒ void
This method returns an undefined value.
A graceful shutdown of the pool. Each subprocess in the pool empties its queue and exits normally.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/xpool.rb', line 131 def shutdown(timeout=nil) if timeout begin Timeout.timeout(timeout) do @pool.each(&:shutdown) end rescue Timeout::Error XPool.log "'#{timeout}' seconds elapsed, switching to hard shutdown." shutdown! end else @pool.each(&:shutdown) end end |
#shutdown! ⇒ void
This method returns an undefined value.
A forceful shutdown of the pool (through SIGKILL).
151 152 153 |
# File 'lib/xpool.rb', line 151 def shutdown! @pool.each(&:shutdown!) end |
#size ⇒ Fixnum
Returns the number of alive subprocesses in the pool.
209 210 211 |
# File 'lib/xpool.rb', line 209 def size @pool.count(&:alive?) end |