Module: ForkAndReturn
Defined in: lib/forkandreturn/forkandreturn.rb, lib/forkandreturn/util.rb, lib/forkandreturn/exceptions.rb
Overview
ForkAndReturn implements a couple of methods that simplify running a block of code in a subprocess. The result of the block (a Ruby object or an exception) is made available in the parent process.
The intermediate return value (or exception) is Marshal’led to a temporary file on disk. This means that it is possible to run thousands of child processes (concurrently) with a relatively low memory footprint: just gather the results once all child processes are done. ForkAndReturn handles the writing, reading and deleting of the temporary file.
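A minimal sketch of this pattern, assuming only Ruby's standard library (it does not call ForkAndReturn itself): every child Marshals its result into its own file, and the parent holds nothing but pids and paths until it collects. The helper name `spawn_square` and the use of `Dir.mktmpdir` for the file location are hypothetical choices for illustration.

```ruby
require "tmpdir"

# Hypothetical helper: forks a child that computes n * n and Marshals the
# result into its own file inside dir. Returns [pid, path].
def spawn_square(n, dir)
  path = File.join(dir, "result-#{n}")
  pid = Process.fork do
    File.open(path, "wb") { |f| Marshal.dump(n * n, f) }
    Process.exit!(true) # leave without running inherited at_exit handlers
  end
  [pid, path]
end

results = Dir.mktmpdir do |dir|
  # Start all children first; the parent only keeps pids and file paths.
  jobs = (1..4).map { |n| spawn_square(n, dir) }

  # Wait for each child, then load its result and delete its file.
  jobs.map do |pid, path|
    Process.wait(pid)
    value = File.open(path, "rb") { |f| Marshal.load(f) }
    File.delete(path)
    value
  end
end

p results # => [1, 4, 9, 16]
```

Dir.mktmpdir returns the value of its block and removes the directory afterwards, so nothing is left on disk even if a file were not deleted explicitly.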
The core of these methods is fork_and_return_core(). It returns some nested lambdas, which are handled by the other methods and by Enumerable#concurrent_collect(). These lambdas handle the WAITing, LOADing and RESULTing (explained in fork_and_return_core()).
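The child process exits with Process.exit!(), so at_exit() blocks inherited from the parent are skipped in the child process (at_exit() blocks defined by the block itself still run). However, both $stdout and $stderr are flushed before the child exits.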
Only Marshal’lable Ruby objects can be returned.
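The restriction comes from Marshal itself. The snippet below uses only core Ruby (the `marshalable?` helper is a hypothetical name) to show that plain values and exception objects survive a Marshal round trip, while a Proc is rejected with a TypeError.

```ruby
# Marshal round trip of [ok, result] pairs like the ones a child could write.
ok_pair  = Marshal.load(Marshal.dump([true, { sum: 42 }]))
err_pair = Marshal.load(Marshal.dump([false, ArgumentError.new("bad input")]))

# Returns true if obj can be Marshal'led, false if Marshal.dump rejects it.
def marshalable?(obj)
  Marshal.dump(obj)
  true
rescue TypeError
  false
end

p ok_pair[1][:sum]           # => 42
p err_pair[1].class          # => ArgumentError
p err_pair[1].message        # => "bad input"
p marshalable?(lambda { 1 }) # => false
```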
ForkAndReturn uses Process.fork(), so it only runs on platforms where Process.fork() is implemented.
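Ruby reports false from respond_to? for methods that are not implemented on the current platform (fork on Windows, for example), so a caller could guard its use of forking code along these lines. The helper name `fork_supported?` is hypothetical.

```ruby
# Process.respond_to?(:fork) is false where Ruby does not implement fork,
# so it can guard code that relies on forking.
def fork_supported?
  Process.respond_to?(:fork)
end

if fork_supported?
  pid = Process.fork { Process.exit!(true) } # trivial child that exits at once
  Process.wait(pid)
  puts "fork is available (child exited with status #{$?.exitstatus})"
else
  puts "fork is not available on this platform"
end
```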
Defined Under Namespace
Modules: Util
Classes: WorkerError
Class Method Summary
- .fork_and_return(*args, &block) ⇒ Object
  Fork a new process, run the block of code within that process and wait for its result.
- .fork_and_return_core(*args, &block) ⇒ Object
  Fork a new process, run the block of code within that process and return nested WAIT/LOAD/RESULT lambdas.
- .fork_and_return_later(*args, &block) ⇒ Object
  Fork a new process, run the block of code within that process and return a lambda that fetches the result later.
Class Method Details
.fork_and_return(*args, &block) ⇒ Object
Fork a new process and run the block of code within that process.
The WAITing, LOADing and RESULTing (explained in fork_and_return_core()) are performed immediately: the return value of the block is returned, or the exception it raised is re-raised in the parent.
Example:
    [1, 2, 3, 4].collect do |object|
      Thread.fork do
        ForkAndReturn.fork_and_return do
          2*object
        end
      end
    end.collect do |thread|
      thread.value
    end # ===> [2, 4, 6, 8]
This runs each “2*object” in a separate process. Ideally, the operating system spreads the processes over all available CPUs. That’s a simple way of parallel processing! (Although Enumerable#concurrent_collect() is even simpler…)
*args is passed to the block.
    # File 'lib/forkandreturn/forkandreturn.rb', line 44

    def self.fork_and_return(*args, &block)
      wait = fork_and_return_core(*args, &block)
      wait.call.call.call
    end
.fork_and_return_core(*args, &block) ⇒ Object
Fork a new process and run the block of code within that process.
Returns some nested lambdas:
- The first lambda is the WAIT-lambda. Calling it waits for the child process to finish; it returns the LOAD-lambda.
- Calling the LOAD-lambda loads the result of the child process (the return value or the exception) from the temporary file into memory and deletes the temporary file; it returns the RESULT-lambda. If the child wrote no result, or wrote corrupt data, the loaded result is a WorkerError instead.
- Calling the RESULT-lambda handles the result of the child process: it either returns the return value of the block or raises the exception.
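A minimal, self-contained sketch of the same WAIT/LOAD/RESULT staging, using only Ruby core (it does not load the gem). The method name `fork_core_sketch` and the temporary-file naming are hypothetical; unlike the real method, the sketch omits the at_exit flushing and the WorkerError fallbacks.

```ruby
require "tmpdir"

# Hypothetical, simplified stand-in for fork_and_return_core(): returns a
# WAIT-lambda, which returns a LOAD-lambda, which returns a RESULT-lambda.
def fork_core_sketch(*args)
  path = File.join(Dir.tmpdir, "fork-sketch-#{Process.pid}-#{rand(1 << 32)}")
  pid = Process.fork do
    begin
      ok, res = true, yield(*args)
    rescue => e
      ok, res = false, e
    end
    File.open(path, "wb") { |f| Marshal.dump([ok, res], f) }
    Process.exit!(true)
  end

  lambda do                 # WAIT: reap the child
    Process.wait(pid)
    lambda do               # LOAD: read the result, then remove the file
      ok, res = File.open(path, "rb") { |f| Marshal.load(f) }
      File.delete(path)
      lambda do             # RESULT: return the value or raise the exception
        raise res unless ok
        res
      end
    end
  end
end

wait   = fork_core_sketch(21) { |x| x * 2 }
loader = wait.call   # blocks until the child has exited
result = loader.call # the temporary file is read and deleted here
p result.call        # => 42

begin
  fork_core_sketch { raise ArgumentError, "boom" }.call.call.call
rescue ArgumentError => e
  p e.message        # => "boom"
end
```

Splitting the steps lets a caller decide when to block (WAIT), when to pay for loading the data (LOAD) and when an exception should surface (RESULT).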
at_exit blocks defined in the child itself will be executed in the child, whereas at_exit blocks defined in the parent won’t be executed in the child.
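This follows from at_exit handlers running in reverse order of registration: the handler that calls Process.exit! is registered when the child starts, after any handlers inherited from the parent but before any the block defines. The self-contained sketch below (core Ruby only; `at_exit_demo` is a hypothetical name) imitates that ordering inside a single child and reports over a pipe which handlers actually ran.

```ruby
# Forks a child that registers three at_exit handlers and returns what the
# surviving handlers wrote to the pipe.
def at_exit_demo
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    writer.sync = true
    at_exit { writer.write("early ") } # stands in for a handler inherited from the parent
    at_exit do                         # ForkAndReturn-style handler, registered next
      $stdout.flush
      $stderr.flush
      Process.exit!(true)              # terminates at once, so "early" never runs
    end
    at_exit { writer.write("late ") }  # stands in for a handler defined by the block
  end
  writer.close
  Process.wait(pid)
  output = reader.read
  reader.close
  output
end

p at_exit_demo # => "late "
```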
*args is passed to the block.
    # File 'lib/forkandreturn/forkandreturn.rb', line 80

    def self.fork_and_return_core(*args, &block)
      file = Util.tempfile

      #begin
        pid = Process.fork do
          at_exit do
            $stdout.flush
            $stderr.flush

            Process.exit! # To avoid the execution of already defined at_exit handlers.
          end

          begin
            ok, res = true, yield(*args)
          rescue
            ok, res = false, $!
          end

          File.open(file, "wb") do |f|
            f.chmod(0600)
            Marshal.dump([ok, res], f)
          end
        end
      #rescue Errno::EAGAIN # Resource temporarily unavailable - fork(2)
      #  Kernel.sleep 0.1
      #  retry # TODO: Reconsider.
      #end

      lambda do # Wait for the result.
        Process.wait(pid) # To avoid zombies.

        lambda do # Load the result and delete the temp file.
          begin
            ok, res = File.open(file, "rb"){|f| Marshal.load(f)}
          rescue Errno::ENOENT # No such file or directory
            ok, res = false, WorkerError.new("the worker hasn't returned a result")
          rescue EOFError # end of file reached
            ok, res = false, WorkerError.new("the worker hasn't returned a result")
          rescue TypeError # can't be read
            ok, res = false, WorkerError.new("the worker has returned corrupt data")
          ensure
            File.delete(file) if File.file?(file)
          end

          lambda do # Handle the result.
            raise res unless ok

            res
          end
        end
      end
    end
.fork_and_return_later(*args, &block) ⇒ Object
Fork a new process and run the block of code within that process.
Returns a lambda. If you call it, the WAITing, LOADing and RESULTing (explained in fork_and_return_core()) will be performed in one go.
*args is passed to the block.
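Deferring the WAIT/LOAD/RESULT steps means several children can be started before any of them is waited for. A minimal sketch, assuming only Ruby core; `fork_later_sketch` is a hypothetical stand-in rather than the gem's code, and it skips the exception handling the real method inherits from fork_and_return_core().

```ruby
require "tmpdir"

# Hypothetical stand-in for fork_and_return_later(): forks immediately and
# returns a lambda that waits, loads the value and deletes the temp file.
def fork_later_sketch(*args)
  path = File.join(Dir.tmpdir, "later-sketch-#{Process.pid}-#{rand(1 << 32)}")
  pid = Process.fork do
    File.open(path, "wb") { |f| Marshal.dump(yield(*args), f) }
    Process.exit!(true)
  end
  lambda do
    Process.wait(pid)
    value = File.open(path, "rb") { |f| Marshal.load(f) }
    File.delete(path)
    value
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# All three children start here and sleep concurrently.
pending = [1, 2, 3].map { |n| fork_later_sketch(n) { |x| sleep 0.5; x * 10 } }
# Only now does the parent block, collecting the results in order.
results = pending.map(&:call)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

p results # => [10, 20, 30]
```

Because the children overlap, the elapsed time is typically close to 0.5 s rather than the 1.5 s a sequential loop would take.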
    # File 'lib/forkandreturn/forkandreturn.rb', line 57

    def self.fork_and_return_later(*args, &block)
      wait = fork_and_return_core(*args, &block)
      lambda{wait.call.call.call}
    end