Class: ActionPool::Pool
- Inherits:
-
Object
- Object
- ActionPool::Pool
- Defined in:
- lib/actionpool/Pool.rb
Instance Method Summary
-
#<<(action) ⇒ Object
- action
-
proc to be executed or array of [proc, [*args]]
Add a new proc/lambda to be executed (alias for queue).
-
#action ⇒ Object
Returns the next action to be processed.
-
#action_size ⇒ Object
Number of actions in the queue.
-
#action_timeout ⇒ Object
Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action).
-
#action_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thread may work on a given action.
-
#add_jobs(jobs) ⇒ Object
- jobs
-
Array of proc/lambdas
Will queue a list of jobs into the pool.
-
#create_thread(*args) ⇒ Object
- args
-
:force forces a new thread.
-
#fill_pool ⇒ Object
Fills the pool with the minimum number of threads. Returns array of created threads.
-
#flush ⇒ Object
Flush the thread pool.
-
#initialize(args = {}) ⇒ Pool
constructor
- :min_threads
-
minimum number of threads in pool
- :max_threads
-
maximum number of threads in pool
- :t_to
-
thread timeout waiting for action to process
- :a_to
-
maximum time action may be worked on before aborting
- :logger
-
logger to print logging messages to
Creates a new pool.
-
#max ⇒ Object
Maximum allowed number of threads.
-
#max=(m) ⇒ Object
- m
-
new max
Set maximum number of threads.
-
#min ⇒ Object
Minimum allowed number of threads.
-
#min=(m) ⇒ Object
- m
-
new min
Set minimum number of threads.
-
#pool_closed? ⇒ Boolean
Pool is closed.
-
#pool_open? ⇒ Boolean
Pool is open.
-
#process(*args, &block) ⇒ Object
- block
-
block to process
Adds a block to be processed.
-
#queue(action, *args) ⇒ Object
- action
-
proc to be executed
Add a new proc/lambda to be executed.
-
#remove(t) ⇒ Object
- t
-
ActionPool::Thread to remove
Removes a thread from the pool.
-
#shutdown(force = false) ⇒ Object
- force
-
force immediate stop
Stop the pool.
-
#size ⇒ Object
Current size of pool.
-
#status(arg) ⇒ Object
- arg
-
:open or :closed
Set pool status.
-
#thread_stats ⇒ Object
-
#thread_timeout ⇒ Object
Maximum number of seconds a thread is allowed to idle in the pool.
-
#thread_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thread may idle in pool.
-
#working ⇒ Object
Returns current number of threads in the pool working.
Constructor Details
#initialize(args = {}) ⇒ Pool
- :min_threads
-
minimum number of threads in pool
- :max_threads
-
maximum number of threads in pool
- :t_to
-
thread timeout waiting for action to process
- :a_to
-
maximum time action may be worked on before aborting
- :logger
-
logger to print logging messages to
Creates a new pool
# File 'lib/actionpool/Pool.rb', line 18
def initialize(args={})
  raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash)
  @logger = args[:logger] && args[:logger].is_a?(Logger) ? args[:logger] : Logger.new(nil)
  @queue = ActionPool::Queue.new
  @threads = []
  @lock = Splib::Monitor.new
  @thread_timeout = args[:t_to] ? args[:t_to] : 0
  @action_timeout = args[:a_to] ? args[:a_to] : 0
  @max_threads = args[:max_threads] ? args[:max_threads] : 100
  @min_threads = args[:min_threads] ? args[:min_threads] : 10
  @min_threads = @max_threads if @max_threads < @min_threads
  @respond_to = args[:respond_thread] || ::Thread.current
  @open = true
  fill_pool
end
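The option handling in the constructor can be tried in isolation. The following is a stand-alone sketch that does not load ActionPool: `pool_settings` is a hypothetical helper that reproduces only the defaulting shown in the listing (10 and 100 threads, timeouts of 0, and a minimum that is lowered to the maximum when the two conflict).

```ruby
# Hypothetical helper (not part of ActionPool): mirrors only the option
# handling of Pool#initialize so the defaults can be inspected on their own.
def pool_settings(args = {})
  raise ArgumentError, 'Hash required for initialization' unless args.is_a?(Hash)

  settings = {
    thread_timeout: args[:t_to] || 0,
    action_timeout: args[:a_to] || 0,
    max_threads:    args[:max_threads] || 100,
    min_threads:    args[:min_threads] || 10
  }
  # As in the constructor, a minimum above the maximum is lowered to the maximum.
  if settings[:max_threads] < settings[:min_threads]
    settings[:min_threads] = settings[:max_threads]
  end
  settings
end

default_settings = pool_settings
small_settings   = pool_settings({ max_threads: 4 })

p default_settings[:min_threads]  # 10
p small_settings[:min_threads]    # 4, lowered to match :max_threads
```

Passing only a small :max_threads is therefore enough to shrink both bounds.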
Instance Method Details
#<<(action) ⇒ Object
- action
-
proc to be executed or array of [proc, [*args]]
Add a new proc/lambda to be executed (alias for queue)
# File 'lib/actionpool/Pool.rb', line 112
def <<(action)
  case action
  when Proc
    queue(action)
  when Array
    raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless action.size == 2 and action[0].is_a?(Proc) and action[1].is_a?(Array)
    queue(action[0], action[1])
  else
    raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
  end
  nil
end
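The two accepted shapes, a bare proc/lambda or a two-element array of [proc, [*args]], can be checked without a pool. This is a plain-Ruby sketch under that assumption only: `normalize_action` is a hypothetical helper, not an ActionPool method, and it returns the callable and its argument list instead of queueing them.

```ruby
# Hypothetical helper (not part of ActionPool): applies the same shape checks
# as Pool#<< and returns a [callable, argument_list] pair.
def normalize_action(action)
  message = 'Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]'
  case action
  when Proc
    [action, []]
  when Array
    valid = action.size == 2 && action[0].is_a?(Proc) && action[1].is_a?(Array)
    raise ArgumentError, message unless valid
    [action[0], action[1]]
  else
    raise ArgumentError, message
  end
end

callable, call_args = normalize_action([->(a, b) { a + b }, [1, 2]])
sum = callable.call(*call_args)
p sum  # 3
```

Anything else, such as a symbol or a three-element array, raises ArgumentError in the sketch, as it does in the listing.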
#action ⇒ Object
Returns the next action to be processed
# File 'lib/actionpool/Pool.rb', line 251
def action
  @queue.pop
end
#action_size ⇒ Object
Number of actions in the queue
# File 'lib/actionpool/Pool.rb', line 256
def action_size
  @queue.size
end
#action_timeout ⇒ Object
Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action)
# File 'lib/actionpool/Pool.rb', line 225
def action_timeout
  @action_timeout
end
#action_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thread may work on a given action
# File 'lib/actionpool/Pool.rb', line 242
def action_timeout=(t)
  t = t.to_f
  raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0
  @action_timeout = t
  @threads.each{|thread|thread.action_timeout = t}
  t
end
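Because the setter coerces its argument with to_f before checking it, nil is not stored as nil: in plain Ruby, nil.to_f is 0.0, which passes the `t >= 0` check. The sketch below isolates that coercion. `coerce_timeout` is a hypothetical name, and how the worker threads interpret a stored 0.0 is not shown in this section.

```ruby
# Hypothetical stand-in for the coercion and range check used by the
# timeout setters; it does not touch any pool or thread state.
def coerce_timeout(t)
  t = t.to_f
  raise ArgumentError, 'Value must be greater than zero or nil' unless t >= 0
  t
end

from_nil    = coerce_timeout(nil)    # nil.to_f is 0.0
from_int    = coerce_timeout(2)
from_string = coerce_timeout('1.5')

p [from_nil, from_int, from_string]  # [0.0, 2.0, 1.5]
```

A negative value is the only input in this sketch that raises.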
#add_jobs(jobs) ⇒ Object
- jobs
-
Array of proc/lambdas
Will queue a list of jobs into the pool
# File 'lib/actionpool/Pool.rb', line 137
def add_jobs(jobs)
  raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
  raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
  @queue.pause
  begin
    jobs.each do |job|
      case job
      when Proc
        @queue << [job, []]
      when Array
        raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless job.size == 2 and job[0].is_a?(Proc) and job[1].is_a?(Array)
        @queue << [job.shift, job]
      else
        raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
      end
    end
  ensure
    num = jobs.size - @threads.select{|t|t.waiting?}.size
    num.times{ create_thread(:nowait) } if num > 0
    @queue.unpause
  end
  true
end
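One detail of the listing is easy to miss: the array branch builds its queue entry with `[job.shift, job]`, and Array#shift removes the first element in place. The plain-Ruby sketch below shows the effect on the caller's array. It does not use ActionPool, and `job` and `entry` are illustrative names.

```ruby
# A job in the [proc, [*args]] shape accepted by add_jobs.
job = [->(x) { x * 2 }, [21]]

# Same expression shape as in add_jobs: shift removes the proc from `job`
# itself, so `job` is left holding only the argument list.
entry = [job.shift, job]

p job.size   # 1
p entry[1]   # [[21]]
```

A caller that wants to reuse its job arrays after the call could pass copies (for example `jobs.map(&:dup)`); that is a suggestion based on the listing, not documented library guidance.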
#create_thread(*args) ⇒ Object
- args
-
:force forces a new thread. :nowait will create a thread if threads are waiting
Create a new thread for pool. Returns newly created thread or nil if pool is at maximum size
# File 'lib/actionpool/Pool.rb', line 55
def create_thread(*args)
  return if pool_closed?
  thread = nil
  @lock.synchronize do
    if(((size == working || args.include?(:nowait)) && @threads.size < @max_threads) || args.include?(:force))
      thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to, :a_timeout => @action_timeout,
                                      :t_timeout => @thread_timeout, :logger => @logger, :autostart => false)
      @threads << thread
    end
  end
  thread.start if thread
  thread
end
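The condition that decides whether a thread is created is dense, so it may help to read it as a pure function. The sketch below restates it with keyword arguments. `spawn_thread?` and its parameter names are hypothetical, and the pool-closed check at the top of create_thread is left out.

```ruby
# Hypothetical restatement of the condition inside create_thread:
# a thread is created when every existing thread is busy (or :nowait is
# given) and the pool is below its maximum, or whenever :force is given.
def spawn_thread?(size:, working:, max:, flags: [])
  ((size == working || flags.include?(:nowait)) && size < max) || flags.include?(:force)
end

p spawn_thread?(size: 10, working: 10, max: 100)                    # true
p spawn_thread?(size: 10, working: 3, max: 100)                     # false
p spawn_thread?(size: 10, working: 3, max: 100, flags: [:nowait])   # true
p spawn_thread?(size: 100, working: 100, max: 100)                  # false
p spawn_thread?(size: 100, working: 100, max: 100, flags: [:force]) # true
```

The last case shows that :force bypasses the maximum, which matches the parenthesization in the listing.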
#fill_pool ⇒ Object
Fills the pool with the minimum number of threads. Returns array of created threads
# File 'lib/actionpool/Pool.rb', line 71
def fill_pool
  threads = []
  if(@open)
    @lock.synchronize do
      required = min - size
      if(required > 0)
        required.times do
          thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to, :a_timeout => @action_timeout,
                                          :t_timeout => @thread_timeout, :logger => @logger, :autostart => false)
          @threads << thread
          threads << thread
        end
      end
    end
  end
  threads.each{|t|t.start}
  threads
end
#flush ⇒ Object
Flush the thread pool. Mainly used for forcibly resizing the pool if existing threads have a long thread life waiting for input.
# File 'lib/actionpool/Pool.rb', line 263
def flush
  mon = Splib::Monitor.new
  @threads.size.times{ queue{ mon.wait } }
  @queue.wait_empty
  sleep(0.01)
  mon.broadcast
end
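The idea behind flush, as the listing reads, is to hand every thread one job that blocks, so that each idle thread is woken and occupied, and then to release them all at once. The sketch below illustrates that pattern with Ruby's standard Queue, Mutex and ConditionVariable instead of ActionPool::Queue and Splib::Monitor. It is a swapped-in technique, not the library's implementation: all names are hypothetical, and an explicit `parked` queue replaces the short sleep used in the listing.

```ruby
jobs     = Queue.new               # stand-in for the pool's action queue
parked   = Queue.new               # records each worker that reached the barrier
gate     = Mutex.new
release  = ConditionVariable.new
released = false

# Three stand-in workers that run jobs until they receive nil.
workers = 3.times.map do |id|
  Thread.new do
    while (job = jobs.pop)
      job.call(id)
    end
  end
end

# One blocking job per worker: a parked worker cannot take a second job,
# so every worker ends up holding exactly one.
workers.size.times do
  jobs << lambda do |id|
    parked << id
    gate.synchronize { release.wait(gate) until released }
  end
end

# Wait until every worker is parked, then release them together.
ids = Array.new(workers.size) { parked.pop }
gate.synchronize do
  released = true
  release.broadcast
end

workers.size.times { jobs << nil } # nil ends each worker loop
workers.each(&:join)
p ids.sort                         # [0, 1, 2]
```

Waiting on the `released` flag in a loop guards against spurious wakeups, which is the usual way to use a condition variable.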
#max ⇒ Object
Maximum allowed number of threads
# File 'lib/actionpool/Pool.rb', line 174
def max
  @max_threads
end
#max=(m) ⇒ Object
- m
-
new max
Set maximum number of threads
# File 'lib/actionpool/Pool.rb', line 185
def max=(m)
  m = m.to_i
  raise ArgumentError.new('Maximum value must be greater than 0') unless m > 0
  @max_threads = m
  @min_threads = m if m < @min_threads
  resize if m < size
  m
end
#min ⇒ Object
Minimum allowed number of threads
# File 'lib/actionpool/Pool.rb', line 179
def min
  @min_threads
end
#min=(m) ⇒ Object
- m
-
new min
Set minimum number of threads
# File 'lib/actionpool/Pool.rb', line 196
def min=(m)
  m = m.to_i
  raise ArgumentError.new("Minimum value must be greater than 0 and less than or equal to maximum (#{max})") unless m > 0 && m <= max
  @min_threads = m
  m
end
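The two setters are not symmetric: lowering the maximum below the minimum silently lowers the minimum as well, while raising the minimum above the maximum is rejected. The sketch below keeps only that bounds logic. `Limits` is a hypothetical class, and the resize call made by the real max= is omitted.

```ruby
# Hypothetical stand-in that keeps only the bounds logic of max= and min=.
class Limits
  attr_reader :min, :max

  def initialize(min: 10, max: 100)
    @min = min
    @max = max
  end

  def max=(m)
    m = m.to_i
    raise ArgumentError, 'Maximum value must be greater than 0' unless m > 0
    @max = m
    @min = m if m < @min  # lowering the maximum lowers the minimum with it
  end

  def min=(m)
    m = m.to_i
    unless m > 0 && m <= max
      raise ArgumentError, "Minimum value must be greater than 0 and less than or equal to maximum (#{max})"
    end
    @min = m
  end
end

limits = Limits.new
limits.max = 5
p [limits.min, limits.max]  # [5, 5]
```

After this, `limits.min = 6` raises ArgumentError because 6 exceeds the new maximum.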
#pool_closed? ⇒ Boolean
Pool is closed
# File 'lib/actionpool/Pool.rb', line 35
def pool_closed?
  !@open
end
#pool_open? ⇒ Boolean
Pool is open
# File 'lib/actionpool/Pool.rb', line 40
def pool_open?
  @open
end
#process(*args, &block) ⇒ Object
- block
-
block to process
Adds a block to be processed
# File 'lib/actionpool/Pool.rb', line 163
def process(*args, &block)
  queue(block, *args)
  nil
end
#queue(action, *args) ⇒ Object
- action
-
proc to be executed
Add a new proc/lambda to be executed
# File 'lib/actionpool/Pool.rb', line 127
def queue(action, *args)
  raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
  raise ArgumentError.new('Expecting block') unless action.is_a?(Proc)
  @queue << [action, args]
  ::Thread.pass
  create_thread
end
#remove(t) ⇒ Object
- t
-
ActionPool::Thread to remove
Removes a thread from the pool
# File 'lib/actionpool/Pool.rb', line 205
def remove(t)
  raise ArgumentError.new('Expecting an ActionPool::Thread object') unless t.is_a?(ActionPool::Thread)
  t.stop
  del = @threads.include?(t)
  @threads.delete(t) if del
  fill_pool
  del
end
#shutdown(force = false) ⇒ Object
- force
-
force immediate stop
Stop the pool
# File 'lib/actionpool/Pool.rb', line 93
def shutdown(force=false)
  status(:closed)
  args = []
  args.push(:force) if force
  @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
  @queue.clear if force
  @queue.wait_empty
  while(t = @threads.pop) do
    t.stop(*args)
  end
  unless(force)
    flush
    @threads.each{|t|t.join}
  end
  nil
end
#size ⇒ Object
Current size of pool
# File 'lib/actionpool/Pool.rb', line 169
def size
  @threads.size
end
#status(arg) ⇒ Object
- arg
-
:open or :closed
Set pool status
# File 'lib/actionpool/Pool.rb', line 46
def status(arg)
  @open = arg == :open
  fill_pool if @open
end
#thread_stats ⇒ Object
Returns an array of [object_id, status] pairs, one for each thread in the pool
# File 'lib/actionpool/Pool.rb', line 276
def thread_stats
  @threads.map{|t|[t.object_id,t.status]}
end
#thread_timeout ⇒ Object
Maximum number of seconds a thread is allowed to idle in the pool. (nil means thread life is infinite)
# File 'lib/actionpool/Pool.rb', line 217
def thread_timeout
  @thread_timeout
end
#thread_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thread may idle in pool
# File 'lib/actionpool/Pool.rb', line 231
def thread_timeout=(t)
  t = t.to_f
  raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0
  @thread_timeout = t
  @threads.each{|thread|thread.thread_timeout = t}
  t
end
#working ⇒ Object
Returns current number of threads in the pool working
# File 'lib/actionpool/Pool.rb', line 272
def working
  @threads.select{|t|t.running?}.size
end