Class: Universa::Parallel::ParallelEnumerable

Inherits:
SimpleDelegator
  • Object
show all
Includes:
Concurrent
Defined in:
lib/universa/tools.rb

Overview

The eollection-like delegate supporting parallel execution on #each, #each_with_index and #map. Use refine Enumerable to easily construct it as simple as collection.par. Inspired by Scala.

Constant Summary collapse

@@pool =
CachedThreadPool.new

Instance Method Summary collapse

Instance Method Details

#each(&block) ⇒ Object

Call the given block on each item in the collection in parallel, blocks until all items are processed

Returns:

  • self



56
57
58
# File 'lib/universa/tools.rb', line 56

def each &block
  each_with_index {|x, i| block.call(x)}
end

#each_with_index(&block) ⇒ Object

Enumerates in parallel all items. Like Enumerable#each_with_index, but requires block. Blocks until all items are processed.

Parameters:

  • block (Proc)

    to call with (object, index) parameters

Returns:

  • self



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/universa/tools.rb', line 36

def each_with_index &block
  latch = CountDownLatch.new(size)
  __getobj__.each_with_index {|x, index|
    @@pool << -> {
      begin
        block.call(x, index)
      rescue
        $!.print_stack_trace
      ensure
        latch.count_down
      end
    }
  }
  latch.wait
  self
end

#map(&block) ⇒ Object Also known as: collect

Parallel version of the Enumerable#map. Creates a new array containing the values returned by the block, using parallel execution in threads.

Returns:

  • new array containing the values returned by the block.



64
65
66
67
68
69
70
# File 'lib/universa/tools.rb', line 64

def map &block
  result = size.times.map {nil}
  each_with_index {|value, i|
    result[i] = block.call(value)
  }
  result.par
end