Class: Enumerator
- Inherits: Object
- Defined in: lib/parallel_enum.rb
Overview
Tested on Ubuntu and CentOS. Untested on Windows and OS X. forked will almost certainly not work on Windows, since Windows lacks Kernel#fork.
Instance Method Summary collapse
- #forked(num_forks = 8, &block) ⇒ Object
  forked is like threaded, but uses multiple process forks to speed up processing by taking advantage of multiple CPU cores.
- #threaded(num_threads = 8, &block) ⇒ Object
  threaded is like each, but uses multiple threads to speed up processing when the executed code contains a lot of blocking or waiting.
Instance Method Details
#forked(num_forks = 8, &block) ⇒ Object
forked is like threaded, but uses multiple process forks to speed up processing by taking advantage of multiple CPU cores. While this is an advantage over threaded, there are a few drawbacks. First, fork is not available on all platforms, though *nix systems are usually fine. Second, Ruby has no built-in inter-process mutex, although some libraries are available. Third, variables CANNOT be altered from within a fork: forking clones the interpreter's memory, so each child mutates only its own copy and the parent never sees the change.
# File 'lib/parallel_enum.rb', line 98

def forked(num_forks=8,&block)
  # Threading behind the scenes should create separate
  # ActiveRecord connections and severely de-complicates things
  self.threaded(num_forks) do |item|
    xn_r, xn_w = IO.pipe            # xn pipe will be used to send a Marshal'd exception back to the main process
    pid = Process.fork do           # Fork a new process from the thread
      begin
        block.call(item)            # Call block
      rescue Exception => e         # Exceptions are caught to be sent back to the main process
        xn = nil                    # xn will hold the Marshal'd exception
        begin
          xn = Marshal.dump(e)      # Try to dump the exception
        rescue Exception => e       # That might fail if this is a particularly exotic exception
          xn = Marshal.dump(IOError.new("Failed to carry #{e.class} to main process"))
        end
        xn_w.print(xn)              # Send that Marshal'd string version of the exception back
      ensure
        exit!                       # Don't call any at_exit methods
      end
    end
    Process.wait(pid)               # Wait for the subprocess to finish
    xn_w.close                      # Close the write pipe
    xn = xn_r.read                  # Read any exception
    xn_r.close                      # Close the read pipe
    raise Marshal.load(xn) if xn != ''  # Raise the passed exception if it exists
  end
end
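The exception-relay mechanism above can be reduced to a self-contained sketch using only the standard library (*nix only; the names and the simulated failure here are illustrative, not the library's code):

```ruby
# Sketch of the same mechanism forked uses: the child Marshals any
# exception and ships it to the parent over a pipe, where it is re-raised.
r, w = IO.pipe
pid = Process.fork do
  r.close
  begin
    raise ArgumentError, "boom in child"   # simulate a failing block
  rescue Exception => e
    w.write(Marshal.dump(e))               # serialize the exception
    w.close                                # flush before the hard exit
  ensure
    exit!                                  # skip at_exit handlers, as forked does
  end
end
w.close
Process.wait(pid)
payload = r.read                           # EOF once the child has exited
r.close

carried = nil
begin
  raise Marshal.load(payload) unless payload.empty?
rescue ArgumentError => e
  carried = e
end
puts "parent caught: #{carried.message}"   # => parent caught: boom in child
```

Note the explicit close of the write end before exit!: a hard exit skips buffered-IO flushing, so the payload must be on the pipe before the child dies.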
#threaded(num_threads = 8, &block) ⇒ Object
threaded is like each, but uses multiple threads to speed up processing when the executed code contains a lot of blocking or waiting. Try benchmarking these two pieces of code:
(0...50).each{|x| sleep rand*3; puts x}
(0...50).each.threaded{|x| sleep rand*3; puts x}
If any thread raises an exception, Enumerator#threaded will catch it and bring it into the main thread. That said, if two different threads raise two different exceptions, one will be saved while the other will be lost to the aether. It is not possible to predict which will be saved, so it’s probably best to put exception handling code within the block if you plan to catch errors.
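Since only one of several concurrent exceptions survives, rescuing inside the block is the safer pattern. A minimal sketch using plain threads and a Queue (hypothetical worker code, not Enumerator#threaded itself):

```ruby
# Rescue inside the block so no failure is lost: errors are collected
# instead of one clobbering another in a shared variable.
errors = Queue.new
threads = (0...4).map do |x|
  Thread.new do
    begin
      raise "worker #{x} failed" if x.odd?   # simulate two failing workers
    rescue => e
      errors << e                            # every error is kept
    end
  end
end
threads.each(&:join)
puts "caught #{errors.size} errors"          # => caught 2 errors
```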
Note that even though Ruby 1.9 uses real system threads, it still has a Global Interpreter Lock that prevents two threads from running Ruby code concurrently. Benchmark these two:
(0...50).each{|x| 32000000.times{}; puts x}
(0...50).each.threaded{|x| 32000000.times{}; puts x}
The threaded version may actually run slower than the non-threaded version because of the overhead involved. If you want to speed up processing code by taking advantage of multiple cores, see Enumerator#forked.
# File 'lib/parallel_enum.rb', line 27

def threaded(num_threads=8,&block)
  raise ArgumentError.new("It makes no sense to call Enumerator#threaded without a block") if block.nil?
  raise ArgumentError.new("num_threads must be a positive integer") unless num_threads.kind_of? Fixnum and num_threads > 0
  mutex = Mutex.new   # used to ensure only one thread is using the instruction and feedback pipes at a time
  threads = []        # will hold the pool of threads so we can join them later
  items = {}          # contains items returned by self.next indexed by their object_id - used to prevent garbage collection
  exception = nil     # contains the exception raised by any thread
  instruction_r, instruction_w = IO.pipe  # Used to assign items to the threads. 'stop' is sent to terminate the thread.
  feedback_r, feedback_w = IO.pipe        # Used by threads to indicate completion of an item
  # Alright. Let's make some threads!
  num_threads.times do
    threads << Thread.new do
      instruction = nil  # Contains a string of the next instruction - either something like '63913' or 'stop'
      while true         # Main evaluation loop - exited explicitly through break
        mutex.synchronize{instruction = instruction_r.gets.chomp}  # One thread at a time may read an instruction
        break if instruction == 'stop'  # Stop if told to stop
        begin
          block.call(ObjectSpace._id2ref(instruction.to_i))  # Call the block on the referenced item
        rescue Exception => e
          exception = e  # Any exceptions are caught and sent to the main thread
        end
        mutex.synchronize{feedback_w.puts instruction}  # Report completion to the main thread
      end
    end
  end
  # The threads are now armed and ready to evaluate
  begin
    # Start by sending as many items as there are threads
    num_threads.times do
      item = self.next                   # Grab the next item
      items[item.object_id] = item       # Store it so it won't be garbage collected
      instruction_w.puts item.object_id  # Send it to the thread pool
    end
    # Then send items as old ones come back (break out of loop when we reach the end)
    while true
      index = feedback_r.gets.to_i       # Wait for an item to be done
      break if exception                 # Stop if a thread had an error
      items.delete index                 # Delete the completed item from the item pool
      item = self.next                   # Grab the next item
      items[item.object_id] = item       # Store it so it won't be garbage collected
      instruction_w.puts item.object_id  # Send it to the thread pool
    end
  rescue StopIteration  # StopIteration will be raised by self.next when we reach the end of the iteration
    nil
  rescue Exception => e
    mutex.synchronize{exception = e} if exception.nil?  # Any other error will be dealt with promptly
  ensure
    begin
      (num_threads+1).times{instruction_w.puts 'stop'}  # Tell all the threads to stop
      threads.each{|t| t.join}                          # and wait for them to stop
    rescue Exception => e
      mutex.synchronize{exception = e} if exception.nil?  # Any error at this stage will be dealt with promptly
    ensure
      threads.each{|t| t.kill}  # Threads should have stopped by now, but if not, they die.
      [instruction_r, instruction_w, feedback_r, feedback_w].each{|io| io.close}  # Close IO
      raise exception unless exception.nil?  # Re-raise any errors now that the thread pool is closed
      return self
    end
  end
end
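The claim that threads only pay off for blocking work can be sanity-checked without the library. A sketch using plain Ruby threads (not Enumerator#threaded itself): sleep releases the GIL, so blocking calls overlap across threads, while a CPU-bound loop would not.

```ruby
# Compare 8 sequential sleeps against 8 sleeps running in parallel threads.
require "benchmark"

items = (0...8).to_a

sequential = Benchmark.realtime do
  items.each { |_| sleep 0.1 }                             # sleeps run back to back
end

threaded = Benchmark.realtime do
  items.map { |_| Thread.new { sleep 0.1 } }.each(&:join)  # sleeps overlap
end

puts format("sequential: %.2fs, threaded: %.2fs", sequential, threaded)
```

Expect the threaded time to be close to a single sleep (~0.1s) and the sequential time to be roughly eight times that.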