Module: Enumerable

Defined in:
lib/forkify.rb

Instance Method Summary collapse

Instance Method Details

#forkify(procs = 5, &block) ⇒ Object

Forkify will process block’s actions using processes. If no number of processes is given, the default of 5 will be used. If there are less than procs number of items in the Enumerable type, less processes will be spawned.

It should be noted that forkify will always return an Array at this time, so be careful with Hash objects.

Examples

[1, 2, 3].forkify { |n| n*2 } => [2, 4, 6]

{:a => 1, :b => 2, :c => 3}.forkify { |k, v| [v, k] } => [[1, :a], [2, :b], [3, :c]]

10.times.forkify(10) { sleep(1) } => [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] (runs for less than 2 seconds)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/forkify.rb', line 22

def forkify procs = 5, &block
  puts "Forkify Class: #{self.class}" if FORKIFY_DEBUG
  if self === Array
    items = self
  else
    begin
      items = self.to_a
    rescue NoMethodError => e
      raise NoMethodError, "Unable to coerce #{self.inspect} to an Array type."
    end
  end

  results = []
  offset = 0

  items_remaining = items.size

  while (items_remaining > 0) do
    num_procs = procs
    num_procs = items_remaining if items_remaining < procs

    pids = []
    wpipes = []
    rpipes = []

    num_procs.times do |i|
      puts "Fork # #{i}" if FORKIFY_DEBUG
      r, w = IO.pipe
      pp "r, w: #{r} #{w}" if FORKIFY_DEBUG
      wpipes << w
      rpipes << r
      pid = fork
      unless pid
        r.close
        result = 
          begin
            block.call(items[i + offset])
          rescue Object => e
            e
          end
        w.write( Marshal.dump( result ))
        w.close
        exit!
      end

      pids << pid

    end

    offset += num_procs

    pp "Waiting for pids: #{pids.inspect}" if FORKIFY_DEBUG
    pids.each { |p| Process.waitpid(p) }

    # 1 select version
    #datawaiting_pipes = Kernel.select(rpipes, wpipes, nil, 2)
    #readwaiting_pipes = datawaiting_pipes[0]
    #writewaiting_pipes = datawaiting_pipes[1]
    
    # Switch to 2 selects instead of 1
    #readwaiting_pipes = Kernel.select(rpipes, nil, nil, 2)[0]
    #writewaiting_pipes = Kernel.select(nil, wpipes, nil, 2)[1]

    # Finally settled on going through the pipes instead of select for Linux bug
    unless rpipes.size != wpipes.size
      rpipes.size.times do |i|
        r = rpipes[i]
        w = wpipes[i]

        pp "read: #{readwaiting_pipes}" if FORKIFY_DEBUG
        pp "write: #{writewaiting_pipes}" if FORKIFY_DEBUG

        w.close
        data = ''
        while ( buf = r.read(8192) )
          data << buf
        end
        result = Marshal.load( data )
        r.close
        pp "Pushing result: #{result}" if FORKIFY_DEBUG
        results << result
      end
    end

    items_remaining -= num_procs
  end

  return results
end