Class: RbbtProcessQueue::RbbtProcessQueueWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, callback_queue = nil, cleanup = nil, &block) ⇒ RbbtProcessQueueWorker

Returns a new instance of RbbtProcessQueueWorker.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 5

# Stores the queues, the cleanup hook and the job block, then forks a worker
# process that repeatedly pops jobs from +queue+ and runs the block on them.
# In the parent, the child's pid (the value returned by Process.fork) is
# kept in @pid.
#
# @param queue [#close_write, #pop] source of jobs for the worker
# @param callback_queue [#swrite, #close_read, #push, #close_write, nil]
#   optional queue that receives every block result and any exception
#   rescued by the generic handler below
# @param cleanup [#call, nil] optional hook invoked first thing in the child
# @param block called with the splatted contents of each popped job
def initialize(queue, callback_queue = nil, cleanup = nil, &block)
  @queue, @callback_queue, @cleanup, @block = queue, callback_queue, cleanup, block

  # Everything inside this block runs only in the forked child.
  @pid = Process.fork do
    begin
      @cleanup.call if @cleanup
      # The child only pops from the job queue, so it drops the write side.
      @queue.close_write 

      if @callback_queue
        # NOTE(review): presumably purge_pipes closes inherited pipes except
        # the one passed in -- confirm against Misc.purge_pipes.
        Misc.purge_pipes(@callback_queue.swrite) 
        # The child only pushes to the callback queue, so it drops the read side.
        @callback_queue.close_read 
      else
        Misc.purge_pipes
      end

      # On SIGINT (see #abort) leave immediately via Kernel.exit!.
      Signal.trap(:INT){ 
        Kernel.exit! -1
      }

      loop do
        p = @queue.pop
        # A nil job is skipped rather than treated as a stop marker.
        next if p.nil?
        # An exception arriving as the job itself, or as its first element,
        # is raised here and handled by the rescue clauses below.
        raise p if Exception === p
        raise p.first if Exception === p.first
        res = @block.call *p
        @callback_queue.push res if @callback_queue
      end
      # Only reached if the loop above terminates (Kernel#loop stops on
      # StopIteration).
      Kernel.exit! 0
    # NOTE(review): presumably raised by @queue.pop once the queue is closed --
    # confirm. Nothing to do here: control falls through to the ensure clause
    # and the child ends when the fork block returns.
    rescue ClosedStream
    rescue Aborted, Interrupt
      Log.warn "Worker #{Process.pid} aborted"
      Kernel.exit! -1
    rescue Exception
      # Hand the failure ($!) to the callback queue before exiting;
      # presumably the parent reads it from there -- verify against the consumer.
      @callback_queue.push($!) if @callback_queue
      Kernel.exit! -1
    ensure
      # NOTE(review): Kernel.exit! does not raise SystemExit, so this likely
      # runs only on paths that do not call exit! -- confirm.
      @callback_queue.close_write if @callback_queue 
    end
  end
end

Instance Attribute Details

#block ⇒ Object (readonly)

Returns the value of attribute block.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# The job block captured by #initialize; the forked worker calls it with the
# splatted contents of each job popped from the queue.
#
# @return [Proc, nil] the block given to the constructor, nil if none was given
def block
  @block
end

#callback_queue ⇒ Object (readonly)

Returns the value of attribute callback_queue.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# The optional queue passed to #initialize. The forked worker pushes each
# block result onto it, as well as any exception caught by its generic
# rescue clause.
#
# @return [Object, nil] the callback queue, nil when none was supplied
def callback_queue
  @callback_queue
end

#cleanup ⇒ Object (readonly)

Returns the value of attribute cleanup.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# The optional hook passed to #initialize; the forked worker invokes it via
# +call+ before doing anything else.
#
# @return [#call, nil] the cleanup hook, nil when none was supplied
def cleanup
  @cleanup
end

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# Process id of the forked worker, as returned by Process.fork in
# #initialize (set in the parent process).
#
# @return [Integer] pid of the worker process
def pid
  @pid
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



4
5
6
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 4

# The job queue passed to #initialize; the forked worker pops its jobs
# from it.
#
# @return [Object] the job queue
def queue
  @queue
end

Instance Method Details

#abort ⇒ Object



50
51
52
53
54
55
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 50

# Asks the worker process to stop by sending it SIGINT.
#
# This is best-effort: any StandardError raised while signalling (for
# instance because the process is already gone) is swallowed on purpose.
#
# @return [Integer, nil] whatever Process.kill returned, or nil when
#   signalling failed
def abort
  Process.kill(:INT, @pid)
rescue StandardError
  nil
end

#done? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
60
61
62
63
64
65
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 57

# Reports, without blocking, whether the worker process has finished.
#
# A finished child is reaped by this call, so later calls hit the
# Errno::ECHILD branch and keep answering true.
#
# @return [Boolean] true once the worker has exited (or is no longer a
#   waitable child), false while it is still running or when the check
#   itself fails for any other reason
def done?
  # With WNOHANG, waitpid returns nil while the child is still running and
  # the child's pid once it has exited; collapse that to a strict Boolean.
  !Process.waitpid(@pid, Process::WNOHANG).nil?
rescue Errno::ECHILD
  # Nothing left to wait for (e.g. already reaped): treat as finished.
  true
rescue StandardError
  false
end

#join ⇒ Object



46
47
48
# File 'lib/rbbt/util/concurrency/processes/worker.rb', line 46

# Blocks until the worker process exits, reaping it.
#
# Errors from Process.waitpid are not rescued here; NOTE(review): a second
# call after the child has been reaped presumably raises Errno::ECHILD, as
# handled in #done? -- confirm before relying on repeated joins.
#
# @return [Integer] the pid returned by Process.waitpid
def join
  Process.waitpid(@pid)
end