Class: Parallel::ForkManager
- Inherits:
-
Object
- Object
- Parallel::ForkManager
- Defined in:
- lib/parallel/forkmanager.rb
Constant Summary collapse
- VERSION =
$Revision: 1.2 $
'1.0.1'
Instance Attribute Summary collapse
-
#debug ⇒ Object
Set debug to 1 for debugging messages.
-
#do_on_finish ⇒ Object
Returns the value of attribute do_on_finish.
-
#do_on_start ⇒ Object
Returns the value of attribute do_on_start.
-
#do_on_wait ⇒ Object
Returns the value of attribute do_on_wait.
-
#in_child ⇒ Object
Returns the value of attribute in_child.
-
#max_proc ⇒ Object
Returns the value of attribute max_proc.
-
#on_wait_period ⇒ Object
Returns the value of attribute on_wait_period.
-
#processes ⇒ Object
Returns the value of attribute processes.
Instance Method Summary collapse
-
#_NT_waitpid(pid, par) ⇒ Object
_NT_waitpid(…) is the Windows variant of _waitpid(…) and will be called automatically by wait_one_child(…) depending on the value of RUBY_PLATFORM.
-
#_waitpid(pid, flags) ⇒ Object
_waitpid(…) should not be called directly as it is called automatically by wait_one_child(…).
-
#finish(exit_code = 0) ⇒ Object
finish(exit_code) – exit_code is optional.
-
#initialize(procs) ⇒ ForkManager
constructor
A new instance of ForkManager.
- #on_finish(*params) ⇒ Object
- #on_start(*params) ⇒ Object
- #on_wait ⇒ Object
-
#run_on_finish(code, pid = 0) ⇒ Object
You can define run_on_finish(…) that is called when a child in the parent process when a child is terminated.
-
#run_on_start(code) ⇒ Object
You can define a subroutine which is called when a child is started.
-
#run_on_wait(code, period) ⇒ Object
You can define a subroutine which is called when the child process needs to wait for the startup.
-
#set_max_procs(mp = nil) ⇒ Object
set_max_procs(mp) – mp is an integer.
-
#start(identification = nil) ⇒ Object
start(“string”) – “string” identification is optional.
-
#wait_all_children ⇒ Object
(also: #wait_all_childs)
wait_all_children() will wait for all the processes which have been forked.
- #wait_children ⇒ Object (also: #wait_childs)
-
#wait_one_child(parent) ⇒ Object
Probably won’t want to call this directly.
Constructor Details
#initialize(procs) ⇒ ForkManager
Returns a new instance of ForkManager.
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/parallel/forkmanager.rb', line 137 def initialize(procs) @debug = 0 @max_proc = procs @processes = {} @do_on_finish = {} @in_child = 0 if self.debug == 1 print "in initialize #{max_proc}!\n" end end |
Instance Attribute Details
#debug ⇒ Object
Set debug to 1 for debugging messages.
133 134 135 |
# File 'lib/parallel/forkmanager.rb', line 133 def debug @debug end |
#do_on_finish ⇒ Object
Returns the value of attribute do_on_finish.
135 136 137 |
# File 'lib/parallel/forkmanager.rb', line 135 def do_on_finish @do_on_finish end |
#do_on_start ⇒ Object
Returns the value of attribute do_on_start.
135 136 137 |
# File 'lib/parallel/forkmanager.rb', line 135 def do_on_start @do_on_start end |
#do_on_wait ⇒ Object
Returns the value of attribute do_on_wait.
135 136 137 |
# File 'lib/parallel/forkmanager.rb', line 135 def do_on_wait @do_on_wait end |
#in_child ⇒ Object
Returns the value of attribute in_child.
134 135 136 |
# File 'lib/parallel/forkmanager.rb', line 134 def in_child @in_child end |
#max_proc ⇒ Object
Returns the value of attribute max_proc.
134 135 136 |
# File 'lib/parallel/forkmanager.rb', line 134 def max_proc @max_proc end |
#on_wait_period ⇒ Object
Returns the value of attribute on_wait_period.
134 135 136 |
# File 'lib/parallel/forkmanager.rb', line 134 def on_wait_period @on_wait_period end |
#processes ⇒ Object
Returns the value of attribute processes.
134 135 136 |
# File 'lib/parallel/forkmanager.rb', line 134 def processes @processes end |
Instance Method Details
#_NT_waitpid(pid, par) ⇒ Object
_NT_waitpid(…) is the Windows variant of _waitpid(…) and will be called automatically by wait_one_child(…) depending on the value of RUBY_PLATFORM. You should not call _NT_waitpid(…) directly.
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 |
# File 'lib/parallel/forkmanager.rb', line 439 def _NT_waitpid(pid, par) if par == Process::WNOHANG pids = self.processes.keys() if pids.length() == 0 return -1 end kid = 0 for my_pid in pids kid = Process.waitpid(my_pid, par) if kid != 0 return kid end return kid end else return Process.waitpid(pid, par) end end |
#_waitpid(pid, flags) ⇒ Object
_waitpid(…) should not be called directly as it is called automatically by wait_one_child(…).
430 431 432 |
# File 'lib/parallel/forkmanager.rb', line 430 def _waitpid(pid, flags) return Process.waitpid(pid, flags) end |
#finish(exit_code = 0) ⇒ Object
finish(exit_code) – exit_code is optional
finish() loses the child process by exiting and accepts an optional exit code. Default exit code is 0 and can be retrieved in the parent via callback. If you’re running the program in debug mode (max_proc == 0), this method doesn’t do anything.
212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/parallel/forkmanager.rb', line 212 def finish(exit_code = 0) if self.in_child == 1 exit exit_code || 0 end if self.max_proc == 0 self.on_finish($$, exit_code, self.processes[$$], 0, 0) self.processes.delete($$) end return 0 end |
#on_finish(*params) ⇒ Object
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/parallel/forkmanager.rb', line 307 def on_finish(*params) pid = params[0] code = self.do_on_finish[pid] || self.do_on_finish[0] or return 0 begin my_argc = code.arity - 1 if my_argc > 0 my_params = params[0 .. my_argc] else my_params = [params[0]] end params = my_params code.call(*params) rescue raise "on finish failed!\n" end end |
#on_start(*params) ⇒ Object
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/parallel/forkmanager.rb', line 394 def on_start(*params) begin if self.do_on_start.class().name == 'Proc' my_argc = self.do_on_start.arity - 1 if my_argc > 0 my_params = params[0 .. my_argc] else my_params = params[0] end params = my_params self.do_on_start.call(*params) end rescue raise "on_start failed\n" end end |
#on_wait ⇒ Object
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'lib/parallel/forkmanager.rb', line 349 def on_wait() begin if self.do_on_wait.class().name == 'Proc' self.do_on_wait.call() if defined? self.on_wait_period # # Unfortunately Ruby 1.8 has no concept of 'sigaction', # so we're unable to check if a signal handler has # already been installed for a given signal. In this # case it's no matter, since we define handler, but yikes. # Signal.trap("CHLD") do lambda{}.call() end IO.select(nil, nil, nil, self.on_wait_period) end end end end |
#run_on_finish(code, pid = 0) ⇒ Object
You can define run_on_finish(…) that is called when a child in the parent process when a child is terminated.
The parameters of run_on_finish(…) are:
-
pid of the process, which is terminated
-
exit code of the program
-
identification of the process (if provided in the “start” method)
-
exit signal (0-127: signal name)
-
core dump (1 if there was core dump at exit)
Example:
pfm.run_on_finish(
lambda {
|pid,exit_code,ident|
print "** PID (#{pid}) for #{ident} exited with code #{exit_code}!\n"
}
)
299 300 301 302 303 304 305 |
# File 'lib/parallel/forkmanager.rb', line 299 def run_on_finish(code, pid=0) begin self.do_on_finish[pid] = code rescue raise "couldn't run on finish!\n" end end |
#run_on_start(code) ⇒ Object
You can define a subroutine which is called when a child is started. It is called after a successful startup of a child in the parent process.
The parameters of code are as follows:
-
pid of the process which has been started
-
identification of the process (if provided in the “start” method)
Example:
pfm.run_on_start(
lambda {
|pid,ident|
print "run on start ::: #{ident} (#{pid})\n"
}
)
386 387 388 389 390 391 392 |
# File 'lib/parallel/forkmanager.rb', line 386 def run_on_start(code) begin self.do_on_start = code rescue raise "run on start failed!\n" end end |
#run_on_wait(code, period) ⇒ Object
You can define a subroutine which is called when the child process needs to wait for the startup. If period is not defined, then one call is done per child. If period is defined, then code is called periodically and the method waits for “period” seconds betwen the two calls. Note, period can be fractional number also. The exact “period seconds” is not guaranteed, signals can shorten and the process scheduler can make it longer (i.e. on busy systems).
No parameters are passed to code on the call.
Example:
timeout = 0.5
pfm.run_on_wait(
lambda {
print "** Have to wait for one child ...\n"
},
timeout
)
344 345 346 347 |
# File 'lib/parallel/forkmanager.rb', line 344 def run_on_wait(code, period) self.do_on_wait = code self.on_wait_period = period end |
#set_max_procs(mp = nil) ⇒ Object
set_max_procs(mp) – mp is an integer
set_max_procs() allows you to set a new maximum number of children to maintain.
Return: The previous setting of max_procs.
418 419 420 421 422 423 424 |
# File 'lib/parallel/forkmanager.rb', line 418 def set_max_procs(mp=nil) if mp == nil return self.max_proc else self.max_proc = mp end end |
#start(identification = nil) ⇒ Object
start(“string”) – “string” identification is optional.
start(“string”) “puts the fork in Parallel::ForkManager” – as start() does the fork().
start(“string”) takes an optional “string” argument to use as a process identifier. It is used by the “run_on_finish” callback for identifying the finished process. See run_on_finish() for more information.
Return: PID of child process if in parent, or 0 if in the child process.
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/parallel/forkmanager.rb', line 163 def start(identification=nil) if self.in_child == 1 puts "Cannot start another process while you are in the child process" exit 1 end while(self.processes.length() >= self.max_proc) self.on_wait() if defined? self.on_wait_period arg = Process::WNOHANG else arg = nil end self.wait_one_child(arg) end self.wait_children() if self.max_proc pid = fork() if ! defined? pid print "Cannot fork #{$!}\n" exit 1 end if pid != nil self.processes[pid] = identification self.on_start(pid, identification) else if ! pid self.in_child = 1 end end return pid else self.processes[$$] = identification self.on_start($$, identification) return 0 end end |
#wait_all_children ⇒ Object Also known as: wait_all_childs
wait_all_children() will wait for all the processes which have been forked. This is a blocking wait.
264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/parallel/forkmanager.rb', line 264 def wait_all_children() while ! self.processes.empty? self.on_wait() if defined? self.on_wait_period arg = Process::WNOHANG else arg = nil end self.wait_one_child(arg) end end |
#wait_children ⇒ Object Also known as: wait_childs
225 226 227 228 229 230 231 232 233 |
# File 'lib/parallel/forkmanager.rb', line 225 def wait_children() return if self.processes.empty? kid = nil # Should our default be nil? loop do kid = self.wait_one_child(Process::WNOHANG) break if kid > 0 || kid < -1 end end |
#wait_one_child(parent) ⇒ Object
Probably won’t want to call this directly. Just let wait_all_children(…) make the call for you.
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/parallel/forkmanager.rb', line 241 def wait_one_child(parent) kid = nil while true # Call _NT_waitpid(...) if we're using a Windows or Java variant. if(RUBY_PLATFORM =~ /mswin|mingw|bccwin|wince|emx|java/) kid = self._NT_waitpid(-1, parent ||= 0) else kid = self._waitpid(-1, parent ||= 0) end last if kid == 0 or kid == -1 # Win32 returns negative PIDs redo if ! self.processes.has_key?(kid) id = self.processes.delete(kid) self.on_finish(kid, $? >> 8, id, $? & 0x7f, $? & 0x80 ? 1 : 0) break end kid end |