Class: Amazon::Util::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/amazon/util/threadpool.rb

Overview

ThreadPool is a generic threadpooling class that enables easier multithreaded workflows. Initialize with a thread count, then addWork to queue up tasks. You can sync to ensure the current workload is complete, or finish to flush the threads when you’re done.

Defined Under Namespace

Classes: WorkItem

Instance Method Summary collapse

Constructor Details

#initialize(num_threads, exception_handler = nil) ⇒ ThreadPool

First arg is the thread count. Threads will be created once and wait for work ( no performance penalty, since they’re waiting on a Queue. Second arg (optional) is a proc to be used as an exception handler. If this argument is passed in and the thread encounters an uncaught exception, the proc will be called with the exception as the only argument.



21
22
23
24
25
26
27
28
# File 'lib/amazon/util/threadpool.rb', line 21

def initialize( num_threads, exception_handler=nil )
  @work = Queue.new
  @threads = ThreadGroup.new
  num_threads.times do
    worker_thread = Thread.new { workerProcess(exception_handler) }
    @threads.add worker_thread
  end
end

Instance Method Details

#addWork(*args, &block) ⇒ Object

add work to the queue pass any number of arguments, they will be passed on to the block.



32
33
34
35
36
# File 'lib/amazon/util/threadpool.rb', line 32

def addWork( *args, &block )
  item = WorkItem.new( args, &block )
  @work.push( item )
  item
end

#finishObject

request thread completion and wait for them to finish



50
51
52
53
54
55
# File 'lib/amazon/util/threadpool.rb', line 50

def finish
  noMoreWork
  @threads.list.each do |t|
    t.join
  end
end

#noMoreWorkObject

request thread completion No more work will be performed



45
46
47
# File 'lib/amazon/util/threadpool.rb', line 45

def noMoreWork
  threadcount.times { @work << :Finish }
end

#syncObject

wait for the currently queued work to finish (This freezes up the entire pool, temporarily)



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/amazon/util/threadpool.rb', line 59

def sync
  t = threadcount

  if t < 2
    item = addWork { :sync }
    return item.getResult
  end

  q = Queue.new
  items = []

  items << addWork do
    q.pop
  end

  (t-2).times do |t|
    items << addWork(t) do |i|
      items[i].getResult
    end
  end

  addWork do
    q.push :sync
  end

  items.last.getResult
end

#threadcountObject

how many worker threads are there?



39
40
41
# File 'lib/amazon/util/threadpool.rb', line 39

def threadcount
  @threads.list.length
end