Class: RbbtProcessQueue::RbbtProcessQueueWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, callback_queue = nil, &block) ⇒ RbbtProcessQueueWorker

Returns a new instance of RbbtProcessQueueWorker.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 5

# Forks a worker process that pops jobs from +queue+ and runs +block+ on each.
#
# The fork happens immediately; the child's process id is stored in @pid.
# Inside the child:
# * a popped item that is an Exception, or whose first element is an
#   Exception, is raised and stops the worker;
# * any other item is splatted into the block, and the result is pushed
#   onto +callback_queue+ when one was given.
#
# Child exit codes: 0 when the queue raises ClosedStream or the loop ends
# normally, -1 when aborted through SIGINT or when any other exception
# escapes (that exception is logged and pushed onto +callback_queue+).
#
# @param queue [Object] job source; must respond to +swrite+ and +pop+
# @param callback_queue [Object, nil] optional result sink; must respond to
#   +sread+ and +push+
# @yield [*job] called in the child once per job popped from +queue+
def initialize(queue, callback_queue = nil, &block)
  @queue, @callback_queue, @block = queue, callback_queue, block

  @pid = Process.fork do
    begin
      # The child only reads jobs and only writes results, so it closes
      # the opposite ends of both queues.
      @queue.swrite.close
      @callback_queue.sread.close if @callback_queue

      # Turn SIGINT (sent by #abort) into an exception so the rescue
      # chain below decides the exit code.
      Signal.trap(:INT) { raise Aborted }
      loop do
        job = @queue.pop
        raise job if Exception === job
        raise job.first if Exception === job.first
        res = @block.call(*job)
        @callback_queue.push res if @callback_queue
      end

      # Kernel#loop returns when StopIteration is raised inside it.
      exit 0
    rescue ClosedStream
      exit 0
    rescue Aborted
      exit(-1)
    rescue SystemExit
      # `exit` is implemented by raising SystemExit. Without this clause
      # the catch-all below would intercept the clean `exit 0` above and
      # report it as a failure with exit code -1.
      raise
    rescue Exception
      Log.exception $!
      @callback_queue.push($!) if @callback_queue
      exit(-1)
    end
  end
end

Instance Attribute Details

#block ⇒ Object

Returns the value of attribute block.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# Returns the block given to the constructor; the forked worker calls it
# with each job popped from the queue.
def block
  @block
end

#callback_queue ⇒ Object

Returns the value of attribute callback_queue.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# Returns the optional queue onto which the worker pushes each job result
# (and the exception that stops it, if any); nil when none was given to
# the constructor.
def callback_queue
  @callback_queue
end

#pid ⇒ Object

Returns the value of attribute pid.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# Returns the process id of the forked worker, as returned by Process.fork
# in the constructor.
def pid
  @pid
end

#queue ⇒ Object

Returns the value of attribute queue.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# Returns the job queue given to the constructor; the forked worker pops
# its work items from it.
def queue
  @queue
end

Instance Method Details

#abort ⇒ Object



40
41
42
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 40

# Sends SIGINT to the worker process identified by @pid.
#
# @return [Integer] the value returned by Process.kill
def abort
  Process.kill(:INT, @pid)
end

#done? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
48
49
50
51
52
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 44

# Polls the worker process without blocking.
#
# @return [Integer, Boolean, nil] whatever the non-blocking
#   Process.waitpid returns (nil while the child is still running, its pid
#   when it has just been reaped); true when there is no such child
#   (Errno::ECHILD); false when any other StandardError is raised
def done?
  Process.waitpid(@pid, Process::WNOHANG)
rescue Errno::ECHILD
  true
rescue StandardError
  false
end

#join ⇒ Object



36
37
38
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 36

# Blocks until the worker process identified by @pid terminates.
#
# @return [Integer] the value returned by Process.waitpid
def join
  Process.waitpid(@pid)
end