Module: Enumerable
- Defined in:
- lib/forkify.rb
Instance Method Summary
- #forkify(procs = 5, &block) ⇒ Object
Forkify runs the block once per item, each call in its own forked process.
Instance Method Details
#forkify(procs = 5, &block) ⇒ Object
Forkify runs the block once per item, each call in its own forked process. If no number of processes is given, the default of 5 is used. If the Enumerable contains fewer than procs items, fewer processes are spawned. Items are handled in batches of at most procs processes, and each batch finishes before the next one starts.
Note that forkify currently always returns an Array, so be careful with Hash objects.
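Because the return value is an Array, a Hash passed through forkify comes back as an Array of whatever the block returned. The following is a minimal sketch of turning such a result back into a Hash. It uses `map` as a stand-in for `forkify` so that it runs without the library loaded; that substitution is an assumption made for illustration only.

```ruby
# `map` stands in for `forkify` here so the snippet runs without the library.
inverted_pairs = { a: 1, b: 2, c: 3 }.map { |k, v| [v, k] }

# inverted_pairs is an Array of two-element Arrays, not a Hash.
# Array#to_h rebuilds a Hash when every element is a [key, value] pair.
inverted = inverted_pairs.to_h
```

This only works when the block returns two-element Arrays; any other return shape has to be reassembled by hand.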
Examples
[1, 2, 3].forkify { |n| n*2 } => [2, 4, 6]
{:a => 1, :b => 2, :c => 3}.forkify { |k, v| [v, k] } => [[1, :a], [2, :b], [3, :c]]
10.times.forkify(10) { sleep(1) } => [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] (runs for less than 2 seconds)
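The last example finishes in under two seconds because all ten blocks run concurrently in a single batch of ten processes. With the default of 5 it would need two batches, so it would take at least two seconds. A rough sketch of how the batch sizes fall out of the item count and procs is shown here; `batch_sizes` is a hypothetical helper, not part of the library, and `each_slice` stands in for the method's internal while loop.

```ruby
# Hypothetical helper: sizes of the process batches forkify would spawn
# for a given number of items, mirroring the loop in the source listing.
def batch_sizes(item_count, procs = 5)
  (0...item_count).each_slice(procs).map(&:size)
end

full_batch    = batch_sizes(10, 10) # one batch of ten processes
default_split = batch_sizes(10)     # two batches of five
small_input   = batch_sizes(3)      # fewer items than procs
uneven_split  = batch_sizes(12, 5)  # last batch is smaller
```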
# File 'lib/forkify.rb', line 22

def forkify procs = 5, &block
  puts "Forkify Class: #{self.class}" if FORKIFY_DEBUG
  if self === Array
    items = self
  else
    begin
      items = self.to_a
    rescue NoMethodError => e
      raise NoMethodError, "Unable to coerce #{self.inspect} to an Array type."
    end
  end

  results = []
  offset = 0

  items_remaining = items.size

  while (items_remaining > 0) do
    num_procs = procs
    num_procs = items_remaining if items_remaining < procs

    pids = []
    wpipes = []
    rpipes = []

    num_procs.times do |i|
      puts "Fork # #{i}" if FORKIFY_DEBUG
      r, w = IO.pipe
      pp "r, w: #{r} #{w}" if FORKIFY_DEBUG
      wpipes << w
      rpipes << r
      pid = fork
      unless pid
        r.close
        result = begin
          block.call(items[i + offset])
        rescue Object => e
          e
        end
        w.write( Marshal.dump( result ))
        w.close
        exit!
      end

      pids << pid
    end

    offset += num_procs

    pp "Waiting for pids: #{pids.inspect}" if FORKIFY_DEBUG
    pids.each { |p| Process.waitpid(p) }

    # 1 select version
    #datawaiting_pipes = Kernel.select(rpipes, wpipes, nil, 2)
    #readwaiting_pipes = datawaiting_pipes[0]
    #writewaiting_pipes = datawaiting_pipes[1]

    # Switch to 2 selects instead of 1
    #readwaiting_pipes = Kernel.select(rpipes, nil, nil, 2)[0]
    #writewaiting_pipes = Kernel.select(nil, wpipes, nil, 2)[1]

    # Finally settled on going through the pipes instead of select for Linux bug
    unless rpipes.size != wpipes.size
      rpipes.size.times do |i|
        r = rpipes[i]
        w = wpipes[i]

        pp "read: #{readwaiting_pipes}" if FORKIFY_DEBUG
        pp "write: #{writewaiting_pipes}" if FORKIFY_DEBUG
        w.close

        data = ''
        while ( buf = r.read(8192) )
          data << buf
        end

        result = Marshal.load( data )
        r.close
        pp "Pushing result: #{result}" if FORKIFY_DEBUG
        results << result
      end
    end

    items_remaining -= num_procs
  end

  return results
end
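The core of the listing is a fork, pipe, and Marshal round trip for each item. Below is a minimal single-item sketch of that technique, not the library's own code. The helper name `run_in_child` is hypothetical, it rescues StandardError where the listing rescues Object, and it reads the pipe before waiting on the child, which avoids the child blocking on a write when a result is larger than the pipe buffer.

```ruby
# Hypothetical helper: run the given block on one item in a child process
# and hand the block's result back to the parent through a pipe.
def run_in_child(item)
  reader, writer = IO.pipe

  pid = fork do
    reader.close # the child only writes
    result = begin
      yield item
    rescue StandardError => e
      e # as in the listing, a raised exception becomes the result
    end
    writer.write(Marshal.dump(result))
    writer.close
    exit!(0) # leave immediately, skipping at_exit handlers
  end

  writer.close         # the parent only reads
  data = reader.read   # returns once the child closes its write end
  reader.close
  Process.waitpid(pid) # reap the child after draining the pipe
  Marshal.load(data)
end

doubled = run_in_child(21) { |n| n * 2 }
failure = run_in_child(0) { |n| 1 / n }
```

Since exceptions come back as ordinary results rather than being raised in the parent, callers may want to check each element with `is_a?(Exception)` before using it. Results must also be serializable with Marshal, which rules out objects such as procs and open IO handles.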