Class: RbbtProcessQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb

Defined Under Namespace

Classes: RbbtProcessQueueWorker, RbbtProcessSocket

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_processes) ⇒ RbbtProcessQueue

Returns a new instance of RbbtProcessQueue.



9
10
11
12
13
# File 'lib/rbbt/util/concurrency/processes.rb', line 9

def initialize(num_processes)
  @num_processes = num_processes
  @processes = []
  @queue = RbbtProcessSocket.new
end

Instance Attribute Details

#callback(&block) ⇒ Object

Returns the value of attribute callback.



15
16
17
# File 'lib/rbbt/util/concurrency/processes.rb', line 15

def callback
  @callback
end

#callback_queueObject

Returns the value of attribute callback_queue.



15
16
17
# File 'lib/rbbt/util/concurrency/processes.rb', line 15

def callback_queue
  @callback_queue
end

#callback_threadObject

Returns the value of attribute callback_thread.



15
16
17
# File 'lib/rbbt/util/concurrency/processes.rb', line 15

def callback_thread
  @callback_thread
end

#num_processesObject

{{{ RbbtProcessQueue



8
9
10
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

def num_processes
  @num_processes
end

#process_monitorObject

{{{ RbbtProcessQueue



8
9
10
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

def process_monitor
  @process_monitor
end

#processesObject

{{{ RbbtProcessQueue



8
9
10
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

def processes
  @processes
end

#queueObject

{{{ RbbtProcessQueue



8
9
10
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

def queue
  @queue
end

Class Method Details

.each(list, num = 3, &block) ⇒ Object



91
92
93
94
95
96
# File 'lib/rbbt/util/concurrency/processes.rb', line 91

def self.each(list, num = 3, &block)
  q = RbbtProcessQueue.new num
  q.init(&block)
  list.each do |elem| q.process elem end
  q.join
end

Instance Method Details

#cleanObject



82
83
84
85
# File 'lib/rbbt/util/concurrency/processes.rb', line 82

def clean
  @processes.each{|p| p.abort }.clear
  @callback_thread.raise Aborted if @callback_thread and @callback_thread.alive?
end

#close_callbackObject



68
69
70
# File 'lib/rbbt/util/concurrency/processes.rb', line 68

def close_callback
  @callback_thread.join if @callback_thread and @callback_thread.alive?
end

#init(&block) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/rbbt/util/concurrency/processes.rb', line 41

def init(&block)
  num_processes.times do |i|
    @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, &block)
  end
  @queue.sread.close
  @callback_queue.swrite.close if @callback_queue

  @process_monitor = Thread.new(Thread.current) do |parent|
    begin
      while @processes.any? do
        pid = Process.wait -1, Process::WNOHANG
        if pid
          next unless @processes.collect{|p| p.pid }.include? pid
          @processes.delete_if{|p| p.pid == pid}
          raise "Process #{pid} failed" unless $?.success?
        else
          sleep 1
        end
      end
    rescue
      parent.raise $!
    ensure
      Thread.exit
    end
  end
end

#joinObject



72
73
74
75
76
77
78
79
80
# File 'lib/rbbt/util/concurrency/processes.rb', line 72

def join
  @queue.push ClosedStream.new
  @queue.swrite.close
  begin
    @process_monitor.join
  ensure
    close_callback if @callback
  end
end

#process(*e) ⇒ Object



87
88
89
# File 'lib/rbbt/util/concurrency/processes.rb', line 87

def process(*e)
  @queue.push e
end