Class: ActionPool::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/actionpool/Pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Pool

:min_threads

minimum number of threads in pool

:max_threads

maximum number of threads in pool

:t_to

thread timeout waiting for action to process

:a_to

maximum time action may be worked on before aborting

:logger

logger to print logging messages to

Creates a new pool

Raises:

  • (ArgumentError)


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/actionpool/Pool.rb', line 18

# Creates a new pool and fills it with the minimum number of threads.
#
# args:: Hash of options
#   :min_threads:: minimum number of threads in pool (defaults to 10)
#   :max_threads:: maximum number of threads in pool (defaults to 100)
#   :t_to:: thread timeout waiting for action to process (defaults to 0)
#   :a_to:: maximum time action may be worked on before aborting (defaults to 0)
#   :logger:: Logger instance; anything else is replaced by Logger.new(nil)
#   :respond_thread:: value handed to worker threads as :respond_thread
#                     (defaults to ::Thread.current)
#
# Raises ArgumentError when args is not a Hash.
def initialize(args={})
    unless args.is_a?(Hash)
        raise ArgumentError.new('Hash required for initialization')
    end
    supplied_logger = args[:logger]
    @logger = supplied_logger.is_a?(Logger) ? supplied_logger : Logger.new(nil)
    @queue = ActionPool::Queue.new
    @threads = []
    @lock = Splib::Monitor.new
    @thread_timeout = args[:t_to] || 0
    @action_timeout = args[:a_to] || 0
    @max_threads = args[:max_threads] || 100
    @min_threads = args[:min_threads] || 10
    # The minimum may never exceed the maximum; clamp it down when it does.
    @min_threads = @max_threads if @min_threads > @max_threads
    @respond_to = args[:respond_thread] || ::Thread.current
    @open = true
    fill_pool
end

Instance Method Details

#<<(action) ⇒ Object

action

proc to be executed or array of [proc, [*args]]

Add a new proc/lambda to be executed (alias for queue)



112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/actionpool/Pool.rb', line 112

# Adds a new proc/lambda to be executed (operator form of #queue).
#
# action:: proc to be executed, or a two element array of [proc, [*args]]
#
# Always returns nil.
# Raises ArgumentError when action is neither a Proc nor a well formed
# [proc, [*args]] pair.
def <<(action)
    usage = 'Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]'
    if action.is_a?(Proc)
        queue(action)
    elsif action.is_a?(Array)
        job, job_args = action[0], action[1]
        well_formed = action.size == 2 && job.is_a?(Proc) && job_args.is_a?(Array)
        raise ArgumentError.new(usage) unless well_formed
        # The argument array is forwarded as a single positional argument.
        queue(job, job_args)
    else
        raise ArgumentError.new(usage)
    end
    nil
end

#action ⇒ Object

Returns the next action to be processed



251
252
253
# File 'lib/actionpool/Pool.rb', line 251

# Returns the next action to be processed by popping it off the pool's queue.
def action
    pending = @queue
    pending.pop
end

#action_size ⇒ Object

Number of actions in the queue



256
257
258
# File 'lib/actionpool/Pool.rb', line 256

# Number of actions currently held in the queue.
def action_size
    pending = @queue
    pending.size
end

#action_timeout ⇒ Object

Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action)



225
226
227
# File 'lib/actionpool/Pool.rb', line 225

# Returns the configured per-action timeout, exactly as stored on the pool.
def action_timeout
    return @action_timeout
end

#action_timeout=(t) ⇒ Object

t

timeout in seconds (nil for infinite)

Set maximum allowed time thread may work on a given action

Raises:

  • (ArgumentError)


242
243
244
245
246
247
248
# File 'lib/actionpool/Pool.rb', line 242

# Sets the maximum time a thread may work on a given action and pushes the
# new value to every thread currently in the pool.
#
# t:: timeout in seconds; converted with to_f (so nil becomes 0.0)
#
# Returns the converted value.
# Raises ArgumentError when the converted value is not >= 0.
def action_timeout=(t)
    seconds = t.to_f
    acceptable = seconds >= 0
    raise ArgumentError.new('Value must be greater than zero or nil') unless acceptable
    @action_timeout = seconds
    @threads.each do |worker|
        worker.action_timeout = seconds
    end
    seconds
end

#add_jobs(jobs) ⇒ Object

jobs

Array of proc/lambdas

Will queue a list of jobs into the pool

Raises:

  • (PoolClosed)
  • (ArgumentError)



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/actionpool/Pool.rb', line 137

# Queues a list of jobs into the pool in one batch.
#
# jobs:: Array whose elements are either a proc/lambda or a two element
#        array of [proc/lambda, [*args]]
#
# The queue is paused while the jobs are added and unpaused afterwards, even
# when a malformed job aborts the batch part way through. The caller's job
# arrays are left untouched.
#
# Returns true.
# Raises PoolClosed when the pool is closed, ArgumentError when jobs is not
# an Array or contains a malformed job.
def add_jobs(jobs)
    raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
    raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
    @queue.pause
    begin
        jobs.each do |job|
            case job
            when Proc
                @queue << [job, []]
            when Array
                unless job.size == 2 && job[0].is_a?(Proc) && job[1].is_a?(Array)
                    raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
                end
                # Build a fresh entry instead of shifting the proc out of the
                # caller's array; the queued shape stays [proc, [[*args]]].
                @queue << [job[0], [job[1]]]
            else
                raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
            end
        end
    ensure
        # Request one extra thread per job that has no waiting thread available.
        num = jobs.size - @threads.count { |t| t.waiting? }
        num.times { create_thread(:nowait) } if num > 0
        @queue.unpause
    end
    true
end

#create_thread(*args) ⇒ Object

args

:force forces a new thread. :nowait will create a thread if threads are waiting

Create a new thread for pool. Returns newly created thread or nil if pool is at maximum size



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/actionpool/Pool.rb', line 55

# Creates a new thread for the pool.
#
# args:: optional flags
#   :force::  always create a thread, ignoring the maximum size
#   :nowait:: skip the "every thread is busy" check (the maximum size
#             still applies)
#
# Without flags a thread is only added when every current thread is working
# and the pool is below its maximum size.
#
# Returns the newly created (and started) thread, or nil when no thread was
# created or the pool is closed.
def create_thread(*args)
    return if pool_closed?
    spawned = nil
    @lock.synchronize do
        room_available = (size == working || args.include?(:nowait)) && @threads.size < @max_threads
        if room_available || args.include?(:force)
            spawned = ActionPool::Thread.new(
                :pool => self,
                :respond_thread => @respond_to,
                :a_timeout => @action_timeout,
                :t_timeout => @thread_timeout,
                :logger => @logger,
                :autostart => false
            )
            @threads << spawned
        end
    end
    # The thread is started only after the lock has been released.
    spawned.start if spawned
    spawned
end

#fill_pool ⇒ Object

Fills the pool with the minimum number of threads. Returns an array of the created threads.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/actionpool/Pool.rb', line 71

# Tops the pool up to its minimum number of threads.
#
# Does nothing when the pool is not open. New threads are registered while
# holding the pool lock and started after the lock has been released.
#
# Returns an array of the threads created by this call (possibly empty).
def fill_pool
    return [] unless @open
    created = []
    @lock.synchronize do
        missing = min - size
        if missing > 0
            missing.times do
                worker = ActionPool::Thread.new(
                    :pool => self,
                    :respond_thread => @respond_to,
                    :a_timeout => @action_timeout,
                    :t_timeout => @thread_timeout,
                    :logger => @logger,
                    :autostart => false
                )
                @threads << worker
                created << worker
            end
        end
    end
    created.each { |worker| worker.start }
    created
end

#flush ⇒ Object

Flush the thread pool. Mainly used for forcibly resizing the pool if existing threads have a long thread life waiting for input.



263
264
265
266
267
268
269
# File 'lib/actionpool/Pool.rb', line 263

# Flushes the thread pool. Mainly used for forcibly resizing the pool if
# existing threads have a long thread life waiting for input.
#
# Queues one blocking action per thread currently in the pool, waits for the
# queue to empty, pauses briefly and then broadcasts on the monitor the
# queued actions wait on.
#
# Returns whatever the monitor's broadcast returns.
def flush
    mon = Splib::Monitor.new
    # #queue takes the action as a positional Proc (it rejects anything
    # else), so the blocking job must be an explicit proc rather than a
    # block handed to the call.
    blocker = proc { mon.wait }
    @threads.size.times { queue(blocker) }
    @queue.wait_empty
    sleep(0.01)
    mon.broadcast
end

#max ⇒ Object

Maximum allowed number of threads



174
175
176
# File 'lib/actionpool/Pool.rb', line 174

# Maximum allowed number of threads, as currently configured.
def max
    return @max_threads
end

#max=(m) ⇒ Object

m

new max

Set maximum number of threads

Raises:

  • (ArgumentError)


185
186
187
188
189
190
191
192
# File 'lib/actionpool/Pool.rb', line 185

# Sets the maximum number of threads.
#
# m:: new max; converted with to_i
#
# The minimum is lowered to match when it would exceed the new maximum, and
# resize is invoked when the pool currently holds more threads than the new
# maximum.
#
# Returns the converted value.
# Raises ArgumentError when the converted value is not greater than 0.
def max=(m)
    limit = m.to_i
    raise ArgumentError.new('Maximum value must be greater than 0') unless limit > 0
    @max_threads = limit
    @min_threads = limit if @min_threads > limit
    resize if size > limit
    limit
end

#min ⇒ Object

Minimum allowed number of threads



179
180
181
# File 'lib/actionpool/Pool.rb', line 179

# Minimum allowed number of threads, as currently configured.
def min
    return @min_threads
end

#min=(m) ⇒ Object

m

new min

Set minimum number of threads

Raises:

  • (ArgumentError)


196
197
198
199
200
201
# File 'lib/actionpool/Pool.rb', line 196

# Sets the minimum number of threads.
#
# m:: new min; converted with to_i
#
# Returns the converted value.
# Raises ArgumentError unless the converted value is greater than 0 and no
# larger than the current maximum.
def min=(m)
    wanted = m.to_i
    unless wanted > 0 && wanted <= max
        raise ArgumentError.new("Minimum value must be greater than 0 and less than or equal to maximum (#{max})")
    end
    @min_threads = wanted
    wanted
end

#pool_closed? ⇒ Boolean

Pool is closed

Returns:

  • (Boolean)


35
36
37
# File 'lib/actionpool/Pool.rb', line 35

# True when the pool is not open.
def pool_closed?
    @open ? false : true
end

#pool_open? ⇒ Boolean

Pool is open

Returns:

  • (Boolean)


40
41
42
# File 'lib/actionpool/Pool.rb', line 40

# Returns the pool's open flag exactly as stored.
def pool_open?
    return @open
end

#process(*args, &block) ⇒ Object

block

block to process

Adds a block to be processed



163
164
165
166
# File 'lib/actionpool/Pool.rb', line 163

# Adds a block to be processed.
#
# args::  arguments stored alongside the block
# block:: block to process
#
# The block and its arguments are handed to #queue. Always returns nil.
def process(*args, &block)
    job_args = args
    queue(block, *job_args)
    return nil
end

#queue(action, *args) ⇒ Object

action

proc to be executed

Add a new proc/lambda to be executed

Raises:

  • (PoolClosed)
  • (ArgumentError)



127
128
129
130
131
132
133
# File 'lib/actionpool/Pool.rb', line 127

# Adds a new proc/lambda to be executed.
#
# action:: proc to be executed
# args::   arguments stored alongside the proc
#
# Returns the result of create_thread.
# Raises PoolClosed when the pool is closed and ArgumentError when action is
# not a Proc.
def queue(action, *args)
    if pool_closed?
        raise PoolClosed.new("Pool #{self} is currently closed")
    end
    unless action.is_a?(Proc)
        raise ArgumentError.new('Expecting block')
    end
    entry = [action, args]
    @queue << entry
    # Yield the scheduler before deciding whether another thread is needed.
    ::Thread.pass
    create_thread
end

#remove(t) ⇒ Object

t

ActionPool::Thread to remove

Removes a thread from the pool

Raises:

  • (ArgumentError)


205
206
207
208
209
210
211
212
# File 'lib/actionpool/Pool.rb', line 205

# Removes a thread from the pool.
#
# t:: ActionPool::Thread to remove
#
# The thread is stopped whether or not it belongs to this pool, and the pool
# is topped up again afterwards.
#
# Returns true when the thread was part of the pool, false otherwise.
# Raises ArgumentError when t is not an ActionPool::Thread.
def remove(t)
    unless t.is_a?(ActionPool::Thread)
        raise ArgumentError.new('Expecting an ActionPool::Thread object')
    end
    t.stop
    # Array#delete returns nil when nothing matched.
    was_member = !@threads.delete(t).nil?
    fill_pool
    was_member
end

#shutdown(force = false) ⇒ Object

force

force immediate stop

Stop the pool



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/actionpool/Pool.rb', line 93

# Stops the pool.
#
# force:: force immediate stop; when true the queue is cleared first and
#         :force is passed to each thread's stop
#
# Marks the pool closed, waits for the queue to empty and then stops every
# thread, removing each one from the pool as it goes.
#
# Always returns nil.
def shutdown(force=false)
    # Close the pool first so no further actions are accepted.
    status(:closed)
    args = []
    args.push(:force) if force
    @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
    # A forced shutdown discards pending actions instead of letting them run.
    @queue.clear if force
    @queue.wait_empty
    # Pop threads one at a time until the pool's thread list is empty.
    while(t = @threads.pop) do
        t.stop(*args)
    end
    unless(force)
        # NOTE(review): the loop above has already emptied @threads, so flush
        # (which queues one action per thread in @threads) appears to queue
        # nothing here, and the join below iterates an empty array -- confirm
        # whether the stopped threads were meant to be joined.
        flush
        @threads.each{|t|t.join}
    end
    nil
end

#size ⇒ Object

Current size of pool



169
170
171
# File 'lib/actionpool/Pool.rb', line 169

# Current size of the pool: the number of threads it holds.
def size
    @threads.length
end

#status(arg) ⇒ Object

arg

:open or :closed

Set pool status



46
47
48
49
# File 'lib/actionpool/Pool.rb', line 46

# Sets the pool status.
#
# arg:: :open or :closed (anything other than :open closes the pool)
#
# Opening the pool also tops it up via fill_pool, whose result is returned;
# closing returns nil.
def status(arg)
    @open = (arg == :open)
    return nil unless @open
    fill_pool
end

#thread_stats ⇒ Object



276
277
278
# File 'lib/actionpool/Pool.rb', line 276

# Returns one [object_id, status] pair for every thread in the pool.
def thread_stats
    @threads.collect do |worker|
        [worker.object_id, worker.status]
    end
end

#thread_timeout ⇒ Object

Maximum number of seconds a thread is allowed to idle in the pool. (nil means thread life is infinite)



217
218
219
# File 'lib/actionpool/Pool.rb', line 217

# Returns the configured thread idle timeout, exactly as stored on the pool.
def thread_timeout
    return @thread_timeout
end

#thread_timeout=(t) ⇒ Object

t

timeout in seconds (nil for infinite)

Set maximum allowed time thread may idle in pool

Raises:

  • (ArgumentError)


231
232
233
234
235
236
237
# File 'lib/actionpool/Pool.rb', line 231

# Sets the maximum time a thread may idle in the pool and pushes the new
# value to every thread currently in the pool.
#
# t:: timeout in seconds; converted with to_f (so nil becomes 0.0)
#
# Returns the converted value.
# Raises ArgumentError when the converted value is not >= 0.
def thread_timeout=(t)
    seconds = t.to_f
    acceptable = seconds >= 0
    raise ArgumentError.new('Value must be greater than zero or nil') unless acceptable
    @thread_timeout = seconds
    @threads.each do |worker|
        worker.thread_timeout = seconds
    end
    seconds
end

#working ⇒ Object

Returns current number of threads in the pool working



272
273
274
# File 'lib/actionpool/Pool.rb', line 272

# Returns the number of threads in the pool that report running?.
def working
    @threads.count { |worker| worker.running? }
end