Module: Enumerable

Defined in:
lib/parallel_enum.rb

Instance Method Details

#map_forked(num_forks = 8, &block) ⇒ Object

Behaves like map, but runs the block in forked child processes to speed up processing, using up to num_forks forks (8 by default). See Enumerator#forked for more information. NOTE: Forked processes do not share variables with the main process, so the block’s return value is sent back over an I/O pipe. The return value must therefore be serializable with Marshal.dump. Exceptions raised inside the block are sent back the same way and re-raised in the main process.
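To make the Marshal requirement concrete, here is a minimal sketch that checks whether a value can be serialized. The helper name marshalable? is hypothetical and is not part of parallel_enum; it relies only on Marshal.dump from the Ruby core library.

```ruby
# Hypothetical helper (not part of parallel_enum): returns true when the
# value can be serialized with Marshal.dump, false when Marshal rejects it.
def marshalable?(value)
  Marshal.dump(value)
  true
rescue TypeError
  # Marshal raises TypeError for objects such as procs and IO handles.
  false
end

marshalable?({ id: 1, tags: %w[a b] }) # => true  (plain data)
marshalable?(-> { 42 })                # => false (procs cannot be dumped)
marshalable?($stdout)                  # => false (IO objects cannot be dumped)
```

A block passed to map_forked should therefore return plain data (numbers, strings, arrays, hashes, simple objects) rather than procs, open files, or sockets.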



# File 'lib/parallel_enum.rb', line 145

def map_forked(num_forks=8,&block)
  return self.map_threaded(num_forks) do |item|
    xn_r, xn_w = IO.pipe      # xn pipe will be used to send a Marshal'd exception back to the main process
    rz_r, rz_w = IO.pipe      # rz pipe will be used to send the block's return value back to the main process
    pid = Process.fork do     # Fork a new process from the thread
      begin
        r = block.call(item)  # Call block
        rz_w.print(Marshal.dump(r)) # Send the result back
      rescue Exception => e   # Exceptions are caught to be sent back to the main process
        xn = nil              # xn will hold the Marshal'd exception
        begin
          xn = Marshal.dump(e)# Try to dump the exception
        rescue Exception      # That might fail if this is a particularly exotic exception (e still holds the original)
          xn = Marshal.dump(IOError.new("Failed to carry #{e.class} to main process"))
        end
        xn_w.print(xn)        # Send that Marshal'd string version of the exception back
      ensure
        exit!                 # Don't call any at_exit methods
      end
    end
    xn_w.close                # Close the parent's copies of the write ends so reads can reach EOF
    rz_w.close
    rz = rz_r.read            # Read the result before waiting, so a large result cannot fill the pipe and block the child
    xn = xn_r.read            # Read any exception
    rz_r.close                # Close both read ends, even if an exception is about to be raised
    xn_r.close
    Process.wait(pid)         # Reap the subprocess once its output has been drained
    raise Marshal.load(xn) if xn != '' # Raise the passed exception if it exists
    Marshal.load(rz)          # Return the return value
  end
end
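The core of the listing above is a fork, a pipe, and a Marshal round-trip. The following is a simplified, single-item sketch of that technique, not the library's implementation: the helper name run_in_fork is hypothetical, exception forwarding is left out, and it assumes a platform where Process.fork is available (it is not on Windows).

```ruby
# Hypothetical helper (not part of parallel_enum): runs the given block in a
# forked child process and returns the block's value to the parent.
def run_in_fork
  reader, writer = IO.pipe
  reader.binmode                # Marshal output is binary data
  writer.binmode
  pid = Process.fork do
    reader.close                # The child only writes
    writer.write(Marshal.dump(yield))
    writer.close
    exit!(0)                    # Skip at_exit handlers inherited from the parent
  end
  writer.close                  # The parent only reads; closing lets read reach EOF
  payload = reader.read         # Drain the pipe before waiting on the child
  reader.close
  Process.wait(pid)
  Marshal.load(payload)
end

square = run_in_fork { 7 * 7 }                 # computed in the child process
child_pid = run_in_fork { Process.pid }        # differs from the parent's pid
```

Reading the pipe before calling Process.wait matters: a child that writes more than the pipe buffer holds will block until the parent reads, so waiting first can stall both processes.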

#map_threaded(num_threads = 8, &block) ⇒ Object

Behaves like map, but runs the block in multiple threads (8 by default) to speed up processing. Results are returned in the order of the original collection. See Enumerator#threaded for more information.



# File 'lib/parallel_enum.rb', line 130

def map_threaded(num_threads=8,&block)
  result = {}
  mtx = Mutex.new
  self.each.with_index.threaded(num_threads) do |x, i|
    r = block.call(x)
    mtx.synchronize{result[i] = r}
  end
  return result.to_a.sort.map{|i, x| x}
end
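The listing above keeps results in order by storing each one under its index and sorting at the end. Since Enumerator#threaded is defined elsewhere, here is a self-contained sketch of the same idea using only Thread, Queue, and Mutex from the standard library. The method name ordered_threaded_map is hypothetical and this is an illustration, not the library's code.

```ruby
# Hypothetical stand-in for map_threaded, built only on the standard library.
def ordered_threaded_map(items, num_threads = 4)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close                   # pop returns nil once the queue is closed and empty

  results = {}
  lock = Mutex.new
  workers = Array.new(num_threads) do
    Thread.new do
      while (pair = queue.pop)
        item, index = pair
        value = yield(item)
        lock.synchronize { results[index] = value } # Guard the shared hash
      end
    end
  end
  workers.each(&:join)

  # Threads finish in arbitrary order, so restore the input order by index.
  results.sort_by { |index, _| index }.map { |_, value| value }
end

doubled = ordered_threaded_map([3, 1, 2]) { |x| sleep(x * 0.01); x * 2 }
# doubled == [6, 2, 4], even though the shorter sleeps finish first
```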