Class: RbbtProcessQueue
- Inherits:
-
Object
- Object
- RbbtProcessQueue
- Defined in:
- lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb
Defined Under Namespace
Classes: RbbtProcessQueueWorker, RbbtProcessSocket
Instance Attribute Summary collapse
-
#callback(&block) ⇒ Object
Returns the value of attribute callback.
-
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
-
#callback_thread ⇒ Object
Returns the value of attribute callback_thread.
-
#num_processes ⇒ Object
Returns the value of attribute num_processes.
-
#process_monitor ⇒ Object
Returns the value of attribute process_monitor.
-
#processes ⇒ Object
Returns the value of attribute processes.
-
#queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
- .each(list, num = 3, &block) ⇒ Object
Instance Method Summary collapse
- #clean ⇒ Object
- #close_callback ⇒ Object
- #init(&block) ⇒ Object
-
#initialize(num_processes) ⇒ RbbtProcessQueue
constructor
A new instance of RbbtProcessQueue.
- #join ⇒ Object
- #process(*e) ⇒ Object
Constructor Details
#initialize(num_processes) ⇒ RbbtProcessQueue
Returns a new instance of RbbtProcessQueue.
9 10 11 12 13 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 9

# Sets up the initial state of a queue: the requested worker count, an empty
# worker list, and a fresh RbbtProcessSocket used as the work queue.
#
# @param num_processes [Object] stored as given in @num_processes
def initialize(num_processes)
  @num_processes = num_processes
  @processes = []
  @queue = RbbtProcessSocket.new
end
Instance Attribute Details
#callback(&block) ⇒ Object
Returns the value of attribute callback.
15 16 17 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 15

# Attribute reader.
#
# @return [Object] whatever is currently stored in @callback
def callback
  @callback
end
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
15 16 17 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 15

# Attribute reader.
#
# @return [Object] whatever is currently stored in @callback_queue
def callback_queue
  @callback_queue
end
#callback_thread ⇒ Object
Returns the value of attribute callback_thread.
15 16 17 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 15

# Attribute reader.
#
# @return [Object] whatever is currently stored in @callback_thread
def callback_thread
  @callback_thread
end
#num_processes ⇒ Object
Returns the value of attribute num_processes.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

# Attribute reader.
#
# @return [Object] whatever is currently stored in @num_processes
def num_processes
  @num_processes
end
#process_monitor ⇒ Object
Returns the value of attribute process_monitor.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

# Attribute reader.
#
# @return [Object] whatever is currently stored in @process_monitor
def process_monitor
  @process_monitor
end
#processes ⇒ Object
Returns the value of attribute processes.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

# Attribute reader.
#
# @return [Object] whatever is currently stored in @processes
def processes
  @processes
end
#queue ⇒ Object
Returns the value of attribute queue.
8 9 10 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 8

# Attribute reader.
#
# @return [Object] whatever is currently stored in @queue
def queue
  @queue
end
Class Method Details
.each(list, num = 3, &block) ⇒ Object
91 92 93 94 95 96 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 91

# Convenience wrapper: builds a queue, initialises it with the given block,
# submits every element of +list+ and then joins the queue.
#
# @param list [#each] elements to submit, one #process call per element
# @param num [Object] passed to RbbtProcessQueue.new (defaults to 3)
# @yield forwarded unchanged to RbbtProcessQueue#init
# @return [Object] whatever the queue's #join returns
def self.each(list, num = 3, &block)
  q = RbbtProcessQueue.new num
  q.init(&block)
  list.each { |elem| q.process elem }
  q.join
end
Instance Method Details
#clean ⇒ Object
82 83 84 85 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 82

# Aborts all tracked workers and interrupts the callback thread.
#
# Calls +abort+ on every entry of @processes and then empties that list.
# If a callback thread exists and is still alive, Aborted is raised inside it.
#
# @return [Object, nil] nil when there is no live callback thread
def clean
  @processes.each { |worker| worker.abort }.clear
  @callback_thread.raise Aborted if @callback_thread && @callback_thread.alive?
end
#close_callback ⇒ Object
68 69 70 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 68

# Waits for the callback thread to finish, if one exists and is still alive.
#
# @return [Thread, nil] the joined thread, or nil when nothing was joined
def close_callback
  @callback_thread.join if @callback_thread && @callback_thread.alive?
end
#init(&block) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 41

# Creates the workers and starts a thread that watches for their exit.
#
# One RbbtProcessQueueWorker is created per +num_processes+, each receiving
# @queue, @callback_queue and the given block. Afterwards the read end of
# @queue is closed on this side, as is the write end of @callback_queue when
# there is one.
#
# The monitor thread polls for exited child processes until no workers are
# left in @processes. A worker that exits unsuccessfully makes the thread
# raise, and any StandardError raised in the thread is re-raised in the
# thread that called #init.
#
# @yield forwarded to every RbbtProcessQueueWorker
# @return [Thread] the monitor thread, also stored in @process_monitor
def init(&block)
  num_processes.times do
    @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, &block)
  end

  @queue.sread.close
  @callback_queue.swrite.close if @callback_queue

  @process_monitor = Thread.new(Thread.current) do |parent|
    begin
      while @processes.any?
        # Non-blocking reap of any child; yields nil when none has exited yet.
        pid, status = Process.wait2(-1, Process::WNOHANG)
        if pid
          # Children that are not workers of this queue are ignored.
          next unless @processes.any? { |worker| worker.pid == pid }

          @processes.delete_if { |worker| worker.pid == pid }
          raise "Process #{pid} failed" unless status.success?
        else
          sleep 1
        end
      end
    rescue => e
      # Surface the failure in the thread that started the queue.
      parent.raise e
    ensure
      Thread.exit
    end
  end
end
#join ⇒ Object
72 73 74 75 76 77 78 79 80 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 72

# Signals the end of input and waits for the monitor thread to finish.
#
# Pushes a ClosedStream instance onto @queue, closes the queue's write end
# and joins @process_monitor. Whether or not that join raises, #close_callback
# is called afterwards when @callback is set.
#
# @return [Object] the result of joining @process_monitor
def join
  @queue.push ClosedStream.new
  @queue.swrite.close

  begin
    @process_monitor.join
  ensure
    # Runs even when the monitor join raises.
    close_callback if @callback
  end
end
#process(*e) ⇒ Object
87 88 89 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 87

# Submits one unit of work: all arguments are packed into a single Array,
# which is pushed onto @queue.
#
# @param e [Array<Object>] the arguments making up one queue entry
# @return [Object] whatever @queue.push returns
def process(*e)
  @queue.push e
end