Class: Enumerator

Inherits:
Object
  • Object
Defined in:
lib/parallel_enum.rb

Overview

Tested on Ubuntu and CentOS; untested on Windows and OS X. Enumerator#forked is unlikely to work on Windows, because Kernel#fork is not implemented there.

Instance Method Summary

Instance Method Details

#forked(num_forks = 8, &block) ⇒ Object

forked is like threaded, but runs the block in forked child processes, so the work can be spread across multiple CPU cores. That is an advantage over threaded, but it comes with three drawbacks. First, forked is not available on all platforms, though *nix systems are usually fine. Second, Ruby has no built-in inter-process mutex, although some libraries provide one. Third, changes made to variables inside a fork are NOT visible to the parent process, because forking the Ruby interpreter clones the environment and each child works on its own copy.
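
The third drawback is easy to trip over, so here is a minimal sketch of it using plain Process.fork rather than Enumerator#forked. The helper name run_in_fork is hypothetical and not part of parallel_enum; it assumes a platform with fork and a block whose return value can be Marshal'd and does not raise.

```ruby
# Hypothetical helper (not part of parallel_enum): run a block in a forked
# child and hand its Marshal'd return value to the parent through a pipe.
def run_in_fork
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode
  pid = Process.fork do
    reader.close                       # the child only writes
    writer.write(Marshal.dump(yield))  # send the block's return value to the parent
    writer.close
    exit!(0)                           # skip at_exit handlers inherited from the parent
  end
  writer.close                         # the parent only reads; closing lets read reach EOF
  payload = reader.read                # read before waiting so a large result cannot block the child
  reader.close
  Process.wait(pid)
  Marshal.load(payload)
end

counter = 0
returned = run_in_fork do
  counter += 1   # modifies the child's copy only
  counter        # this value travels back through the pipe
end

puts "parent counter: #{counter}"     # the parent's variable is unchanged
puts "value from child: #{returned}"
```

Anything the parent needs from a fork has to be sent back explicitly like this (or written to a file, database, or similar); assignments inside the block are otherwise lost when the child exits.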



# File 'lib/parallel_enum.rb', line 98

def forked(num_forks=8,&block)
  # Threading behind the scenes should create separate
  # ActiveRecord connections and greatly simplifies things
  self.threaded(num_forks) do |item|
    xn_r, xn_w = IO.pipe      # xn pipe will be used to send a Marshal'd exception back to the main process
    pid = Process.fork do     # Fork a new process from the thread
      begin
        block.call(item)      # Call block
      rescue Exception => e   # Exceptions are caught to be sent back to the main process
        xn = nil              # xn will hold the Marshal'd exception
        begin
          xn = Marshal.dump(e)# Try to dump the exception
        rescue Exception      # That might fail if this is a particularly exotic exception; e is left untouched
          xn = Marshal.dump(IOError.new("Failed to carry #{e.class} to main process"))
        end
        xn_w.print(xn)        # Send that Marshal'd string version of the exception back
      ensure
        exit!                 # Don't call any at_exit methods
      end
    end
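    xn_w.close                # Close the parent's copy of the write end so the read below can reach EOF
    xn = xn_r.read            # Read any exception before waiting, so a large dump cannot fill the pipe and stall the child
    xn_r.close                # Close the read pipe
    Process.wait(pid)         # Wait for the subprocess to finish
    raise Marshal.load(xn) if xn != '' # Raise the passed exception if it exists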
  end
end
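
The exception hand-off used above can be tried in isolation. The following is a rough sketch of that technique, not the library's code: the names dump_exception and fork_and_reraise are hypothetical, and it assumes a platform where Process.fork is available.

```ruby
# Hypothetical helpers illustrating the technique; names are not from parallel_enum.

# Serialize an exception, falling back to a plain IOError when the original
# cannot be dumped (for example, when it holds a Proc in an instance variable).
def dump_exception(error)
  Marshal.dump(error)
rescue StandardError
  Marshal.dump(IOError.new("Failed to carry #{error.class} to main process"))
end

# Run the block in a child process and re-raise in the parent whatever it raised.
def fork_and_reraise
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode
  pid = Process.fork do
    reader.close
    begin
      yield
    rescue Exception => e              # catch everything so it can be reported
      writer.write(dump_exception(e))
    ensure
      writer.close
      exit!(0)                         # skip at_exit handlers
    end
  end
  writer.close                         # parent keeps only the read end
  payload = reader.read                # empty string means the child raised nothing
  reader.close
  Process.wait(pid)
  raise Marshal.load(payload) unless payload.empty?
  nil
end

begin
  fork_and_reraise { raise ArgumentError, 'bad input in child' }
rescue ArgumentError => e
  puts "parent caught: #{e.message}"
end
```

The exception class must be defined in the parent for Marshal.load to rebuild it, which is always the case here because the child is a clone of the parent.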

#threaded(num_threads = 8, &block) ⇒ Object

threaded is like each, but uses multiple threads to speed up processing when the executed code contains a lot of blocking or waiting. Try benchmarking these two pieces of code:

    (0...50).each{|x| sleep rand*3; puts x}
    (0...50).each.threaded{|x| sleep rand*3; puts x}
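
For a quick measurement that does not depend on the library, the same comparison can be approximated with plain Thread objects and the standard Benchmark module. This is only a sketch: run_sequential and run_threaded are hypothetical names, and one thread per item stands in for the fixed-size pool that Enumerator#threaded uses.

```ruby
require 'benchmark'

# Run a sleep-bound job once per item, one after another.
def run_sequential(items, delay)
  items.each { |_x| sleep delay }
end

# Run the same job with one plain Thread per item (a stand-in for
# Enumerator#threaded, which uses a fixed-size pool instead).
def run_threaded(items, delay)
  items.map { |_x| Thread.new { sleep delay } }.each(&:join)
end

items = (0...8).to_a
sequential = Benchmark.realtime { run_sequential(items, 0.05) }
threaded   = Benchmark.realtime { run_threaded(items, 0.05) }
puts format('sequential: %.2fs, threaded: %.2fs', sequential, threaded)
```

Because sleep releases the interpreter lock, the threaded run should take roughly one delay instead of eight.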

If any thread raises an exception, Enumerator#threaded will catch it and bring it into the main thread. That said, if two different threads raise two different exceptions, one will be saved while the other will be lost to the aether. It is not possible to predict which will be saved, so it’s probably best to put exception handling code within the block if you plan to catch errors.
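
One way to avoid losing exceptions is to rescue inside the block and record every failure yourself. The sketch below shows the idea with plain threads; collect_thread_errors is a hypothetical helper, not something parallel_enum provides, and the same rescue-and-record body could be placed inside a block passed to threaded.

```ruby
# Hypothetical helper: run the block on every item in its own thread and
# return every exception raised, instead of keeping only one of them.
def collect_thread_errors(items)
  errors = []
  lock = Mutex.new                     # guards the shared errors array
  threads = items.map do |item|
    Thread.new do
      begin
        yield item
      rescue StandardError => e
        lock.synchronize { errors << [item, e] }
      end
    end
  end
  threads.each(&:join)
  errors
end

failures = collect_thread_errors([1, 2, 3, 4]) do |n|
  raise ArgumentError, "item #{n} is even" if n.even?
end
failures.sort_by(&:first).each do |item, e|
  puts "#{item}: #{e.class}: #{e.message}"
end
```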

Note that even though Ruby 1.9 uses real system threads, it still has a Global Interpreter Lock that prevents two threads from executing Ruby code at the same time. Threads help when the block waits on I/O or sleeps, but not when it is busy computing. Benchmark these two:

    (0...50).each{|x| 32000000.times{}; puts x}
    (0...50).each.threaded{|x| 32000000.times{}; puts x}

The threaded version may actually run slower than the non-threaded version because of the overhead involved. If you want to speed up processing by taking advantage of multiple cores, see Enumerator#forked.

Raises:

  • (ArgumentError)


# File 'lib/parallel_enum.rb', line 27

def threaded(num_threads=8,&block)
  raise ArgumentError.new("It makes no sense to call Enumerator#threaded without a block") if block.nil?
  raise ArgumentError.new("num_threads must be a positive integer") unless num_threads.kind_of?(Integer) && num_threads > 0 # Integer rather than Fixnum, which newer Rubies no longer define
  
  mutex = Mutex.new   # used to ensure only one thread is using the instruction and feedback pipes at a time
  threads = []        # will hold the pool of threads so we can join them later
  items = {}          # contains items returned by self.next indexed by their object_id - used to prevent garbage collection
  exception = nil     # contains the exception raised by any thread
  
  instruction_r, instruction_w = IO.pipe  # Used to assign items to the threads. 'stop' is sent to terminate the thread.
  feedback_r, feedback_w = IO.pipe        # Used by threads to indicate completion of an item
  
  # Alright. Let's make some threads!
  num_threads.times do
    threads << Thread.new do
      instruction = nil       # Contains a string of the next instruction - either something like '63913' or 'stop'
      while true              # Main evaluation loop - exited explicitly through break
        mutex.synchronize{instruction = instruction_r.gets.chomp} # One thread at a time may read an instruction
        break if instruction == 'stop'                            # Stop if told to stop
        begin
          block.call(ObjectSpace._id2ref(instruction.to_i))       # Call the block on the referenced item
        rescue Exception => e
          exception = e                                           # Any exceptions are caught and sent to the main thread
        end
        mutex.synchronize{feedback_w.puts instruction}            # Report completion to the main thread
      end
    end
  end
  
  # The threads are now armed and ready to evaluate
  begin
    # Start by sending as many items as there are threads
    num_threads.times do
      item = self.next                    # Grab the next item
      items[item.object_id] = item        # Store it so it won't be garbage collected
      instruction_w.puts item.object_id   # Send it to the thread pool
    end
    # Then send items as old ones come back (break out of loop when we reach the end)
    while true
      index = feedback_r.gets.to_i        # Wait for an item to be done
      break if exception                  # Stop if a thread had an error
      items.delete index                  # Delete the completed item from the item pool
      item = self.next                    # Grab the next item
      items[item.object_id] = item        # Store it so it won't be garbage collected
      instruction_w.puts item.object_id   # Send it to the thread pool
    end
  rescue StopIteration                    # StopIteration will be raised by self.next when we reach the end of the iteration
    nil
  rescue Exception => e
    mutex.synchronize{exception = e} if exception.nil?      # Any other error will be dealt with promptly
  ensure
    begin
      (num_threads+1).times{instruction_w.puts 'stop'}      # Tell all the threads to stop
      threads.each{|t| t.join}                              # and wait for them to stop
    rescue Exception => e
      mutex.synchronize{exception = e} if exception.nil?    # Any error at this stage will be dealt with promptly
    ensure
      threads.each{|t| t.kill}                              # Threads should have stopped by now, but if not, they die.
      [instruction_r, instruction_w, feedback_r, feedback_w ].each{|io| io.close} # Close IO
      raise exception unless exception.nil?                 # Re-raise any errors now that the thread pool is closed
      return self
    end
  end
end
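
For comparison, the same dispatch pattern (a fixed pool of workers, one 'stop' marker per worker, first exception re-raised at the end) can be sketched with Ruby's built-in Queue instead of pipes and object ids. This is an illustrative alternative under stated assumptions, not the library's implementation: each_in_pool is a hypothetical name, it keeps processing after an error rather than stopping early, and it buffers all items in the queue.

```ruby
# Hypothetical alternative, not the library's implementation: a fixed-size
# thread pool fed from a Queue, so no object ids or pipes are needed.
def each_in_pool(enum, num_threads = 8)
  raise ArgumentError, 'a block is required' unless block_given?
  unless num_threads.is_a?(Integer) && num_threads > 0
    raise ArgumentError, 'num_threads must be a positive integer'
  end

  queue = Queue.new
  stop = Object.new                    # unique marker telling a worker to exit
  first_error = nil
  lock = Mutex.new

  workers = Array.new(num_threads) do
    Thread.new do
      loop do
        item = queue.pop               # blocks until an item or the marker arrives
        break if item.equal?(stop)
        begin
          yield item
        rescue StandardError => e
          lock.synchronize { first_error ||= e }  # keep the first error only
        end
      end
    end
  end

  enum.each { |item| queue << item }   # hand out the work
  num_threads.times { queue << stop }  # one marker per worker
  workers.each(&:join)
  raise first_error if first_error
  enum
end

each_in_pool((1..6), 3) { |n| sleep 0.01; puts "done #{n}" }
```

Replacing Queue with SizedQueue would bound how many items wait in memory at once, which is closer to how threaded hands out one item per idle thread.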