Module: Enumerable
- Defined in:
- lib/forkandreturn/enumerable.rb
Instance Method Summary collapse
-
#clustered_concurrent_collect(number_of_clusters = ForkAndReturn::Util.cores, &block) ⇒ Object
(also: #clustered_concurrent_map)
In clustered_concurrent_collect(), all objects in the enumeration are clustered.
-
#clustered_concurrent_each(*args, &block) ⇒ Object
Like clustered_concurrent_collect, but it’s “each” instead of “collect”.
-
#clustered_concurrent_reject(*args, &block) ⇒ Object
Like clustered_concurrent_collect, but it’s “reject” instead of “collect”.
-
#clustered_concurrent_select(*args, &block) ⇒ Object
Like clustered_concurrent_collect, but it’s “select” instead of “collect”.
-
#concurrent_collect(max_concurrent_workers = -1, &block) ⇒ Object
(also: #concurrent_map)
For each object in the enumeration, call the block in a separate process, passing the object to the block, and collect the results of the blocks.
-
#concurrent_each(*args, &block) ⇒ Object
Like concurrent_collect, but it’s “each” instead of “collect”.
-
#concurrent_reject(*args, &block) ⇒ Object
Like concurrent_collect, but it’s “reject” instead of “collect”.
-
#concurrent_select(*args, &block) ⇒ Object
Like concurrent_collect, but it’s “select” instead of “collect”.
Instance Method Details
#clustered_concurrent_collect(number_of_clusters = ForkAndReturn::Util.cores, &block) ⇒ Object Also known as: clustered_concurrent_map
In clustered_concurrent_collect(), all objects in the enumeration are clustered. Each cluster is then handled in a separate process. Compare this to concurrent_collect(), where each object is handled in a separate process.
However, the caller will not be aware of the clusters: the interface is exactly the same as concurrent_collect() and Enumerable#collect().
clustered_concurrent_collect() is suitable for handling a lot of jobs that are not too CPU intensive.
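The clustering itself can be pictured with plain Enumerable. This is an illustrative sketch only, not the gem's code: the collection is divided into at most number_of_clusters slices, and each slice would then be handed to a single worker process.

```ruby
# Illustrative sketch only: split 10 jobs over 4 clusters, so each
# worker process would handle a slice of at most 3 jobs.
items    = (1..10).to_a
clusters = 4
size     = (items.length / clusters.to_f).ceil
p items.each_slice(size).to_a
# => [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
```

With one fork per cluster instead of one fork per object, the per-process overhead (fork, marshalling the result back) is paid once per slice rather than once per job.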
# File 'lib/forkandreturn/enumerable.rb', line 71

def clustered_concurrent_collect(number_of_clusters=ForkAndReturn::Util.cores, &block)
  number_of_clusters = 0 unless ForkAndReturn::Util.forkable?
  if number_of_clusters < 1
    self.concurrent_collect(number_of_clusters, &block)
  else
    ThreadLimiter.handle_clusters(self, number_of_clusters, :concurrent_collect, &block)
  end
end
#clustered_concurrent_each(*args, &block) ⇒ Object
Like clustered_concurrent_collect, but it’s “each” instead of “collect”.
# File 'lib/forkandreturn/enumerable.rb', line 115

def clustered_concurrent_each(*args, &block)
  clustered_concurrent_collect(*args, &block)
  self
end
#clustered_concurrent_reject(*args, &block) ⇒ Object
Like clustered_concurrent_collect, but it’s “reject” instead of “collect”.
# File 'lib/forkandreturn/enumerable.rb', line 109

def clustered_concurrent_reject(*args, &block)
  self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end
#clustered_concurrent_select(*args, &block) ⇒ Object
Like clustered_concurrent_collect, but it’s “select” instead of “collect”.
# File 'lib/forkandreturn/enumerable.rb', line 103

def clustered_concurrent_select(*args, &block)
  self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end
#concurrent_collect(max_concurrent_workers = -1, &block) ⇒ Object Also known as: concurrent_map
For each object in the enumeration, call the block in a separate process, passing the object to the block, and collect the results of the blocks. It is one of the easiest ways of doing parallel processing in Ruby.
Example:
[1, 2, 3, 4].concurrent_collect do |object|
2*object
end # ===> [2, 4, 6, 8]
This runs each “2*object” in a separate process. Ideally, the processes are spread over all available CPUs. That’s a simple way of parallel processing!
Note that the code in the block is run in a separate process, so updating objects and variables in the block won’t affect the parent process:
count = 0
[...].concurrent_collect do
count += 1
end
count # ==> 0
concurrent_collect() is suitable for handling a couple of very CPU intensive jobs, like parsing large XML files.
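The underlying idea can be sketched in a few lines of plain Ruby. This is a simplified, hypothetical helper, not ForkAndReturn's actual implementation: fork one child per object, marshal each block result back through a pipe, then collect the results in order.

```ruby
# Sketch of the fork-per-object idea (hypothetical helper, not the gem's
# code). Requires a Unix-like system where Kernel#fork is available.
def concurrent_map_sketch(enum)
  enum.map do |object|
    reader, writer = IO.pipe
    pid = fork do                                 # child process
      reader.close
      writer.write(Marshal.dump(yield(object)))   # serialize the result
      writer.close
    end
    writer.close                                  # parent keeps only the reader
    [pid, reader]
  end.map do |pid, reader|                        # parent: collect in order
    data = reader.read
    reader.close
    Process.wait(pid)
    Marshal.load(data)
  end
end

p concurrent_map_sketch([1, 2, 3, 4]) { |n| 2 * n }  # => [2, 4, 6, 8]
```

Note that all children are forked before any result is collected, which is what makes the work run in parallel; the actual source above additionally splits the wait/load/call steps into separate collect passes.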
# File 'lib/forkandreturn/enumerable.rb', line 25

def concurrent_collect(max_concurrent_workers=-1, &block)
  max_concurrent_workers = 0 unless ForkAndReturn::Util.forkable?
  case
  when max_concurrent_workers < 0    # No limit.
    self.collect do |object|
      ForkAndReturn.fork_and_return_core do
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end
    end.collect do |wait|
      wait.call
    end.collect do |load|
      load.call
    end.collect do |result|
      result.call
    end
  when max_concurrent_workers == 0    # No fork.
    self.collect(&block)
  when max_concurrent_workers > 0
    self.threaded_collect(max_concurrent_workers) do |object|
      ForkAndReturn.fork_and_return_core do
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end.call
    end.collect do |load|
      load.call
    end.collect do |result|
      result.call
    end
  end
end
#concurrent_each(*args, &block) ⇒ Object
Like concurrent_collect, but it’s “each” instead of “collect”.
# File 'lib/forkandreturn/enumerable.rb', line 95

def concurrent_each(*args, &block)
  concurrent_collect(*args, &block)
  self
end
#concurrent_reject(*args, &block) ⇒ Object
Like concurrent_collect, but it’s “reject” instead of “collect”.
# File 'lib/forkandreturn/enumerable.rb', line 89

def concurrent_reject(*args, &block)
  self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end
#concurrent_select(*args, &block) ⇒ Object
Like concurrent_collect, but it’s “select” instead of “collect”.
# File 'lib/forkandreturn/enumerable.rb', line 83

def concurrent_select(*args, &block)
  self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end
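The select and reject variants are thin wrappers around the collect variants: each object is paired with its (concurrently computed) block result via zip, and the object is kept or dropped based on that result. With plain map standing in for concurrent_collect, the pattern looks like this:

```ruby
# Illustration of the zip/inject filtering pattern used by the
# select/reject variants; plain map stands in for concurrent_collect.
objects = [1, 2, 3, 4, 5]
flags   = objects.map(&:even?)   # block results, computed up front
p objects.zip(flags).inject([]) { |r, (o, b)| r << o if b; r }
# => [2, 4]
```

Because the block runs once per object in a child process anyway, filtering on the returned truth values costs nothing extra in the parent.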