Class: RbbtProcessQueue
- Inherits:
-
Object
- Object
- RbbtProcessQueue
- Defined in:
- lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb
Defined Under Namespace
Classes: RbbtProcessQueueWorker, RbbtProcessSocket
Instance Attribute Summary collapse
-
#callback(&block) ⇒ Object
Returns the value of attribute callback.
-
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
-
#callback_thread ⇒ Object
Returns the value of attribute callback_thread.
-
#cleanup ⇒ Object
Returns the value of attribute cleanup.
-
#join ⇒ Object
Returns the value of attribute join.
-
#num_processes ⇒ Object
Returns the value of attribute num_processes.
-
#process_monitor ⇒ Object
Returns the value of attribute process_monitor.
-
#processes ⇒ Object
Returns the value of attribute processes.
-
#queue ⇒ Object
Returns the value of attribute queue.
Class Method Summary collapse
- .each(list, num = 3, &block) ⇒ Object
Instance Method Summary collapse
- #abort ⇒ Object
- #clean ⇒ Object
- #close_callback ⇒ Object
- #init(&block) ⇒ Object
-
#initialize(num_processes, cleanup = nil, join = nil) ⇒ RbbtProcessQueue
constructor
A new instance of RbbtProcessQueue.
- #process(*e) ⇒ Object
Constructor Details
#initialize(num_processes, cleanup = nil, join = nil) ⇒ RbbtProcessQueue
Returns a new instance of RbbtProcessQueue.
7 8 9 10 11 12 13 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 7
# Builds an idle queue: records the configuration, creates the socket used to
# hand work to workers, and starts with an empty worker list.
#
# num_processes - number of workers that #init will create.
# cleanup       - stored as-is and later passed to every worker (default: nil).
# join          - stored as-is in @join (default: nil).
def initialize(num_processes, cleanup = nil, join = nil)
  @num_processes, @cleanup, @join = num_processes, cleanup, join
  @queue = RbbtProcessSocket.new
  @processes = []
end
Instance Attribute Details
#callback(&block) ⇒ Object
Returns the value of attribute callback.
15 16 17 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 15 def callback @callback end |
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
15 16 17 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 15 def callback_queue @callback_queue end |
#callback_thread ⇒ Object
Returns the value of attribute callback_thread.
15 16 17 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 15 def callback_thread @callback_thread end |
#cleanup ⇒ Object
Returns the value of attribute cleanup.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6 def cleanup @cleanup end |
#join ⇒ Object
Returns the value of attribute join.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6 def join @join end |
#num_processes ⇒ Object
Returns the value of attribute num_processes.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6 def num_processes @num_processes end |
#process_monitor ⇒ Object
Returns the value of attribute process_monitor.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6 def process_monitor @process_monitor end |
#processes ⇒ Object
Returns the value of attribute processes.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6 def processes @processes end |
#queue ⇒ Object
Returns the value of attribute queue.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 6 def queue @queue end |
Class Method Details
.each(list, num = 3, &block) ⇒ Object
128 129 130 131 132 133 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 128
# Convenience wrapper: runs the block over every element of list using a
# freshly created queue.
#
# list  - any object responding to #each; each element is pushed as one item.
# num   - number of workers for the new queue (default: 3).
# block - passed to RbbtProcessQueue#init.
#
# Returns whatever the queue's #join returns.
def self.each(list, num = 3, &block)
  pool = RbbtProcessQueue.new(num)
  pool.init(&block)
  list.each { |item| pool.process(item) }
  pool.join
end
Instance Method Details
#abort ⇒ Object
111 112 113 114 115 116 117 118 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 111
# Interrupts the monitor thread and the callback thread (in that order) by
# raising Aborted inside each, waits for each to finish, and finally calls
# #join -- even if interrupting a thread raised.
#
# A thread that was never started (nil) or has already finished is skipped.
# Any exception re-raised by Thread#join propagates to the caller after
# #join has run.
def abort
  [@process_monitor, @callback_thread].each do |thread|
    # The guard must cover BOTH the raise and the join. Written as
    # `t.raise(...); t.join if t and t.alive?` the modifier applies only to
    # the join, so the raise would be sent to nil when no thread exists.
    next unless thread && thread.alive?

    thread.raise(Aborted.new)
    thread.join
  end
ensure
  join
end
#clean ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 103
# Shuts the queue down: aborts when the monitor thread or the callback thread
# is still running, otherwise just joins.
#
# Either thread may be nil (the pool was never started, or no callback
# thread exists); a nil thread counts as "not running" instead of raising
# NoMethodError, matching the guards used by #abort.
def clean
  monitor_running  = @process_monitor && @process_monitor.alive?
  callback_running = @callback_thread && @callback_thread.alive?

  if monitor_running || callback_running
    self.abort
  else
    self.join
  end
end
#close_callback ⇒ Object
73 74 75 76 77 78 79 80 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 73
# Tells the callback thread that no more results are coming and waits for it
# to finish.
#
# A ClosedStream sentinel is pushed onto @callback_queue when the callback
# thread is still alive. Pushing is best-effort: any failure is logged as a
# warning and otherwise ignored, and the thread is joined regardless.
def close_callback
  begin
    @callback_queue.push ClosedStream.new if @callback_thread.alive?
  rescue Exception => e
    # Deliberately broad so that a failed push never prevents the join below.
    Log.warn "Error closing callback: #{e.message}"
  end
  @callback_thread.join
end
#init(&block) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 48
# Starts the worker pool and the thread that supervises it.
#
# Creates num_processes RbbtProcessQueueWorker objects, each given @queue,
# @callback_queue, @cleanup and the block, then calls @queue.close_read
# (presumably because only workers read from the queue -- TODO confirm).
#
# The monitor thread joins the workers in order and drops each one from
# @processes once it has finished. If Aborted is raised in the monitor, every
# remaining worker is aborted and joined. On any other exception the workers
# are aborted, the exception is forwarded to a live callback thread, and it
# is re-raised inside the monitor thread.
#
# Returns the monitor Thread (also stored in @process_monitor).
def init(&block)
  num_processes.times do
    @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, &block)
  end
  @queue.close_read

  @process_monitor = Thread.new do
    begin
      while @processes.any?
        @processes[0].join
        @processes.shift
      end
    rescue Aborted
      Log.warn "Aborting process monitor"
      @processes.each { |worker| worker.abort }
      @processes.each { |worker| worker.join }
    rescue Exception => e
      # Deliberately broad: whatever stops the monitor must also stop the
      # workers and reach the callback thread. The exception is bound to a
      # local so the calls below cannot change what is logged and re-raised.
      Log.warn "Process monitor exception: #{e.message}"
      @processes.each { |worker| worker.abort }
      @callback_thread.raise e if @callback_thread and @callback_thread.alive?
      raise e
    end
  end
end
#process(*e) ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/rbbt/util/concurrency/processes.rb', line 120
# Submits one work item to the queue.
#
# e - the arguments of the item; they are pushed together as a single Array.
#
# Raises Aborted when the push fails with Errno::EPIPE.
def process(*e)
  @queue.push e
rescue Errno::EPIPE
  raise Aborted
end