Class: RbbtProcessQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb

Defined Under Namespace

Classes: RbbtProcessQueueWorker, RbbtProcessSocket

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_processes, cleanup = nil, join = nil, reswpan = nil, offset = false) ⇒ RbbtProcessQueue

Returns a new instance of RbbtProcessQueue.



7
8
9
10
11
12
13
14
15
16
# File 'lib/rbbt/util/concurrency/processes.rb', line 7

def initialize(num_processes, cleanup = nil, join = nil, reswpan = nil, offset = false)
  @num_processes = num_processes
  @processes = []
  @cleanup = cleanup
  @join = join
  @respawn = reswpan
  @offset = offset
  @queue = RbbtProcessSocket.new
  @process_mutex = Mutex.new
end

Instance Attribute Details

#callback(&block) ⇒ Object

Returns the value of attribute callback.



18
19
20
# File 'lib/rbbt/util/concurrency/processes.rb', line 18

def callback
  @callback
end

#callback_queueObject

Returns the value of attribute callback_queue.



18
19
20
# File 'lib/rbbt/util/concurrency/processes.rb', line 18

def callback_queue
  @callback_queue
end

#callback_threadObject

Returns the value of attribute callback_thread.



18
19
20
# File 'lib/rbbt/util/concurrency/processes.rb', line 18

def callback_thread
  @callback_thread
end

#cleanupObject

Returns the value of attribute cleanup.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def cleanup
  @cleanup
end

#joinObject

Returns the value of attribute join.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def join
  @join
end

#num_processesObject

Returns the value of attribute num_processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def num_processes
  @num_processes
end

#offsetObject

Returns the value of attribute offset.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def offset
  @offset
end

#process_monitorObject

Returns the value of attribute process_monitor.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def process_monitor
  @process_monitor
end

#processesObject

Returns the value of attribute processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def processes
  @processes
end

#queueObject

Returns the value of attribute queue.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def queue
  @queue
end

#reswpanObject

Returns the value of attribute reswpan.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

def reswpan
  @reswpan
end

Class Method Details

.each(list, num = 3, &block) ⇒ Object



309
310
311
312
313
314
# File 'lib/rbbt/util/concurrency/processes.rb', line 309

def self.each(list, num = 3, &block)
  q = RbbtProcessQueue.new num
  q.init(&block)
  list.each do |elem| q.process elem end
  q.join
end

Instance Method Details

#_abortObject



271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/rbbt/util/concurrency/processes.rb', line 271

def _abort
  begin
    Process.kill 20, @master_pid
  rescue Errno::ECHILD, Errno::ESRCH
    Log.debug "Cannot kill #{@master_pid}: #{$!.message}"
  end

  begin
    _join
  rescue ProcessFailed
  end
end

#_joinObject



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/rbbt/util/concurrency/processes.rb', line 229

def _join
  error = true
  begin
    pid, status = Process.waitpid2 @master_pid
    error = false if status.success?
    raise ProcessFailed if error
  rescue Errno::ECHILD
  rescue Aborted
    Log.error "Aborted joining queue"
    raise $!
  rescue Exception
    Log.error "Exception joining queue: #{$!.message}"
    raise $!
  ensure
    if @join
      if @join.arity == 1
        @join.call(error) 
      else
        @join.call
      end
    end
  end

end

#abortObject

Raises:



284
285
286
287
288
# File 'lib/rbbt/util/concurrency/processes.rb', line 284

def abort
  _abort
  (@callback_thread.raise(Aborted.new); @callback_thread.join) if @callback_thread and @callback_thread.alive?
  raise Aborted.new
end

#add_processObject



208
209
210
# File 'lib/rbbt/util/concurrency/processes.rb', line 208

def add_process
  Process.kill :USR1, @master_pid
end

#cleanObject



290
291
292
293
294
295
296
297
298
# File 'lib/rbbt/util/concurrency/processes.rb', line 290

def clean
  begin
    self.abort if Misc.pid_exists?(@master_pid)

  ensure
    @queue.clean if @queue
    @callback_queue.clean if @callback_queue
  end
end

#close_callbackObject



216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/rbbt/util/concurrency/processes.rb', line 216

def close_callback
  return unless @callback_thread.alive?
  begin
    t = Thread.new do
      @callback_queue.push ClosedStream.new
    end
  rescue Exception
    Log.warn "Error closing callback: #{$!.message}"
  end
  @callback_thread.join  #if @callback_thread.alive?
  t.join
end

#init(&block) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/rbbt/util/concurrency/processes.rb', line 62

def init(&block)
  @init_block = block

  @master_pid = Process.fork do
    @close_up = false
    Signal.trap(:INT) do
      @close_up = true
      Misc.insist([0,0.01,0.1,0.2,0.5]) do
        raise TryAgain unless @manager_thread
        if @manager_thread.alive?
          raise "Manager thread for #{Process.pid} Not working yet" unless @manager_thread["working"]
          @manager_thread.raise TryAgain
        end
      end
    end

    if @callback_queue
      Misc.purge_pipes(@queue.swrite,@queue.sread,@callback_queue.swrite, @callback_queue.sread) 
    else
      Misc.purge_pipes(@queue.swrite,@queue.sread) 
    end

    @total = num_processes
    @count = 0
    @processes = []

    @manager_thread = Thread.new do
      begin
      while true 
        begin
          Thread.current["working"] = true
          if @close_up
            Log.debug "Closing up process queue #{Process.pid}"
            @count = 0
            Thread.new do
              Log.debug "Pushing closed stream #{Process.pid}"
              while true
                @queue.push ClosedStream.new unless @queue.cleaned 
              end unless @processes.empty?
            end
            @close_up = false
          end

          begin
            sleep 3
          rescue TryAgain
          end

          raise TryAgain if @close_up

          @process_mutex.synchronize do
            while @count > 0
              @count -= 1
              @total += 1
              @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, @offset, &@init_block)
              Log.warn "Added process #{@processes.last.pid} to #{Process.pid} (#{@processes.length})"
            end

            while @count < 0
              @count += 1
              next unless @processes.length > 1
              first = @processes.shift
              first.stop
              Log.warn "Removed process #{first.pid} from #{Process.pid} (#{@processes.length})"
            end
          end
        rescue TryAgain
          retry
        rescue Aborted
          Log.low "Aborting manager thread #{Process.pid}"
          raise Aborted
        rescue Exception
          Log.exception $!
          raise Exception
        end
      end
      rescue Exception
        Log.exception $!
        raise $!
      end
    end

    Signal.trap(:USR1) do
      @count += 1
      @manager_thread.raise TryAgain
    end

    Signal.trap(:USR2) do
      @count -= 1
      @manager_thread.raise TryAgain
    end


    @callback_queue.close_read if @callback_queue

    num_processes.times do |i|
      @process_mutex.synchronize do
        @processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, @offset, &@init_block)
      end
    end

    @monitor_thread = Thread.new do
      begin
        while @processes.any?
          @processes[0].join 
          @processes.shift
        end
      rescue Aborted
        Log.warn "Aborting process monitor"
        @processes.each{|p| p.abort_and_join}
        @processes.clear

        @callback_thread.kill if @callback_thread && @callback_thread.alive?
        @manager_thread.kill if @manager_thread.alive?
      rescue Exception
        Log.warn "Process monitor exception: #{$!.message}"
        @processes.each{|p| p.abort_and_join}
        @processes.clear

        @callback_thread.kill if @callback_thread && @callback_thread.alive?
        @manager_thread.kill if @manager_thread.alive?
      end
    end

    Signal.trap(20) do
      begin
        @monitor_thread.raise Aborted.new
      rescue Exception
        Log.exception $!
      end
    end

    begin
      @monitor_thread.join
    rescue Exception
      Log.exception $!
    end

    Kernel.exit! 0
  end

  Log.info "Cpu process (#{num_processes}) started with master: #{@master_pid}"
  
  @queue.close_read
end

#process(*e) ⇒ Object



301
302
303
304
305
306
307
# File 'lib/rbbt/util/concurrency/processes.rb', line 301

def process(*e)
  begin
    @queue.push e
  rescue Errno::EPIPE
    raise Aborted
  end
end

#remove_processObject



212
213
214
# File 'lib/rbbt/util/concurrency/processes.rb', line 212

def remove_process
  Process.kill :USR2, @master_pid
end