Class: RbbtProcessQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb

Defined Under Namespace

Classes: RbbtProcessQueueWorker, RbbtProcessSocket

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_processes, cleanup = nil, join = nil, reswpan = nil, offset = false) ⇒ RbbtProcessQueue

Returns a new instance of RbbtProcessQueue.



7
8
9
10
11
12
13
14
15
# File 'lib/rbbt/util/concurrency/processes.rb', line 7

# Builds a queue object and the socket used to hand work to worker processes.
#
# No worker is started here; the arguments are only stored.
#
# @param num_processes [Integer] stored in @num_processes
# @param cleanup [Object, nil] stored in @cleanup
# @param join [Object, nil] stored in @join
# @param reswpan [Object, nil] stored in @respawn (note the differing spelling
#   of the parameter and the instance variable)
# @param offset [Object] stored in @offset; defaults to false
def initialize(num_processes, cleanup = nil, join = nil, reswpan = nil, offset = false)
  @num_processes, @cleanup, @join = num_processes, cleanup, join
  @respawn, @offset = reswpan, offset
  # Starts empty; nothing in this constructor adds workers.
  @processes = []
  @queue = RbbtProcessSocket.new
end

Instance Attribute Details

#callback(&block) ⇒ Object

Returns the value of attribute callback.



17
18
19
# File 'lib/rbbt/util/concurrency/processes.rb', line 17

# Reader for the +callback+ attribute.
#
# @return [Object, nil] the current value of @callback (nil when never assigned)
def callback; @callback; end

#callback_queue ⇒ Object

Returns the value of attribute callback_queue.



17
18
19
# File 'lib/rbbt/util/concurrency/processes.rb', line 17

# Reader for the +callback_queue+ attribute.
#
# @return [Object, nil] the current value of @callback_queue (nil when never assigned)
def callback_queue; @callback_queue; end

#callback_thread ⇒ Object

Returns the value of attribute callback_thread.



17
18
19
# File 'lib/rbbt/util/concurrency/processes.rb', line 17

# Reader for the +callback_thread+ attribute.
#
# @return [Object, nil] the current value of @callback_thread (nil when never assigned)
def callback_thread; @callback_thread; end

#cleanup ⇒ Object

Returns the value of attribute cleanup.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +cleanup+ attribute.
#
# @return [Object, nil] the current value of @cleanup (nil when never assigned)
def cleanup; @cleanup; end

#join ⇒ Object

Returns the value of attribute join.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +join+ attribute.
#
# @return [Object, nil] the current value of @join (nil when never assigned)
def join; @join; end

#num_processes ⇒ Object

Returns the value of attribute num_processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +num_processes+ attribute.
#
# @return [Object, nil] the current value of @num_processes (nil when never assigned)
def num_processes; @num_processes; end

#offset ⇒ Object

Returns the value of attribute offset.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +offset+ attribute.
#
# @return [Object, nil] the current value of @offset (nil when never assigned)
def offset; @offset; end

#process_monitor ⇒ Object

Returns the value of attribute process_monitor.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +process_monitor+ attribute.
#
# @return [Object, nil] the current value of @process_monitor (nil when never assigned)
def process_monitor; @process_monitor; end

#processes ⇒ Object

Returns the value of attribute processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +processes+ attribute.
#
# @return [Object, nil] the current value of @processes (nil when never assigned)
def processes; @processes; end

#queue ⇒ Object

Returns the value of attribute queue.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the +queue+ attribute.
#
# @return [Object, nil] the current value of @queue (nil when never assigned)
def queue; @queue; end

#reswpan ⇒ Object

Returns the value of attribute reswpan.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the respawn setting.
#
# The constructor stores its +reswpan+ argument in @respawn, so reading only
# @reswpan would report nil for every freshly built queue. This reader falls
# back to @respawn when @reswpan has never been assigned.
#
# NOTE(review): no writer is visible here. If one assigns @reswpan, that value
# still takes precedence so existing callers keep seeing what they wrote.
#
# @return [Object, nil] @reswpan when it has been assigned, otherwise @respawn
def reswpan
  defined?(@reswpan) ? @reswpan : @respawn
end

Class Method Details

.each(list, num = 3, &block) ⇒ Object



160
161
162
163
164
165
# File 'lib/rbbt/util/concurrency/processes.rb', line 160

# Convenience wrapper: builds a queue, initialises it with the block, feeds it
# every element of +list+, then calls +join+ on it.
#
# @param list [#each] elements to hand to the queue one at a time
# @param num [Integer] passed to RbbtProcessQueue.new; defaults to 3
# @yield forwarded unchanged to the queue's +init+
# @return [Object] whatever the queue's +join+ returns
def self.each(list, num = 3, &block)
  pool = RbbtProcessQueue.new(num)
  pool.init(&block)
  list.each { |item| pool.process(item) }
  pool.join
end

Instance Method Details

#abort ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/rbbt/util/concurrency/processes.rb', line 140

# Interrupts the monitor and callback threads, then joins.
#
# Each thread that exists and is alive has Aborted raised in it and is then
# joined, monitor first. Whatever happens in that step, +join+ is called
# afterwards and a ProcessFailed coming out of it is ignored.
def abort
  if @process_monitor && @process_monitor.alive?
    @process_monitor.raise(Aborted.new)
    @process_monitor.join
  end

  if @callback_thread && @callback_thread.alive?
    @callback_thread.raise(Aborted.new)
    @callback_thread.join
  end
ensure
  begin
    join
  rescue ProcessFailed
    # Deliberately ignored: the original swallows worker failures while aborting.
  end
end

#clean ⇒ Object



130
131
132
133
134
135
136
137
138
# File 'lib/rbbt/util/concurrency/processes.rb', line 130

# Stops any running threads and cleans the queues.
#
# When the monitor thread or the callback thread exists and is alive, +abort+
# and then +join+ are called. After that @queue and @callback_queue, when set,
# each receive +clean+.
#
# @return [Object, nil] the result of @callback_queue.clean, or nil when there
#   is no callback queue
def clean
  threads_running = (@process_monitor && @process_monitor.alive?) ||
                    (@callback_thread && @callback_thread.alive?)

  if threads_running
    abort
    join
  end

  @queue.clean if @queue
  @callback_queue.clean if @callback_queue
end

#close_callback ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/rbbt/util/concurrency/processes.rb', line 97

# Signals the callback side to finish and waits for the callback thread.
#
# A ClosedStream marker is pushed onto @callback_queue when the callback thread
# is alive. Any exception raised in that step is logged as a warning and not
# re-raised. The callback thread is then joined unconditionally.
#
# @return [Object] the result of @callback_thread.join
def close_callback
  begin
    if @callback_thread.alive?
      @callback_queue.push ClosedStream.new
    end
  rescue Exception => error
    Log.warn "Error closing callback: #{error.message}"
  end

  @callback_thread.join
end

#init(&block) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rbbt/util/concurrency/processes.rb', line 61

# Creates the workers and starts the thread that waits for them.
#
# +num_processes+ workers are created, each given @queue, @callback_queue,
# @cleanup, @respawn, @offset and the block. @queue.close_read is then called
# (presumably because this side only writes to the queue; confirm against
# RbbtProcessSocket).
#
# The monitor thread joins the workers in order, dropping each from @processes
# once its join returns. If Aborted is raised in the monitor, every remaining
# worker is aborted and joined. Any other exception gets the same treatment, is
# additionally raised in the callback thread when that thread is alive, and is
# then re-raised in the monitor.
#
# @yield forwarded to every RbbtProcessQueueWorker
# @return [Thread] the monitor thread, also stored in @process_monitor
def init(&block)
  num_processes.times do
    @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, @offset, &block)
  end
  @queue.close_read

  # Shared by both rescue branches: abort every remaining worker, then join
  # each one, ignoring workers that report failure.
  stop_workers = lambda do
    @processes.each { |worker| worker.abort }
    @processes.each do |worker|
      begin
        worker.join
      rescue ProcessFailed
      end
    end
  end

  @process_monitor = Thread.new do
    begin
      until @processes.empty?
        @processes.first.join
        @processes.shift
      end
    rescue Aborted
      Log.warn "Aborting process monitor"
      stop_workers.call
    rescue Exception => error
      Log.warn "Process monitor exception: #{error.message}"
      stop_workers.call
      @callback_thread.raise error if @callback_thread && @callback_thread.alive?
      raise error
    end
  end
end

#process(*e) ⇒ Object



152
153
154
155
156
157
158
# File 'lib/rbbt/util/concurrency/processes.rb', line 152

# Hands one work item to the queue.
#
# The arguments are collected into an array and pushed onto @queue as a single
# element.
#
# @param e [Array<Object>] the arguments making up one work item
# @return [Object] whatever @queue.push returns
# @raise [Aborted] when the push fails with Errno::EPIPE
def process(*e)
  @queue.push e
rescue Errno::EPIPE
  raise Aborted
end