Class: RbbtProcessQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes.rb,
lib/rbbt/util/concurrency/processes/socket.rb,
lib/rbbt/util/concurrency/processes/worker.rb

Defined Under Namespace

Classes: RbbtProcessQueueWorker, RbbtProcessSocket

Constant Summary collapse

ABORT_SIGNAL =
:INT
CLOSE_SIGNAL =
:PIPE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(num_processes, cleanup = nil, join = nil, reswpan = nil, offset = false) ⇒ RbbtProcessQueue

Returns a new instance of RbbtProcessQueue.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/rbbt/util/concurrency/processes.rb', line 7

# Builds a process queue: records the configuration, opens the work socket
# and creates the named semaphore used to coordinate with the master process.
#
# @param num_processes [Integer] number of workers the master starts with
# @param cleanup [Object, nil] handed to every worker that gets spawned
# @param join [#call, nil] hook run by #_join once the master has exited
# @param reswpan [Object, nil] respawn setting handed to every worker
#   (stored as @respawn)
# @param offset [Boolean] when truthy, workers receive the running worker
#   count as their offset
def initialize(num_processes, cleanup = nil, join = nil, reswpan = nil, offset = false)
  @num_processes = num_processes
  @cleanup       = cleanup
  @join          = join
  @respawn       = reswpan
  @offset        = offset
  @queue         = RbbtProcessSocket.new

  # Semaphore name: "/<random number>.<pid of this process>.process"
  @sem = "/#{rand(1000000000)}.#{Process.pid}.process"
  Log.debug "Creating process semaphore: #{@sem}"
  RbbtSemaphore.create_semaphore(@sem, 1)
end

Instance Attribute Details

#callback(&block) ⇒ Object

Returns the value of attribute callback.



25
26
27
# File 'lib/rbbt/util/concurrency/processes.rb', line 25

# Reader for the callback object. The callback thread started by #init calls
# it once per item popped from the callback queue: without arguments when its
# arity is 0, otherwise with the popped item.
#
# @return [Object] the current value of @callback
def callback
  @callback
end

#callback_queue ⇒ Object

Returns the value of attribute callback_queue.



25
26
27
# File 'lib/rbbt/util/concurrency/processes.rb', line 25

# Reader for the queue the callback thread pops from (see #init). The master
# process pushes a ClosedStream onto it after its threads have been joined
# (see #init_master), and #clean cleans it.
# NOTE(review): nothing in the code shown here assigns @callback_queue.
#
# @return [Object] the current value of @callback_queue
def callback_queue
  @callback_queue
end

#callback_thread ⇒ Object

Returns the value of attribute callback_thread.



25
26
27
# File 'lib/rbbt/util/concurrency/processes.rb', line 25

# Reader for the thread created by #init (only when a callback queue exists)
# that feeds popped items to the callback. #abort raises Aborted in it and
# #_join joins it.
#
# @return [Thread, nil] the current value of @callback_thread
def callback_thread
  @callback_thread
end

#cleanup ⇒ Object

Returns the value of attribute cleanup.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the cleanup value given to the constructor; it is passed
# unchanged to every RbbtProcessQueueWorker the master creates.
#
# @return [Object, nil] the current value of @cleanup
def cleanup
  @cleanup
end

#join ⇒ Object

Returns the value of attribute join.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the join hook given to the constructor. #_join calls it when it
# is set: with the error flag when its arity is 1, with no arguments otherwise.
#
# @return [Object, nil] the current value of @join
def join
  @join
end

#num_processes ⇒ Object

Returns the value of attribute num_processes.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the worker count given to the constructor; #init_master spawns
# `num_processes.to_i` workers when the master process starts.
#
# @return [Object] the current value of @num_processes
def num_processes
  @num_processes
end

#offset ⇒ Object

Returns the value of attribute offset.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the offset flag given to the constructor. When it is truthy each
# new worker is created with the running worker total as its offset argument;
# otherwise workers are created with `false`.
#
# @return [Object] the current value of @offset
def offset
  @offset
end

#process_monitor ⇒ Object

Returns the value of attribute process_monitor.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for @process_monitor.
# NOTE(review): nothing in the code shown here assigns @process_monitor, so
# this returns nil unless it is set elsewhere — confirm whether it is still used.
#
# @return [Object, nil] the current value of @process_monitor
def process_monitor
  @process_monitor
end

#queue ⇒ Object

Returns the value of attribute queue.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the work queue: the RbbtProcessSocket created by the constructor.
# #process pushes work items onto it and every worker is created with it.
#
# @return [RbbtProcessSocket] the current value of @queue
def queue
  @queue
end

#reswpan ⇒ Object

Returns the value of attribute reswpan.



6
7
8
# File 'lib/rbbt/util/concurrency/processes.rb', line 6

# Reader for the respawn setting.
#
# The constructor stores its `reswpan` argument in @respawn (the variable that
# is handed to the workers), so reading only @reswpan never reflected the
# configured value. An explicitly assigned @reswpan still takes precedence;
# otherwise the value stored by the constructor is returned.
#
# @return [Object, nil] @reswpan when it is set, else @respawn
def reswpan
  @reswpan.nil? ? @respawn : @reswpan
end

Class Method Details

.each(list, num = 3, &block) ⇒ Object



375
376
377
378
379
380
# File 'lib/rbbt/util/concurrency/processes.rb', line 375

# Convenience wrapper: runs every element of +list+ through a fresh process
# queue with +num+ workers and waits for it to finish.
#
# @param list [#each] items to process
# @param num [Integer] number of worker processes
# @yield the block handed to the queue's #init
# @return [Object] whatever the queue's #join returns
def self.each(list, num = 3, &block)
  queue = RbbtProcessQueue.new(num)
  queue.init(&block)
  list.each { |item| queue.process(item) }
  queue.join
end

Instance Method Details

#_abort ⇒ Object



335
336
337
338
339
340
341
342
# File 'lib/rbbt/util/concurrency/processes.rb', line 335

# Sends ABORT_SIGNAL to the master process. A master that is already gone
# (ECHILD / ESRCH) is only logged at debug level, not treated as an error.
def _abort
  Log.warn "Aborting process queue #{@master_pid}"
  Process.kill(ABORT_SIGNAL, @master_pid)
rescue Errno::ECHILD, Errno::ESRCH => e
  Log.debug "Cannot abort #{@master_pid}: #{e.message}"
end

#_join ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/rbbt/util/concurrency/processes.rb', line 257

# Waits for the master process to exit, joins the callback thread, runs the
# optional @join hook and cleans the queue up.
#
# `error` records the outcome: it starts as :redo, becomes true when the master
# exits unsuccessfully or the wait is interrupted, and false when everything
# completed. If the body is left while `error` is still :redo, the ensure
# section aborts the queue and runs _join again.
#
# Raises the captured exception, or a RuntimeError "Process queue ... failed"
# when no exception was captured, whenever `error` is truthy at the end.
# ProcessFailed is raised when the master's exit status is not successful.
def _join
  error = :redo 
  begin
    # Reap the master only once; @status is kept across retries and re-entry.
    pid, @status = Process.waitpid2 @master_pid unless @status
    error = true unless @status.success?
    begin
      @callback_thread.join if @callback_thread
      raise ProcessFailed.new @master_pid unless @status.success?
    rescue
      # Remember the failure so the ensure section can re-raise it.
      exception = $!
      raise $!
    end
    error = false
  rescue Aborted, Interrupt
    exception = $!
    Log.exception $!
    error = true
    if @aborted
      # Already aborting: propagate instead of looping.
      raise $!
    else
      # First interruption: abort the queue, then wait again.
      self.abort
      Log.high "Process queue #{@master_pid} aborted"
      retry
    end
  rescue Errno::ESRCH, Errno::ECHILD
    # The wait failed; try again while the master pid still exists.
    retry if Misc.pid_exists? @master_pid
    error = ! @status.success?
  ensure
    begin
      # No outcome was recorded: abort and go through the join again.
      begin
        self.abort
      ensure
        _join 
      end if error == :redo

      # Best-effort join; any exception from the callback thread is ignored here.
      begin
        @callback_thread.join 
      rescue Exception
      end

      Log.medium "Joining process queue #{"(error) " if error}#{@master_pid} #{@join}" 
      begin
        # The hook receives the error flag only when it accepts one argument.
        if @join
          if @join.arity == 1
            @join.call(error) 
          else
            @join.call
          end
        end
      end
    ensure
      # Cleanup runs even if the join hook raised.
      self.clean
    end

    # Raised from within ensure, so this replaces any exception in flight.
    if exception
      raise exception 
    else
      raise "Process queue #{@master_pid} failed" 
    end if error
  end
end

#abort ⇒ Object



344
345
346
347
348
349
350
351
352
# File 'lib/rbbt/util/concurrency/processes.rb', line 344

# Aborts the queue: signals the master, interrupts a live callback thread with
# Aborted, marks the queue as aborted and then joins it. Errors raised by the
# join (StandardError) are deliberately swallowed.
#
# @return [Object, nil] the result of #_join, or nil when it raised
def abort
  _abort

  thread = @callback_thread
  thread.raise(Aborted.new) if thread&.alive?

  @aborted = true

  begin
    _join
  rescue StandardError
    nil
  end
end

#add_process ⇒ Object



249
250
251
# File 'lib/rbbt/util/concurrency/processes.rb', line 249

# Asks the master process for one more worker by sending it USR1; the master's
# USR1 trap raises its pending worker count and wakes the manager thread.
def add_process
  Process.kill(:USR1, @master_pid)
end

#clean ⇒ Object



354
355
356
357
358
359
360
361
362
363
364
# File 'lib/rbbt/util/concurrency/processes.rb', line 354

# Releases the queue's resources: deletes the semaphore, aborts the master if
# its pid still exists, and cleans the work and callback queues (the queues
# are cleaned even when the abort raises).
#
# @return [Object, nil] the result of #abort, or nil when no abort was needed
def clean
  RbbtSemaphore.delete_semaphore(@sem)

  begin
    master_running = Misc.pid_exists?(@master_pid)
    self.abort if master_running
  ensure
    # Work queue first, then the callback queue; either may be unset.
    [:@queue, :@callback_queue].each do |ivar|
      socket = instance_variable_get(ivar)
      socket.clean if socket
    end
  end
end

#close_up_queue ⇒ Object



319
320
321
322
323
324
325
326
327
# File 'lib/rbbt/util/concurrency/processes.rb', line 319

# Tells the master process to close the queue by sending it CLOSE_SIGNAL while
# holding the queue semaphore. Does nothing when the master pid no longer
# exists; a master that disappears in between is only logged at debug level.
def close_up_queue
  return unless Misc.pid_exists?(@master_pid)

  # The liveness check above stays outside the rescue on purpose.
  begin
    RbbtSemaphore.synchronize(@sem) { Process.kill(CLOSE_SIGNAL, @master_pid) }
  rescue Errno::ECHILD, Errno::ESRCH => e
    Log.debug "Cannot kill (CLOSE) #{@master_pid}: #{e.message}"
  end
end

#init(&block) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/rbbt/util/concurrency/processes.rb', line 212

# Starts the queue: stores the worker block, forks the master process and,
# when a callback queue exists, starts the thread that feeds results to the
# callback.
#
# The callback thread pops items until an exception arrives. An exception
# (bare, or as the first element of an array) is raised inside the thread:
# ClosedStream ends the thread quietly, anything else is logged and re-raised
# so that joining the thread surfaces it.
#
# @yield block stored in @init_block and handed to every worker
def init(&block)
  @init_block = block

  init_master

  RbbtSemaphore.synchronize(@sem) do
    if @callback_queue
      @callback_thread = Thread.new do
        begin
          loop do
            # A cleaned queue is not read any more; the item is then nil.
            item = @callback_queue.cleaned ? nil : @callback_queue.pop

            # Workers report failures as an exception or as [exception, ...].
            failure = Array === item ? item.first : item
            if Exception === failure
              expected = Aborted === failure || ClosedStream === failure
              Log.low "Callback recieved exception from worker: #{failure.message}" unless expected
              raise failure
            end

            @callback.arity == 0 ? @callback.call : @callback.call(item)
          end
        rescue ClosedStream
          Log.low "Callback thread closing"
        rescue Aborted
          Log.low "Callback thread aborted"
          raise
        rescue Exception
          Log.low "Exception captured in callback: #{$!.message}"
          raise
        end
      end
    end
  end
end

#init_master ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/rbbt/util/concurrency/processes.rb', line 38

# Forks the master process that owns the worker pool and stores its pid in
# @master_pid.
#
# The parent takes the queue semaphore before forking and the child posts it
# once the workers and the monitor thread exist, so code that synchronizes on
# @sem (#init, #close_up_queue) waits until the master is ready.
#
# Inside the child:
# * CLOSE_SIGNAL asks the manager thread to push ClosedStream onto the work
#   queue until every worker has finished.
# * USR1 / USR2 raise / lower the pending worker count (see #add_process and
#   #remove_process) and wake the manager thread.
# * ABORT_SIGNAL makes the monitor thread abort all workers.
# * The manager thread adds and removes workers; the monitor thread reaps them
#   and treats an unsuccessful exit status as a failure.
# The child exits with 0 when both threads finish cleanly and with -1 otherwise.
#
# In the parent the read end of the work queue is closed afterwards.
def init_master
  RbbtSemaphore.wait_semaphore(@sem)
  @master_pid = Process.fork do
    @close_up = false
    processes_initiated = false
    processes = []
    process_mutex = Mutex.new

    # Close request: flag it and interrupt the manager thread's sleep. The
    # block is retried through Misc.insist until the manager thread exists and
    # has marked itself as working.
    Signal.trap(CLOSE_SIGNAL) do
      if ! @closing_thread
        @close_up = true
        Misc.insist([0,0.01,0.1,0.2,0.5]) do
          if ! @manager_thread
            Thread.pass
            raise "Manager thread for #{Process.pid} not found yet"
          end

          if @manager_thread.alive?
            raise "Manager thread for #{Process.pid} not working yet" unless @manager_thread["working"]
            @manager_thread.raise TryAgain
          end
        end
      end
    end

    # One more worker requested.
    Signal.trap(:USR1) do
      @count += 1
      @manager_thread.raise TryAgain
    end

    # One worker fewer requested.
    Signal.trap(:USR2) do
      @count -= 1
      @manager_thread.raise TryAgain
    end

    # Abort request: handled by the monitor thread, which stops the workers.
    Signal.trap(ABORT_SIGNAL) do
      @abort_monitor = true
      @monitor_thread.raise Aborted if @monitor_thread && @monitor_thread.alive?
    end

    # Keep only the pipe ends this process needs.
    # NOTE(review): exact semantics of Misc.purge_pipes are not visible here.
    if @callback_queue
      Misc.purge_pipes(@queue.swrite, @queue.sread, @callback_queue.swrite)
    else
      Misc.purge_pipes(@queue.swrite, @queue.sread)
    end

    @total = 0 # workers created so far (used as the offset when @offset is set)
    @count = 0 # pending worker adjustments requested through USR1 / USR2

    @manager_thread = Thread.new do
      while true
        break if processes_initiated && processes.empty? && (@monitor_thread && ! @monitor_thread.alive?)
        begin
          Thread.current["working"] = true
          if @close_up
            Thread.handle_interrupt(TryAgain => :never) do
              Log.debug "Closing up process queue #{Process.pid}"
              @count = 0
              # Keep feeding ClosedStream to the workers until all are gone.
              @closing_thread = Thread.new do
                Thread.handle_interrupt(TryAgain => :never) do
                  Log.debug "Pushing closed stream #{Process.pid}"
                  while true
                    @queue.push ClosedStream.new unless @queue.cleaned
                    break if processes_initiated && processes.empty?
                  end unless processes_initiated && processes.empty?
                end
              end
              @close_up = false
            end
          end

          # Idle until a signal handler wakes this thread with TryAgain.
          begin
            sleep 3
          rescue TryAgain
          end

          raise TryAgain if @close_up

          process_mutex.synchronize do
            Thread.handle_interrupt(TryAgain => :never) do
              while @count > 0
                @count -= 1
                @total += 1
                processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, (@offset ? @total : false), &@init_block)
                Log.warn "Added process #{processes.last.pid} to #{Process.pid} (#{processes.length})"
              end

              while @count < 0
                @count += 1
                @total -= 1
                next unless processes.length > 1
                last = processes.reject{|p| p.stopping }.last
                # Every remaining worker may already be stopping; without this
                # guard `nil.stop` would kill the manager thread.
                next unless last
                last.stop
                Log.warn "Removed process #{last.pid} from #{Process.pid} (#{processes.length})"
              end
            end
          end
        rescue TryAgain
          retry
        rescue Aborted
          Log.low "Aborting manager thread #{Process.pid}"
          raise $!
        rescue Exception
          # Re-raise the original failure; raising a bare `Exception` here
          # discarded its class, message and backtrace.
          raise
        end
      end
      Log.low "Manager thread stopped #{Process.pid}"
    end

    @callback_queue.close_read if @callback_queue

    # Initial pool of workers.
    num_processes.to_i.times do |i|
      @total += 1
      process_mutex.synchronize do
        processes << RbbtProcessQueueWorker.new(@queue, @callback_queue, @cleanup, @respawn, (@offset ? @total : false), &@init_block)
      end
    end

    processes_initiated = true

    @monitor_thread = Thread.new do
      begin
        while processes.any?
          raise Aborted if @abort_monitor
          # Reap whichever child exits next rather than joining them in order.
          pid, status = Process.wait2
          Log.debug "Joined process #{pid} of queue #{Process.pid} (status: #{status})"
          processes.reject!{|p| p.pid == pid}
          raise ProcessFailed.new pid if not status.success?
        end
        Log.low "All processes completed #{Process.pid}"
      rescue Aborted
        Log.exception $!
        Log.low "Aborting process monitor #{Process.pid}"
        processes.each{|p|  p.abort_and_join}
        Log.low "Processes aborted #{Process.pid}"
        processes.clear

        @manager_thread.raise Aborted if @manager_thread.alive?
        raise Aborted, "Aborted monitor thread"
      rescue Exception
        Log.low "Process monitor exception [#{Process.pid}]: #{$!.message}"
        processes.each{|p| p.abort_and_join}
        Log.low "Processes aborted #{Process.pid}"
        processes.clear

        @manager_thread.raise $! if @manager_thread.alive?
        raise Aborted, "Aborted monitor thread with exception"
      end
    end

    # The master is ready: release whoever is waiting on the semaphore.
    RbbtSemaphore.post_semaphore(@sem)

    Log.low "Process monitor #{Process.pid} joining threads"
    begin
      @monitor_thread.join
      @manager_thread.raise TryAgain if @manager_thread.alive?
      @manager_thread.join
      # Tell the parent's callback thread that no more results will arrive.
      @callback_queue.push ClosedStream.new if @callback_queue
    rescue Exception
      Kernel.exit -1
    end
    Log.low "Process monitor #{Process.pid} threads joined successfully, now exit"

    Kernel.exit 0
  end

  @queue.close_read
  Log.info "Cpu process #{@master_pid} with #{num_processes} workers."
  Log.low "Signal #{@master_pid} USR1/USR2 (#10/#12) to increase/decrease workers."
end

#process(*e) ⇒ Object



367
368
369
370
371
372
373
# File 'lib/rbbt/util/concurrency/processes.rb', line 367

# Queues one unit of work: the arguments are pushed onto the work queue as a
# single array.
#
# @param e [Array] arguments for one job
# @raise [Aborted] when the queue's pipe is broken (Errno::EPIPE)
def process(*e)
  @queue.push(e)
rescue Errno::EPIPE
  raise Aborted
end

#remove_process ⇒ Object



253
254
255
# File 'lib/rbbt/util/concurrency/processes.rb', line 253

# Asks the master process to retire one worker by sending it USR2; the
# master's USR2 trap lowers its pending worker count and wakes the manager
# thread.
def remove_process
  Process.kill(:USR2, @master_pid)
end