Module: Enumerable
- Defined in:
- lib/parallel_enum.rb
Instance Method Summary collapse
-
#map_forked(num_forks = 8, &block) ⇒ Object
This function is identical to the map function, but uses multiple forks to speed up processing.
-
#map_threaded(num_threads = 8, &block) ⇒ Object
This function is identical to the map function, but uses multiple threads to speed up processing.
Instance Method Details
#map_forked(num_forks = 8, &block) ⇒ Object
This function is identical to the map function, but uses multiple forks to speed up processing. See Enumerator#forked for more information. NOTE: Since most variables are not shared between processes, I had to rely on I/O to send the block’s return values back. Because of this, the block’s return value must be Marshal-able into a string.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/parallel_enum.rb', line 145 def map_forked(num_forks=8,&block) return self.map_threaded(num_forks) do |item| xn_r, xn_w = IO.pipe # xn pipe will be used to send a Marshal'd exception back to the main process rz_r, rz_w = IO.pipe # rz pipe will be used to send the block's return value back to the main process pid = Process.fork do # Fork a new process from the thread begin r = block.call(item) # Call block rz_w.print(Marshal.dump(r)) # Send the result back rescue Exception => e # Exceptions are caught to be sent back to the main process xn = nil # xn will hold the Marshal'd exception begin xn = Marshal.dump(e)# Try to dump the exception rescue Exception => e # That might fail if this is a particularly exotic exception xn = Marshal.dump(IOError.new("Failed to carry #{e.class} to main process")) end xn_w.print(xn) # Send that Marshal'd string version of the exception back ensure exit! # Don't call any at_exit methods end end Process.wait(pid) # Wait for the subprocess to finish xn_w.close # Close the write pipe xn = xn_r.read # Read any exception xn_r.close # Close the read pipe raise Marshal.load(xn) if xn != '' # Raise the passed exception if it exists rz_w.close # Close the write pipe rz = rz_r.read # Read the result of the block rz_r.close # Close the read pipe Marshal.load(rz) # Return the return value end end |
#map_threaded(num_threads = 8, &block) ⇒ Object
This function is identical to the map function, but uses multiple threads to speed up processing. See Enumerator#threaded for more information
130 131 132 133 134 135 136 137 138 |
# File 'lib/parallel_enum.rb', line 130 def map_threaded(num_threads=8,&block) result = {} mtx = Mutex.new self.each.with_index.threaded(num_threads) do |x, i| r = block.call(x) mtx.synchronize{result[i] = r} end return result.to_a.sort.map{|i, x| x} end |