Class: RbEAI::PipeTask

Inherits:
Object
  • Object
show all
Defined in:
lib/rbeai/PipeTask.rb

Direct Known Subclasses

RouterPipeTask

Instance Method Summary collapse

Constructor Details

#initialize(inputQueue, task, control) ⇒ PipeTask

Returns a new instance of PipeTask.



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/rbeai/PipeTask.rb', line 85

# Wires up one stage of a pipeline: its queues, an optional downstream pipe,
# and the thread pool that runs the task's +doJob+ method.
#
# @param inputQueue the queue handed to the thread pool as its input
# @param task the task object; must respond to +size+ and +doJob+, and may
#   optionally respond to +doBuffered+ and +nextTask+
# @param control the control object; must respond to +controlQueue+
def initialize(inputQueue, task, control)
  @inputQueue = inputQueue
  @task = task
  @control = control
  @resultQueue = Queue.new

  # A buffer queue is only allocated for tasks that expose #doBuffered.
  @bufferQueue = (Queue.new if task.respond_to?(:doBuffered))

  # When the task names a successor, chain a downstream pipe that reads from
  # this pipe's result queue; otherwise there is no next pipe (nil).
  @nextPipe =
    if @task.respond_to?(:nextTask) && !@task.nextTask.nil?
      getNextPipe(@resultQueue, @task.nextTask, Control.new(Queue.new, @resultQueue))
    end

  @size = task.size
  @threadPool = ThreadPool.new(
    @size,
    @task,
    @task.method(:doJob),
    @inputQueue,
    @resultQueue,
    @bufferQueue,
    control.controlQueue
  )
end

Instance Method Details

#finish ⇒ Object



113
114
115
116
# File 'lib/rbeai/PipeTask.rb', line 113

# Asks this pipe's control object to finish.
#
# @return the value returned by +@control.finish+
def finish
  # Disabled alternative left in the original source:
  #   @inputQueue.enq(:FINISH_THREADS)
  @control.finish
end

#run ⇒ Object



99
100
101
102
103
# File 'lib/rbeai/PipeTask.rb', line 99

# Starts this pipe. The downstream pipe (when one exists) is started first,
# then this pipe's thread pool is started under the task's name, and finally
# the control object is started with the thread pool.
#
# @return the value returned by +@control.start+
def run
  downstream = @nextPipe
  downstream.run unless downstream.nil?

  @threadPool.start(@task.name)
  @control.start(@threadPool)
end

#waitToEnd ⇒ Object



105
106
107
108
109
110
111
# File 'lib/rbeai/PipeTask.rb', line 105

# Waits on this pipe's control object, prints an "END-<task name>" line, and
# then, when a downstream pipe exists, tells it to finish and waits for it
# in turn.
#
# @return nil when there is no downstream pipe, otherwise the value returned
#   by the downstream pipe's +waitToEnd+
def waitToEnd
  # Disabled debug trace left in the original source:
  #   print "JOIN-#{@task.name}\n"
  @control.waitJoin
  print "END-#{@task.name}\n"
  return if @nextPipe.nil?

  @nextPipe.finish
  @nextPipe.waitToEnd
end