Module: Enumerable

Defined in:
lib/forkandreturn/enumerable.rb

Instance Method Summary

Instance Method Details

#clustered_concurrent_collect(number_of_clusters = ForkAndReturn::Util.cores, &block) ⇒ Object Also known as: clustered_concurrent_map

In clustered_concurrent_collect(), all objects in the enumeration are grouped into clusters. Each cluster is then handled in a separate process. Compare this to concurrent_collect(), where each object is handled in a separate process.

However, the caller will not be aware of the clusters: the interface is exactly the same as concurrent_collect() and Enumerable.collect().

clustered_concurrent_collect() is suitable for handling many jobs that are each not too CPU-intensive.
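The clustering idea can be sketched in plain Ruby. This is only an illustration under stated assumptions: the helper name clustered_collect_sketch is hypothetical, it cuts the enumeration into contiguous slices, and it returns results through a pipe with Marshal. ForkAndReturn delegates clustering to ThreadLimiter.handle_clusters, which may distribute objects and transport results differently. It needs a platform where fork is available.

```ruby
# Hypothetical helper, not part of ForkAndReturn: one forked process per cluster.
def clustered_collect_sketch(items, number_of_clusters)
  items = items.to_a
  # Without clusters (or without work) fall back to a plain, in-process map.
  return items.map { |item| yield(item) } if number_of_clusters < 1 || items.empty?

  # Assumption: contiguous slices; the library may cluster differently.
  slice_size = (items.length / number_of_clusters.to_f).ceil

  workers = items.each_slice(slice_size).map do |cluster|
    reader, writer = IO.pipe
    pid = fork do
      reader.close
      # The child handles its whole cluster and sends back one array of results.
      Marshal.dump(cluster.map { |item| yield(item) }, writer)
      writer.close
      exit!(0) # Skip the parent's at_exit handlers in the child.
    end
    writer.close
    [pid, reader]
  end

  # Read the clusters back in their original order and flatten them.
  workers.flat_map do |pid, reader|
    results = Marshal.load(reader)
    reader.close
    Process.wait(pid) # Reap the child to avoid zombies.
    results
  end
end

p clustered_collect_sketch(1..10, 3) { |n| n * n }
# prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Ten objects cost only three forks here, which is why clustering pays off when each job is cheap compared with the cost of starting a process.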



# File 'lib/forkandreturn/enumerable.rb', line 71

def clustered_concurrent_collect(number_of_clusters=ForkAndReturn::Util.cores, &block)
  number_of_clusters = 0 unless ForkAndReturn::Util.forkable?

  if number_of_clusters < 1
    self.concurrent_collect(number_of_clusters, &block)
  else
    ThreadLimiter.handle_clusters(self, number_of_clusters, :concurrent_collect, &block)
  end
end

#clustered_concurrent_each(*args, &block) ⇒ Object

Like clustered_concurrent_collect, but it’s “each” instead of “collect”: it returns self rather than the collected results.



# File 'lib/forkandreturn/enumerable.rb', line 115

def clustered_concurrent_each(*args, &block)
  clustered_concurrent_collect(*args, &block)

  self
end

#clustered_concurrent_reject(*args, &block) ⇒ Object

Like clustered_concurrent_collect, but it’s “reject” instead of “collect”.



# File 'lib/forkandreturn/enumerable.rb', line 109

def clustered_concurrent_reject(*args, &block)
  self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end

#clustered_concurrent_select(*args, &block) ⇒ Object

Like clustered_concurrent_collect, but it’s “select” instead of “collect”.



# File 'lib/forkandreturn/enumerable.rb', line 103

def clustered_concurrent_select(*args, &block)
  self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end

#concurrent_collect(max_concurrent_workers = -1, &block) ⇒ Object Also known as: concurrent_map

For each object in the enumeration, call the block in a separate process, pass the object to the block, and collect the results of the blocks. It must be one of the easiest ways to do parallel processing in Ruby.

Example:

[1, 2, 3, 4].concurrent_collect do |object|
  2*object
end   # ==> [2, 4, 6, 8]

This runs each “2*object” in a separate process. Hopefully, the processes are spread over all available CPUs. That’s a simple way of parallel processing!

Note that the code in the block is run in a separate process, so updating objects and variables in the block won’t affect the parent process:

count = 0
[...].concurrent_collect do
  count += 1
end
count   # ==> 0

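concurrent_collect() is suitable for handling a couple of very CPU-intensive jobs, like parsing large XML files.

The max_concurrent_workers argument controls forking: a negative value (the default) sets no limit, 0 runs the block without forking, and a positive value caps the number of concurrent workers. If the platform cannot fork, the method behaves as if 0 were passed.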
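The behaviour described above (one process per object, results collected in order, parent state untouched) can be reproduced with Ruby's standard fork, IO.pipe and Marshal. This is a minimal sketch, not ForkAndReturn's implementation: the helper name fork_collect_sketch is hypothetical, and using a pipe with Marshal to return results is an assumption made for illustration. It needs a platform where fork is available.

```ruby
# Hypothetical helper, not ForkAndReturn's implementation: one child per object.
def fork_collect_sketch(items)
  workers = items.map do |item|
    reader, writer = IO.pipe
    pid = fork do
      reader.close
      # Run the block in the child and serialise its return value.
      Marshal.dump(yield(item), writer)
      writer.close
      exit!(0) # Skip the parent's at_exit handlers in the child.
    end
    writer.close
    [pid, reader]
  end

  # All children are already running; now collect their results in order.
  workers.map do |pid, reader|
    result = Marshal.load(reader)
    reader.close
    Process.wait(pid) # Reap the child to avoid zombies.
    result
  end
end

count = 0
doubled = fork_collect_sketch([1, 2, 3, 4]) do |object|
  count += 1 # Changes only the child's copy of count.
  2 * object
end

p doubled # prints [2, 4, 6, 8]
p count   # prints 0
```

Because each child works on a copy of the parent's memory, the only thing that travels back is the block's return value, which therefore has to be serialisable in this sketch.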



# File 'lib/forkandreturn/enumerable.rb', line 25

def concurrent_collect(max_concurrent_workers=-1, &block)
  # Fall back to a plain collect when this platform cannot fork.
  max_concurrent_workers = 0 unless ForkAndReturn::Util.forkable?

  case
  when max_concurrent_workers < 0 # No limit.
    # Start one child per object first; each later pass calls the
    # callable returned by the previous pass.
    self.collect do |object|
      ForkAndReturn.fork_and_return_core do
        # A block with several parameters gets an Enumerable object splatted.
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end
    end.collect do |wait|
      wait.call
    end.collect do |load|
      load.call
    end.collect do |result|
      result.call
    end
  when max_concurrent_workers == 0 # No fork.
    self.collect(&block)
  when max_concurrent_workers > 0
    # The wait step is called inside the worker thread, so the
    # threaded_collect limit also applies to the child processes.
    self.threaded_collect(max_concurrent_workers) do |object|
      ForkAndReturn.fork_and_return_core do
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end.call
    end.collect do |load|
      load.call
    end.collect do |result|
      result.call
    end
  end
end
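The listing above passes each object to the block in one of two ways, depending on block.arity. The sketch below isolates that dispatch rule without any forking, so its effect can be checked directly; the helper name call_like_concurrent_collect is hypothetical and exists only for this illustration.

```ruby
# Hypothetical helper: the same dispatch rule as in the listing, minus the fork.
def call_like_concurrent_collect(object, &block)
  if block.arity > 1 && object.kind_of?(Enumerable)
    # Several block parameters and an Enumerable object: splat its elements.
    block.call(*object.to_a)
  else
    # Otherwise hand the object over as a single argument.
    block.call(object)
  end
end

# A Range is Enumerable but not an Array, so the explicit splat matters here.
p call_like_concurrent_collect(1..2) { |a, b| [a, b] }    # prints [1, 2]

# A one-parameter block receives the whole object.
p call_like_concurrent_collect([3, 4]) { |pair| pair.sum } # prints 7

# A non-Enumerable object is never splatted.
p call_like_concurrent_collect(5) { |a, b| [a, b] }        # prints [5, nil]
```

This is what lets a two-parameter block work on enumerations of pairs, such as the key and value pairs produced by iterating over a Hash.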

#concurrent_each(*args, &block) ⇒ Object

Like concurrent_collect, but it’s “each” instead of “collect”: it returns self rather than the collected results.



# File 'lib/forkandreturn/enumerable.rb', line 95

def concurrent_each(*args, &block)
  concurrent_collect(*args, &block)

  self
end

#concurrent_reject(*args, &block) ⇒ Object

Like concurrent_collect, but it’s “reject” instead of “collect”.



# File 'lib/forkandreturn/enumerable.rb', line 89

def concurrent_reject(*args, &block)
  self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end

#concurrent_select(*args, &block) ⇒ Object

Like concurrent_collect, but it’s “select” instead of “collect”.



# File 'lib/forkandreturn/enumerable.rb', line 83

def concurrent_select(*args, &block)
  self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end
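The select and reject variants all share one idiom: run the block through the collect variant to get one truthy or falsy flag per object, zip the flags back onto the objects, and keep or drop each object accordingly. The sketch below traces that idiom with a plain collect standing in for concurrent_collect; the helper names select_via_flags and reject_via_flags are hypothetical.

```ruby
# Hypothetical helpers: the zip/inject idiom from the listings, given ready-made flags.
def select_via_flags(objects, flags)
  objects.zip(flags).inject([]) { |r, (o, b)| r << o if b; r }
end

def reject_via_flags(objects, flags)
  objects.zip(flags).inject([]) { |r, (o, b)| r << o unless b; r }
end

objects = [1, 2, 3, 4, 5]
# A plain collect stands in for concurrent_collect here.
flags = objects.collect { |n| n.odd? }

p select_via_flags(objects, flags) # prints [1, 3, 5]
p reject_via_flags(objects, flags) # prints [2, 4]
```

Only the flags are computed by the block, so in the forking variants the children return booleans and the selected objects themselves never leave the parent process.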