Class: Parallel::ForkManager

Inherits:
Object
  • Object
show all
Defined in:
lib/parallel/forkmanager.rb

Constant Summary collapse

VERSION =

$Revision: 1.2 $

'1.0.1'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(procs) ⇒ ForkManager

Returns a new instance of ForkManager.



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/parallel/forkmanager.rb', line 137

# Builds a manager whose max_proc is +procs+.
#
# A new manager starts with debugging off (@debug = 0), the "in child" flag
# cleared (@in_child = 0) and empty tables for tracked processes and
# finish callbacks.
#
# procs:: value stored as max_proc.
def initialize(procs)
    @max_proc     = procs
    @debug        = 0
    @in_child     = 0
    @processes    = {}
    @do_on_finish = {}

    # @debug was set to 0 just above, so this message only appears when the
    # debug reader reports 1.
    print "in initialize #{max_proc}!\n" if self.debug == 1
end

Instance Attribute Details

#debug ⇒ Object

Set debug to 1 for debugging messages.



133
134
135
# File 'lib/parallel/forkmanager.rb', line 133

# Reader for the debug flag held in @debug (initialize sets it to 0;
# 1 turns on debugging messages).
def debug()
  return @debug
end

#do_on_finish ⇒ Object

Returns the value of attribute do_on_finish.



135
136
137
# File 'lib/parallel/forkmanager.rb', line 135

# Reader for @do_on_finish, the table in which run_on_finish stores
# callbacks under a pid key (key 0 when no pid is given).
def do_on_finish()
  return @do_on_finish
end

#do_on_start ⇒ Object

Returns the value of attribute do_on_start.



135
136
137
# File 'lib/parallel/forkmanager.rb', line 135

# Reader for @do_on_start, the callback registered through run_on_start.
def do_on_start()
  return @do_on_start
end

#do_on_wait ⇒ Object

Returns the value of attribute do_on_wait.



135
136
137
# File 'lib/parallel/forkmanager.rb', line 135

# Reader for @do_on_wait, the callback registered through run_on_wait.
def do_on_wait()
  return @do_on_wait
end

#in_child ⇒ Object

Returns the value of attribute in_child.



134
135
136
# File 'lib/parallel/forkmanager.rb', line 134

# Reader for @in_child: 0 after initialize, set to 1 by start in the
# forked child.
def in_child()
  return @in_child
end

#max_proc ⇒ Object

Returns the value of attribute max_proc.



134
135
136
# File 'lib/parallel/forkmanager.rb', line 134

# Reader for @max_proc, the value handed to initialize (or later to
# set_max_procs).
def max_proc()
  return @max_proc
end

#on_wait_period ⇒ Object

Returns the value of attribute on_wait_period.



134
135
136
# File 'lib/parallel/forkmanager.rb', line 134

# Reader for @on_wait_period, the period given to run_on_wait.
def on_wait_period()
  return @on_wait_period
end

#processes ⇒ Object

Returns the value of attribute processes.



134
135
136
# File 'lib/parallel/forkmanager.rb', line 134

# Reader for @processes, the table of tracked children: start stores each
# child's identification under its pid.
def processes()
  return @processes
end

Instance Method Details

#_NT_waitpid(pid, par) ⇒ Object

_NT_waitpid(…) is the Windows variant of _waitpid(…) and will be called automatically by wait_one_child(…) depending on the value of RUBY_PLATFORM. You should not call _NT_waitpid(…) directly.



439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
# File 'lib/parallel/forkmanager.rb', line 439

# Windows/Java variant of _waitpid. wait_one_child picks it when
# RUBY_PLATFORM matches its Windows/Java pattern; it is not meant to be
# called directly.
#
# With Process::WNOHANG every pid tracked in +processes+ is polled in turn
# (the +pid+ argument is not used on this branch). Any other flag value
# is handed straight to Process.waitpid together with +pid+.
#
# pid:: pid forwarded to Process.waitpid on the non-polling branch.
# par:: wait flags; Process::WNOHANG selects the polling branch.
#
# Returns:
# * -1 when polling and no process is tracked;
# * the pid of the first tracked child found to have exited;
# * nil when polling and no tracked child has exited yet (the same value
#   Process.waitpid itself gives under WNOHANG);
# * whatever Process.waitpid returns on the non-polling branch.
def _NT_waitpid(pid, par)
    return Process.waitpid(pid, par) unless par == Process::WNOHANG

    pids = processes.keys
    return -1 if pids.empty?

    pids.each do |my_pid|
        kid = Process.waitpid(my_pid, par)
        # nil (or 0) means this child is still running: keep polling the
        # remaining pids instead of giving up after the first one.
        return kid unless kid.nil? || kid == 0
    end

    nil
end

#_waitpid(pid, flags) ⇒ Object

_waitpid(…) should not be called directly as it is called automatically by wait_one_child(…).



430
431
432
# File 'lib/parallel/forkmanager.rb', line 430

# Thin wrapper around Process.waitpid; wait_one_child calls it on
# platforms that do not match its Windows/Java pattern. Not meant to be
# called directly.
#
# pid::   pid passed through to Process.waitpid.
# flags:: wait flags passed through to Process.waitpid.
#
# Returns whatever Process.waitpid returns.
def _waitpid(pid, flags)
    Process.waitpid(pid, flags)
end

#finish(exit_code = 0) ⇒ Object

finish(exit_code) – exit_code is optional

finish() closes the child process by exiting and accepts an optional exit code. The default exit code is 0, and it can be retrieved in the parent via the run_on_finish callback. If you’re running the program in debug mode (max_proc == 0), this method does not exit; it runs the on_finish callback for the current process instead.



212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/parallel/forkmanager.rb', line 212

# Ends the child process.
#
# In a child (in_child == 1) the process exits with +exit_code+, falling
# back to 0 when +exit_code+ is nil or false. Otherwise, when max_proc is 0,
# the on_finish callback is run for the current process ($$) with signal and
# core-dump arguments of 0, and $$ is dropped from +processes+.
#
# exit_code:: exit status for the child (default 0).
#
# Returns 0 whenever it returns at all.
def finish(exit_code = 0)
    exit(exit_code || 0) if self.in_child == 1
    return 0 unless self.max_proc == 0

    own_pid = $$
    self.on_finish(own_pid, exit_code, self.processes[own_pid], 0, 0)
    self.processes.delete(own_pid)
    0
end

#on_finish(*params) ⇒ Object



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/parallel/forkmanager.rb', line 307

# Invokes the finish callback registered for a child.
#
# The callback stored under the pid (params[0]) in do_on_finish is used,
# falling back to the one stored under key 0. The callback receives only as
# many leading values of +params+ as its signature asks for:
# * a fixed arity of N gets the first N values (none for a zero-argument
#   callback);
# * a callback with optional or splat parameters gets its required
#   parameters, and at least the pid.
# Positions missing from +params+ are passed as nil.
#
# params:: pid, exit code, identification, exit signal, core-dump flag —
#          in the order wait_one_child and finish supply them.
#
# Returns 0 when no callback is registered, otherwise the callback's result.
# Raises RuntimeError ("on finish failed!\n") when the callback raises.
def on_finish(*params)
    pid = params[0]
    code = do_on_finish[pid] || do_on_finish[0]
    return 0 unless code

    begin
        arity = code.arity
        # Negative arity encodes -(required + 1); such callbacks always got
        # the pid, so never pass fewer than one value to them.
        argc = arity >= 0 ? arity : [-arity - 1, 1].max
        args = Array.new(argc) { |i| params[i] }
        code.call(*args)
    rescue
        raise "on finish failed!\n"
    end
end

#on_start(*params) ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/parallel/forkmanager.rb', line 394

# Invokes the callback registered with run_on_start, if there is one.
#
# Nothing happens unless do_on_start is a Proc. The callback receives only
# as many leading values of +params+ as its signature asks for:
# * a fixed arity of N gets the first N values (none for a zero-argument
#   callback);
# * a callback with optional or splat parameters gets its required
#   parameters, and at least the pid.
# Positions missing from +params+ are passed as nil.
#
# params:: pid and identification, in the order start supplies them.
#
# Returns nil when no Proc is registered, otherwise the callback's result.
# Raises RuntimeError ("on_start failed\n") when the callback raises.
def on_start(*params)
    begin
        code = do_on_start
        return nil unless code.is_a?(Proc)

        arity = code.arity
        # Negative arity encodes -(required + 1); such callbacks always got
        # the pid, so never pass fewer than one value to them.
        argc = arity >= 0 ? arity : [-arity - 1, 1].max
        args = Array.new(argc) { |i| params[i] }
        code.call(*args)
    rescue
        raise "on_start failed\n"
    end
end

#on_wait ⇒ Object



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/parallel/forkmanager.rb', line 349

# Runs the callback registered with run_on_wait and then, when a wait
# period is configured, sleeps for that period.
#
# Nothing happens unless do_on_wait is a Proc. The callback is called with
# no arguments. When on_wait_period is nil the method returns right after
# the callback: a nil timeout would make IO.select wait indefinitely.
#
# Side effect: when a period is configured, a no-op handler is installed
# for the CHLD signal; it replaces any handler installed earlier and is not
# restored afterwards.
#
# Always returns nil.
def on_wait()
    callback = do_on_wait
    return nil unless callback.is_a?(Proc)

    callback.call

    period = on_wait_period
    unless period.nil?
        #
        # There is no way here to ask whether a CHLD handler is already
        # installed, so one is installed unconditionally.
        #
        Signal.trap("CHLD") { }
        # With no IO objects to watch, select simply waits out the timeout.
        IO.select(nil, nil, nil, period)
    end

    nil
end

#run_on_finish(code, pid = 0) ⇒ Object

With run_on_finish(…) you can define a subroutine that is called in the parent process when a child terminates.

The parameters of run_on_finish(…) are:

  • pid of the process, which is terminated

  • exit code of the program

  • identification of the process (if provided in the “start” method)

  • exit signal (0-127: signal name)

  • core dump (1 if there was core dump at exit)

Example:

pfm.run_on_finish(
    lambda {
        |pid,exit_code,ident|
        print "** PID (#{pid}) for #{ident} exited with code #{exit_code}!\n"
    }
)


299
300
301
302
303
304
305
# File 'lib/parallel/forkmanager.rb', line 299

# Registers +code+ as the finish callback for +pid+.
#
# The callback is stored in do_on_finish under +pid+; key 0 (the default)
# is the entry on_finish falls back to when a pid has no callback of its
# own.
#
# code:: callback to store.
# pid::  pid the callback applies to (default 0).
#
# Returns +code+.
# Raises RuntimeError ("couldn't run on finish!\n") when storing fails.
def run_on_finish(code, pid=0)
    self.do_on_finish.store(pid, code)
rescue
    raise "couldn't run on finish!\n"
end

#run_on_start(code) ⇒ Object

You can define a subroutine which is called when a child is started. It is called after a successful startup of a child in the parent process.

The parameters of code are as follows:

  • pid of the process which has been started

  • identification of the process (if provided in the “start” method)

Example:

pfm.run_on_start(
    lambda {
        |pid,ident|
        print "run on start ::: #{ident} (#{pid})\n"
    }
)


386
387
388
389
390
391
392
# File 'lib/parallel/forkmanager.rb', line 386

# Registers +code+ as the callback that on_start invokes.
#
# code:: callback to store in do_on_start.
#
# Returns +code+.
# Raises RuntimeError ("run on start failed!\n") when the assignment fails.
def run_on_start(code)
    self.do_on_start = code
rescue
    raise "run on start failed!\n"
end

#run_on_wait(code, period) ⇒ Object

You can define a subroutine which is called when the manager has to wait for a child process. If period is not defined, then one call is done per child. If period is defined, then code is called periodically and the method waits for “period” seconds between two calls. Note that period can also be a fractional number. The exact “period” in seconds is not guaranteed: signals can shorten it and the process scheduler can make it longer (e.g. on busy systems).

No parameters are passed to code on the call.

Example:

timeout = 0.5
pfm.run_on_wait(
    lambda {
        print "** Have to wait for one child ...\n"
    },
    timeout
)


344
345
346
347
# File 'lib/parallel/forkmanager.rb', line 344

# Registers the callback that on_wait invokes, together with its period.
#
# code::   callback stored in do_on_wait; on_wait calls it with no arguments.
# period:: value stored in on_wait_period.
#
# Returns +period+.
def run_on_wait(code, period)
    self.do_on_wait = code
    return (self.on_wait_period = period)
end

#set_max_procs(mp = nil) ⇒ Object

set_max_procs(mp) – mp is an integer

set_max_procs() allows you to set a new maximum number of children to maintain.

Return: The previous setting of max_procs.



418
419
420
421
422
423
424
# File 'lib/parallel/forkmanager.rb', line 418

# Sets a new maximum number of children to maintain.
#
# mp:: new value for max_proc; when nil (the default) nothing is changed.
#
# Returns the setting that was in effect before the call, so with no
# argument it simply reports the current value.
def set_max_procs(mp=nil)
    previous = max_proc
    self.max_proc = mp unless mp.nil?
    previous
end

#start(identification = nil) ⇒ Object

start(“string”) – “string” identification is optional.

start(“string”) “puts the fork in Parallel::ForkManager” – as start() does the fork().

start(“string”) takes an optional “string” argument to use as a process identifier. It is used by the “run_on_finish” callback for identifying the finished process. See run_on_finish() for more information.

Return: PID of the child process if in the parent, or nil if in the child process.



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/parallel/forkmanager.rb', line 163

# Forks a new child, first waiting until the number of tracked children is
# below max_proc.
#
# While the limit is reached, on_wait is run and one child is waited for:
# without blocking (Process::WNOHANG) when a wait period is configured,
# otherwise with a blocking wait. wait_children is then called before the
# fork.
#
# When max_proc is 0 (debug mode) nothing is forked: the current process
# ($$) is recorded in +processes+ and on_start is run for it.
#
# identification:: optional label stored in +processes+ under the child's
#                  pid and handed to on_start.
#
# Returns the child's pid in the parent, nil in the child (what fork
# returns there), and 0 in debug mode.
# NOTE(review): 0 is truthy in Ruby, so a caller that tests the result for
# truthiness treats debug mode like the parent branch — confirm intended.
#
# Exits with status 1 when called from inside a child (in_child == 1).
# A failed fork raises; it is not reported through the return value.
def start(identification=nil)
    if in_child == 1
        puts "Cannot start another process while you are in the child process"
        exit 1
    end

    # 0 is truthy in Ruby, so debug mode has to be an explicit comparison.
    debug_mode = (max_proc == 0)

    until debug_mode || processes.length < max_proc
        on_wait
        # The accessor always exists, so test its value, not `defined?`.
        wait_one_child(on_wait_period.nil? ? nil : Process::WNOHANG)
    end

    wait_children

    if debug_mode
        processes[$$] = identification
        on_start($$, identification)
        return 0
    end

    pid = fork
    if pid.nil?
        # fork returns nil in the child
        self.in_child = 1
    else
        processes[pid] = identification
        on_start(pid, identification)
    end
    pid
end

#wait_all_children ⇒ Object Also known as: wait_all_childs

wait_all_children() will wait for all the processes which have been forked. This is a blocking wait.



264
265
266
267
268
269
270
271
272
273
274
# File 'lib/parallel/forkmanager.rb', line 264

# Waits until no tracked child is left.
#
# For as long as +processes+ is non-empty, on_wait is run and one child is
# waited for: without blocking (Process::WNOHANG) when a wait period is
# configured, so the on_wait callback keeps being called; otherwise with a
# blocking wait.
#
# Returns nil.
def wait_all_children()
    until processes.empty?
        on_wait
        # The accessor always exists, so test its value, not `defined?`.
        wait_one_child(on_wait_period.nil? ? nil : Process::WNOHANG)
    end
end

#wait_children ⇒ Object Also known as: wait_childs



225
226
227
228
229
230
231
232
233
# File 'lib/parallel/forkmanager.rb', line 225

# Collects, without blocking, the tracked children that have already
# exited.
#
# wait_one_child is polled with Process::WNOHANG for as long as it keeps
# reporting a reaped child. Polling stops when nothing more is ready (nil,
# 0 or -1) or when no tracked child is left. Other negative values count
# as pids, since Win32 reports negative pids.
#
# Returns nil.
def wait_children()
    until processes.empty?
        kid = wait_one_child(Process::WNOHANG)
        # Stop as soon as no further child is ready; keep going only while
        # children are actually being reaped.
        break if kid.nil? || kid == 0 || kid == -1
    end
    nil
end

#wait_one_child(parent) ⇒ Object

Probably won’t want to call this directly. Just let wait_all_children(…) make the call for you.



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/parallel/forkmanager.rb', line 241

# Waits for one tracked child and runs its finish callback. Normally called
# for you by wait_all_children, wait_children and start.
#
# The wait goes through _NT_waitpid when RUBY_PLATFORM matches the
# Windows/Java pattern below, otherwise through _waitpid, always with pid -1.
# A reaped pid that is not in +processes+ is ignored and the wait is
# repeated. For a tracked child the entry is removed from +processes+ and
# on_finish is called with: pid, exit code (status >> 8), identification,
# exit signal (status & 0x7f) and core-dump flag (1 when status & 0x80 is
# set, else 0).
#
# parent:: wait flags, e.g. Process::WNOHANG; nil means 0.
#
# Returns the pid of the reaped child, 0 when a non-blocking wait found no
# exited child (a nil from the wait is normalised to 0 so callers can
# compare integers), or -1 when the wait reported -1.
def wait_one_child(parent)
    flags = parent || 0
    windows = RUBY_PLATFORM =~ /mswin|mingw|bccwin|wince|emx|java/

    loop do
        # Use _NT_waitpid(...) on a Windows or Java variant.
        kid = windows ? _NT_waitpid(-1, flags) : _waitpid(-1, flags)
        kid = 0 if kid.nil?
        return kid if kid == 0 || kid == -1 # Win32 returns negative PIDs

        # Not one of ours: wait again.
        next unless processes.has_key?(kid)

        id = processes.delete(kid)
        # Work on the raw integer status; 0 is truthy in Ruby, so the
        # core-dump bit must be compared explicitly.
        raw = $?.to_i
        on_finish(kid, raw >> 8, id, raw & 0x7f, (raw & 0x80).zero? ? 0 : 1)
        return kid
    end
end