Class: RbbtThreadQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/threads.rb

Defined Under Namespace

Classes: RbbtThreadQueueWorker

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_threads) ⇒ RbbtThreadQueue

Returns a new instance of RbbtThreadQueue.



37
38
39
40
41
42
# File 'lib/rbbt/util/concurrency/threads.rb', line 37

# Sets up the shared state for a pool; no workers are created here.
#
# @param num_threads [Integer] how many workers #init should create
def initialize(num_threads)
  # Coordination primitives first, then the bookkeeping.
  @mutex = Mutex.new
  @queue = Queue.new
  @threads = []
  @num_threads = num_threads
end

Instance Attribute Details

#block ⇒ Object

Returns the value of attribute block.



2
3
4
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for @block.
# The constructor shown in this file does not assign @block, so this
# returns nil unless the attribute has been set some other way.
def block
  @block
end

#done ⇒ Object

Returns the value of attribute done.



2
3
4
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for @done.
# The constructor shown in this file does not assign @done, so this
# returns nil unless the attribute has been set some other way.
def done
  @done
end

#mutex ⇒ Object

Returns the value of attribute mutex.



2
3
4
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for @mutex, the Mutex created in #initialize.
# #init passes it to each worker when called with use_mutex = true.
def mutex
  @mutex
end

#num_threads ⇒ Object

Returns the value of attribute num_threads.



2
3
4
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for @num_threads, the value given to the constructor.
# #init creates this many workers.
def num_threads
  @num_threads
end

#queue ⇒ Object

Returns the value of attribute queue.



2
3
4
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for @queue, the Queue created in #initialize.
# #process appends items to it and #init hands it to every worker.
def queue
  @queue
end

#threads ⇒ Object

Returns the value of attribute threads.



2
3
4
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for @threads, the Array that starts empty in #initialize and is
# filled with worker objects by #init.
def threads
  @threads
end

Class Method Details

.each(list, num = 3, &block) ⇒ Object



69
70
71
72
73
74
# File 'lib/rbbt/util/concurrency/threads.rb', line 69

# One-shot helper: builds a queue with +num+ workers running +block+,
# feeds it every element of +list+, then waits for completion.
#
# @param list [#each] items to enqueue, in iteration order
# @param num [Integer] number of workers (defaults to 3)
# @return [Object] whatever #join returns
def self.each(list, num = 3, &block)
  pool = RbbtThreadQueue.new(num)
  pool.init(&block)
  list.each { |item| pool.process(item) }
  pool.join
end

Instance Method Details

#clean ⇒ Object



61
62
63
# File 'lib/rbbt/util/concurrency/threads.rb', line 61

# Calls #clean on every tracked worker, then empties the worker list.
#
# @return [Array] the (now empty) worker array
def clean
  workers = threads
  workers.each { |worker| worker.clean }
  workers.clear
end

#init(use_mutex = false, &block) ⇒ Object



44
45
46
47
48
49
# File 'lib/rbbt/util/concurrency/threads.rb', line 44

# Discards any existing workers (via #clean) and creates num_threads new
# ones, all reading from the shared queue and running +block+.
#
# @param use_mutex [Boolean] when truthy the workers are given the shared
#   mutex, otherwise they are given nil
# @return [Integer] num_threads (the receiver of +times+)
def init(use_mutex = false, &block)
  clean
  worker_mutex = use_mutex ? mutex : nil
  num_threads.times do
    @threads << RbbtThreadQueueWorker.new(queue, worker_mutex, &block)
  end
end

#join ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/rbbt/util/concurrency/threads.rb', line 51

# Waits until the queue is drained and every worker is idle, then stops
# the workers.
#
# Spins (yielding with Thread.pass) while items are still queued or fewer
# threads are blocked on the queue than there are tracked workers. After
# that, dead workers are dropped from @threads, each remaining live worker
# is sent Aborted and is given up to 0.1 seconds to finish.
#
# @raise [RuntimeError] if no workers are tracked while items are queued
# @return [Array] the workers that were alive when the wait ended
def join
  while queue.length > 0 || queue.num_waiting < @threads.length
    Thread.pass
    raise "No worker thread survived" if @threads.empty? && queue.length > 0
  end

  # Keep the live workers and forget the dead ones. The previous code did
  # the opposite, so Aborted only ever reached threads that had already
  # finished and the idle workers were left blocked on the queue, untracked.
  @threads.delete_if { |t| !t.alive? }

  # NOTE(review): assumes the worker body rescues Aborted so that the
  # timed join below does not re-raise it in the caller; confirm against
  # RbbtThreadQueueWorker.
  @threads.each { |t| t.raise Aborted }
  @threads.each { |t| t.join(0.1) }
end

#process(e) ⇒ Object



65
66
67
# File 'lib/rbbt/util/concurrency/threads.rb', line 65

# Enqueues one item for the workers to pick up.
#
# @param e [Object] the item to hand to a worker
# @return [Object] the queue itself
def process(e)
  queue.push(e)
end