Class: RbEAI::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/rbeai/PipeTask.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size, task, method, inputQueue, resultQueue, bufferQueue, controlQueue) ⇒ ThreadPool

Returns a new instance of ThreadPool.



41
42
43
44
45
46
47
48
49
50
# File 'lib/rbeai/PipeTask.rb', line 41

def initialize(size, task, method, inputQueue, resultQueue, bufferQueue, controlQueue)
  @size = size
  @task = task
  @method = method
  @inputQueue = inputQueue
  @resultQueue = resultQueue
  @bufferQueue = bufferQueue
  @controlQueue = controlQueue
  @threadgroup = ThreadsWait.new
end

Instance Attribute Details

#sizeObject

Returns the value of attribute size.



39
40
41
# File 'lib/rbeai/PipeTask.rb', line 39

def size
  @size
end

Instance Method Details

#start(name) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rbeai/PipeTask.rb', line 52

def start(name)     
  @size.times do 
    th = Thread.new(@method) do |method|          
      #print "Running #{name} th \n"          
      begin           
        obj = @inputQueue.deq
        #print "#{name} -> #{obj}\n"
        if obj != :FINISH_WORK && obj != :FINISH_THREADS
          if @bufferQueue != nil
            method.call(@bufferQueue, obj)
            @resultQueue.enq(obj)
          else
            method.call(@resultQueue, obj)
          end
        elsif obj == :FINISH_THREADS
          @controlQueue.enq(:FINISH_THREADS)
        end
      end until obj == :FINISH_WORK          
      #print "Finished th \n"
    end      
    @threadgroup.join_nowait(th)      
  end     
end

#waitJoinAllObject



76
77
78
79
# File 'lib/rbeai/PipeTask.rb', line 76

def waitJoinAll()  
  @threadgroup.all_waits
  @task.doBuffered(@bufferQueue) if @bufferQueue != nil 
end