Module: ForkAndReturn

Defined in:
lib/forkandreturn/forkandreturn.rb,
lib/forkandreturn/util.rb,
lib/forkandreturn/exceptions.rb

Overview

ForkAndReturn implements a couple of methods that simplifies running a block of code in a subprocess. The result (Ruby object or exception) of the block will be available in the parent process.

The intermediate return value (or exception) will be Marshal’led to disk. This means that it is possible to (concurrently) run thousands of child process, with a relative low memory footprint. Just gather the results once all child process are done. ForkAndReturn will handle the writing, reading and deleting of the temporary file.

The core of these methods is fork_and_return_core(). It returns some nested lambdas, which are handled by the other methods and by Enumerable#concurrent_collect(). These lambdas handle the WAITing, LOADing and RESULTing (explained in fork_and_return_core()).

The child process exits with Process.exit!(), so at_exit() blocks are skipped in the child process. However, both $stdout and $stderr will be flushed.

Only Marshal’lable Ruby objects can be returned.

ForkAndReturn uses Process.fork(), so it only runs on platforms where Process.fork() is implemented.

Defined Under Namespace

Modules: Util Classes: WorkerError

Class Method Summary collapse

Class Method Details

.fork_and_return(*args, &block) ⇒ Object

Fork a new process and run the block of code within that process.

The WAITing, LOADing and RESULTing (explained in fork_and_return_core()) will be performed immediately and the return value of the block will be returned.

Example:

[1, 2, 3, 4].collect do |object|
  Thread.fork do
    ForkAndReturn.fork_and_return do
      2*object
    end
  end
end.collect do |thread|
  thread.value
end   # ===> [2, 4, 6, 8]

This runs each “2*object” in a seperate process. Hopefully, the processes are spread over all available CPU’s. That’s a simple way of parallel processing! (Although Enumerable#concurrent_collect() is even simpler…)

*args is passed to the block.



44
45
46
47
48
# File 'lib/forkandreturn/forkandreturn.rb', line 44

def self.fork_and_return(*args, &block)
  wait	= fork_and_return_core(*args, &block)

  wait.call.call.call
end

.fork_and_return_core(*args, &block) ⇒ Object

Fork a new process and run the block of code within that process.

Returns some nested lambdas: The first lambda is the WAIT-lambda. If you call the WAIT-lambda, you’re going to wait for the child process to finish. The WAIT-lambda returns the LOAD-lambda. If you call the LOAD-lambda, the result of the child process (the return value or the exception) will be loaded from the temporary file into memory and the temporary file will be deleted. The LOAD-lambda returns the RESULT-lambda. If you call RESULT-lambda, the result of the child process will be handled. This means either “return the return value of the block” or “raise the exception”

at_exit blocks defined in the child itself will be executed in the child, whereas at_exit blocks defined in the parent won’t be executed in the child.

*args is passed to the block.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/forkandreturn/forkandreturn.rb', line 80

def self.fork_and_return_core(*args, &block)
  file	= Util.tempfile

  #begin
    pid =
    Process.fork do
      at_exit do
        $stdout.flush
        $stderr.flush

        Process.exit!	# To avoid the execution of already defined at_exit handlers.
      end

      begin
        ok, res	= true, yield(*args)
      rescue
        ok, res	= false, $!
      end

      File.open(file, "wb") do |f|
        f.chmod(0600)

        Marshal.dump([ok, res], f)
      end
    end
  #rescue Errno::EAGAIN	# Resource temporarily unavailable - fork(2)
  #  Kernel.sleep 0.1

  #  retry			# TODO: Reconsider.
  #end

  lambda do			# Wait for the result.
    Process.wait(pid)		# To avoid zombies.

    lambda do			# Load the result and delete the temp file.
      begin
        ok, res	= File.open(file, "rb"){|f| Marshal.load(f)}
      rescue Errno::ENOENT	# No such file or directory
        ok, res	= false, WorkerError.new("the worker hasn't returned a result")
      rescue EOFError		# end of file reached
        ok, res	= false, WorkerError.new("the worker hasn't returned a result")
      rescue TypeError	# can't be read
        ok, res	= false, WorkerError.new("the worker has returned corrupt data")
      ensure
        File.delete(file)	if File.file?(file)
      end

      lambda do		# Handle the result.
        raise res	unless ok

        res
      end
    end
  end
end

.fork_and_return_later(*args, &block) ⇒ Object

Fork a new process and run the block of code within that process.

Returns a lambda. If you call it, the WAITing, LOADing and RESULTing (explained in fork_and_return_core()) will be performed in one go.

*args is passed to the block.



57
58
59
60
61
# File 'lib/forkandreturn/forkandreturn.rb', line 57

def self.fork_and_return_later(*args, &block)
  wait	= fork_and_return_core(*args, &block)

  lambda{wait.call.call.call}
end