Class: Thread::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/thread/pool.rb,
lib/thread/future.rb

Overview

A pool is a container of a limited amount of threads to which you can add tasks to run.

This is usually more performant and less memory intensive than creating a new thread for every task.

Defined Under Namespace

Classes: Task

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max = nil, &block) ⇒ Pool

Create the pool with minimum and maximum threads.

The pool will start with the minimum amount of threads created and will spawn new threads until the max is reached in case of need.

A default block can be passed, which will be used to #process the passed data.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/thread/pool.rb', line 113

def initialize (min, max = nil, &block)
  @min   = min
  @max   = max || min
  @block = block

  @cond  = ConditionVariable.new
  @mutex = Mutex.new

  @done       = ConditionVariable.new
  @done_mutex = Mutex.new

  @todo     = []
  @workers  = []
  @timeouts = {}

  @spawned       = 0
  @waiting       = 0
  @shutdown      = false
  @trim_requests = 0
  @auto_trim     = false
  @idle_trim     = nil

  @mutex.synchronize {
    min.times {
      spawn_thread
    }
  }
end

Class Attribute Details

.abort_on_exceptionObject

If true, tasks will allow raised exceptions to pass through.

Similar to Thread.abort_on_exception



350
351
352
# File 'lib/thread/pool.rb', line 350

def abort_on_exception
  @abort_on_exception
end

Instance Attribute Details

#maxObject (readonly)

Returns the value of attribute max.



104
105
106
# File 'lib/thread/pool.rb', line 104

def max
  @max
end

#minObject (readonly)

Returns the value of attribute min.



104
105
106
# File 'lib/thread/pool.rb', line 104

def min
  @min
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



104
105
106
# File 'lib/thread/pool.rb', line 104

def spawned
  @spawned
end

#waitingObject (readonly)

Returns the value of attribute waiting.



104
105
106
# File 'lib/thread/pool.rb', line 104

def waiting
  @waiting
end

Instance Method Details

#auto_trim!Object

Enable auto trimming, unneeded threads will be deleted until the minimum is reached.



152
153
154
# File 'lib/thread/pool.rb', line 152

def auto_trim!
  @auto_trim = true
end

#auto_trim?Boolean

Check if auto trimming is enabled.

Returns:

  • (Boolean)


146
147
148
# File 'lib/thread/pool.rb', line 146

def auto_trim?
  @auto_trim
end

#backlogObject

Get the amount of tasks that still have to be run.



186
187
188
189
190
# File 'lib/thread/pool.rb', line 186

def backlog
  @mutex.synchronize {
    @todo.length
  }
end

#done?Boolean

Are all tasks consumed ?

Returns:

  • (Boolean)


193
194
195
196
197
# File 'lib/thread/pool.rb', line 193

def done?
  @mutex.synchronize {
    @todo.empty? and @waiting == @spawned
  }
end

#future(&block) ⇒ Object



161
162
163
# File 'lib/thread/future.rb', line 161

def future (&block)
  Thread.future self, &block
end

#idle(*args, &block) ⇒ Object

Process Block when there is a idle worker if not block its returns



217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/thread/pool.rb', line 217

def idle (*args, &block)
  while !idle?
    @done_mutex.synchronize {
      break if idle?
      @done.wait @done_mutex
    }
  end

  unless block
    return
  end

  process *args, &block
end

#idle?Boolean

Check if there are idle workers.

Returns:

  • (Boolean)


210
211
212
213
214
# File 'lib/thread/pool.rb', line 210

def idle?
  @mutex.synchronize {
    @todo.length < @waiting
  }
end

#idle_trim!(timeout) ⇒ Object

Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respeced.



168
169
170
# File 'lib/thread/pool.rb', line 168

def idle_trim!(timeout)
  @idle_trim = timeout
end

#idle_trim?Boolean

Check if idle trimming is enabled.

Returns:

  • (Boolean)


162
163
164
# File 'lib/thread/pool.rb', line 162

def idle_trim?
  !@idle_trim.nil?
end

#joinObject

Join on all threads in the pool.



312
313
314
315
316
317
318
319
320
# File 'lib/thread/pool.rb', line 312

def join
  until @workers.empty?
    if worker = @workers.first
      worker.join
    end
  end

  self
end

#no_auto_trim!Object

Disable auto trimming.



157
158
159
# File 'lib/thread/pool.rb', line 157

def no_auto_trim!
  @auto_trim = false
end

#no_idle_trim!Object

Turn of idle trimming.



173
174
175
# File 'lib/thread/pool.rb', line 173

def no_idle_trim!
  @idle_trim = nil
end

#process(*args, &block) ⇒ Object Also known as: <<

Add a task to the pool which will execute the block with the given argument.

If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/thread/pool.rb', line 237

def process (*args, &block)
  unless block || @block
    raise ArgumentError, 'you must pass a block'
  end

  task = Task.new(self, *args, &(block || @block))

  @mutex.synchronize {
    raise 'unable to add work while shutting down' if shutdown?

    @todo << task

    if @waiting == 0 && @spawned < @max
      spawn_thread
    end

    @cond.signal
  }

  task
end

#resize(min, max = nil) ⇒ Object

Resize the pool with the passed arguments.



178
179
180
181
182
183
# File 'lib/thread/pool.rb', line 178

def resize (min, max = nil)
  @min = min
  @max = max || min

  trim!
end

#shutdownObject

Shut down the pool, it will block until all tasks have finished running.



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/thread/pool.rb', line 292

def shutdown
  @mutex.synchronize {
    @shutdown = :nicely
    @cond.broadcast
  }

  join

  if @timeout
    @shutdown = :now

    wake_up_timeout

    @timeout.join
  end

  self
end

#shutdown!Object

Shut down the pool instantly without finishing to execute tasks.



280
281
282
283
284
285
286
287
288
289
# File 'lib/thread/pool.rb', line 280

def shutdown!
  @mutex.synchronize {
    @shutdown = :now
    @cond.broadcast
  }

  wake_up_timeout

  self
end

#shutdown?Boolean

Check if the pool has been shut down.

Returns:

  • (Boolean)


143
# File 'lib/thread/pool.rb', line 143

def shutdown?; !!@shutdown; end

#shutdown_after(timeout) ⇒ Object

Shutdown the pool after a given amount of time.



336
337
338
339
340
341
342
343
344
# File 'lib/thread/pool.rb', line 336

def shutdown_after (timeout)
  Thread.new {
    sleep timeout

    shutdown
  }

  self
end

#timeout_for(task, timeout) ⇒ Object

Define a timeout for a task.



323
324
325
326
327
328
329
330
331
332
333
# File 'lib/thread/pool.rb', line 323

def timeout_for (task, timeout)
  unless @timeout
    spawn_timeout_thread
  end

  @mutex.synchronize {
    @timeouts[task] = timeout

    wake_up_timeout
  }
end

#trim(force = false) ⇒ Object

Trim the unused threads, if forced threads will be trimmed even if there are tasks waiting.



263
264
265
266
267
268
269
270
271
272
# File 'lib/thread/pool.rb', line 263

def trim (force = false)
  @mutex.synchronize {
    if (force || @waiting > 0) && @spawned - @trim_requests > @min
      @trim_requests += 1
      @cond.signal
    end
  }

  self
end

#trim!Object

Force ##trim.



275
276
277
# File 'lib/thread/pool.rb', line 275

def trim!
  trim true
end

#wait_doneObject

Wait until all tasks are consumed. The caller will be blocked until then.



200
201
202
203
204
205
206
207
# File 'lib/thread/pool.rb', line 200

def wait_done
  loop do
    @done_mutex.synchronize {
      return self if done?
      @done.wait @done_mutex
    }
  end
end