Module: Array::ParallelMap

Defined in:
lib/vex/base/array/parallel_map.rb

Overview

A simple (really, really simple) multithreaded map-in-parallel mapper.

Defined Under Namespace

Modules: Etest

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.timeout(opts, &block) ⇒ Object



7
8
9
10
11
12
13
# File 'lib/vex/base/array/parallel_map.rb', line 7

# Runs the given block, bounded by +opts[:timeout]+ when that option is set.
#
# Without a :timeout option the block is simply yielded to. With one, the
# block is handed to Timeout.timeout; a Timeout::Error raised there is
# rescued and swallowed, so the call evaluates to nil in that case.
#
# @param opts [#[]] options; only the :timeout key is read here
# @yield the work to perform
# @return [Object, nil] the block's value, or nil when Timeout::Error was rescued
def self.timeout(opts, &block)
  limit = opts[:timeout]
  return yield unless limit

  # The rescue deliberately covers only the time-limited call, not the
  # unlimited path above.
  begin
    Timeout.timeout(limit, &block)
  rescue Timeout::Error
    nil
  end
end

Instance Method Details

#peach(opts = {}, &block) ⇒ Object



30
31
32
33
34
# File 'lib/vex/base/array/parallel_map.rb', line 30

# Like peach_with_index, but the block receives only the element.
#
# @param opts [Hash] passed through unchanged to peach_with_index
# @yield [data] one element of the receiver
# @return [Object] whatever peach_with_index returns
def peach(opts = {}, &block)
  # The index supplied by peach_with_index is intentionally dropped.
  peach_with_index(opts) { |item, _position| yield(item) }
end

#peach_with_index(opts = {}, &block) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/vex/base/array/parallel_map.rb', line 15

# Calls the block once per element, each call in its own thread, and waits
# for all of those threads before returning.
#
# Every block call is wrapped in Array::ParallelMap.timeout, which receives
# +opts+ unchanged.
#
# @param opts [Hash] passed to Array::ParallelMap.timeout for every element
# @yield [data, index] an element of the receiver and its index
# @return [Array] a new empty array when the receiver is empty, otherwise self
def peach_with_index(opts = {}, &block)
  return [] if empty?

  # One thread per element; all threads are started before any is joined.
  workers = each_with_index.map do |item, position|
    Thread.new do
      Array::ParallelMap.timeout(opts) { yield(item, position) }
    end
  end

  workers.each(&:join)
  self
end

#pmap(opts = {}, &block) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/vex/base/array/parallel_map.rb', line 36

# Maps the block over the receiver via peach_with_index, storing each result
# at the index of the element it came from.
#
# The returned array always has one slot per element of the receiver. A slot
# stays nil when the block for that element never finished (for example when
# peach_with_index abandoned it after a timeout).
#
# @param opts [Hash] passed through unchanged to peach_with_index
# @yield [data] one element of the receiver
# @return [Array] block results, positioned like their source elements
def pmap(opts = {}, &block)
  lock = Mutex.new

  # Pre-size the result: starting from [] and assigning by index left the
  # array shorter than the receiver whenever trailing elements produced no
  # result. Relies on the receiver responding to #size, as Array does.
  results = Array.new(size)

  peach_with_index(opts) do |data, index|
    value = yield(data)
    # Serialize writes into the shared results array.
    lock.synchronize { results[index] = value }
  end

  results
end