Module: Enumerable

Defined in:
lib/rake/opt/parallel/patches/enumerable.rb

Overview

Adds parallel iterators (#parallel_each and #parallel_map) to Enumerable.

Instance Method Summary collapse

Instance Method Details

#execution_id_wrapper(id, &block) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/rake/opt/parallel/patches/enumerable.rb', line 50

# Wraps +block+ in a lambda that tags the running thread with an execution id
# for the duration of the call.
#
# When the returned lambda is invoked it sets +Thread.current.execution_id+ to
# +id+ (or calls +Thread.current.reset_execution_id+ when +id+ is nil/false),
# calls +block+ with the arguments it received, and finally calls
# +Thread.current.clear_execution_id+.
#
# @param id [Object, nil] execution id to assign; nil/false triggers a reset instead
# @param block [Proc, nil] the work to run; when omitted the lambda returns nil
# @return [Proc] a lambda accepting any arguments and returning the block's result
def execution_id_wrapper(id, &block)
  lambda do |*args|
    begin
      # Have the block set an execution_id on launch
      if id
        Thread.current.execution_id = id
      else
        Thread.current.reset_execution_id
      end
      block.call(*args) if block
    ensure
      # Always clear the id, even when the block raises; otherwise the thread
      # would still carry this id after the failure.
      Thread.current.clear_execution_id
    end
  end
end

#parallel_each(options = {}, &block) ⇒ Object



6
7
8
9
# File 'lib/rake/opt/parallel/patches/enumerable.rb', line 6

# Runs +block+ for every element through #parallel_map, discards the mapped
# results, and hands back whatever a plain (empty-block) #each returns.
#
# @param options [Hash] forwarded unchanged to #parallel_map
# @param block [Proc] work to perform for each element
# @return [Object] the return value of +each+ called with an empty block
def parallel_each(options = {}, &block)
  parallel_map(options, &block)
  # Empty pass over the receiver, kept solely for #each's return value.
  each { |_element| }
end

#parallel_map(options = {}, &block) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rake/opt/parallel/patches/enumerable.rb', line 11

# Maps over the receiver by submitting one future per element to a Rake thread
# pool and collecting the results in element order.
#
# @param options [Hash] converted to a Hashie::Mash; recognised keys:
#   - +force+: when present, a dedicated pool is created with +force - 1+ threads
#   - +unlimited+: when truthy, a dedicated pool is created with one thread
#     fewer than the number of elements (takes precedence over +force+)
#   - +execution_ids+: array with one id per element, handed to
#     #execution_id_wrapper alongside the block
# @param block [Proc] work to perform for each element
# @return [Array] the value of each promise, in the receiver's iteration order
# @raise [RuntimeError] if +execution_ids+ is given and its size differs from
#   the number of elements
def parallel_map(options = {}, &block)
  # Convert options hash to a mash
  options = ::Hashie::Mash.new(options)

  # Materialise the receiver once: Enumerable only guarantees #each, so #size
  # may be missing, and repeated enumeration may be costly or non-repeatable.
  items = to_a

  execution_ids = options[:execution_ids]
  if execution_ids
    raise 'The execution_ids array must be same length as Enum Object' if execution_ids.size != items.size
  else
    execution_ids = Array.new(items.size)
  end

  use_separate_pool = options.force.present? || options.unlimited
  if use_separate_pool
    thread_count = options.unlimited ? items.size : options.force
    # Create a separate thread pool
    # NOTE(review): the "- 1" presumably accounts for the calling thread also
    # doing work while it waits on promises - confirm against Rake::ThreadPool.
    thread_pool = Rake::ThreadPool.new(thread_count - 1)
    thread_pool.add_reference
  else
    # Otherwise, use the existing thread pool
    thread_pool = Rake.application.thread_pool
  end

  begin
    # Convert them to Promises to be fulfilled. Ids are read by index so the
    # execution_ids array is left untouched.
    promises = items.each_with_index.map do |item, index|
      thread_pool.future(item, &execution_id_wrapper(execution_ids[index], &block))
    end

    # Fulfill all promises (last one first, as before), then collect in order
    promises.reverse_each(&:value)
    promises.map(&:value)
  ensure
    # Release the dedicated pool even when a promise raised; the shared
    # application pool is never released here.
    thread_pool.remove_reference if use_separate_pool
  end
end