Class: Parallel

Inherits:
Object
  • Object
show all
Defined in:
lib/parallel.rb

Constant Summary collapse

VERSION =
File.read( File.join(File.dirname(__FILE__),'..','VERSION') ).strip

Class Method Summary collapse

Class Method Details

.in_processes(count = processor_count) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/parallel.rb', line 18

# Runs the given block in +count+ forked child processes and returns the
# block results, ordered by process index.
#
# Each child is yielded its index (0...count). The block's return value is
# serialized with Marshal, written to a pipe, and deserialized in the parent,
# so it must be Marshal-dumpable.
#
# count:: number of child processes to fork (defaults to processor_count)
#
# Returns an Array with one entry per child. Raises ArgumentError from
# Marshal.load if a child wrote nothing (e.g. it died before finishing).
def self.in_processes(count = processor_count)
  # Start writing results into n pipes
  reads = []
  pids = []
  count.times do |i|
    reads[i], write = IO.pipe
    reads[i].binmode # Marshal data is binary; never apply encoding conversion
    pids << Process.fork do
      Marshal.dump(yield(i), write) # Serialize result
    end
    # The parent never writes. Closing right away lets the reader see EOF as
    # soon as the child exits and keeps later children from inheriting this end.
    write.close
  end

  kill_on_ctrl_c(pids)

  # Collect results from all pipes simultaneously,
  # otherwise children block once too much is written (pipe buffer full)
  collectors = reads.map do |read|
    Thread.new do
      begin
        read.read # everything up to EOF; '' when the child wrote nothing
      ensure
        read.close
      end
    end
  end
  out = collectors.map { |collector| collector.value }

  # Reap the children so they do not linger as zombie processes
  pids.each do |pid|
    begin
      Process.wait(pid)
    rescue Errno::ECHILD
      # already reaped elsewhere (e.g. by a SIGCHLD handler)
    end
  end

  # Deserialize results; the data only ever comes from our own children
  out.map { |x| Marshal.load(x) }
end

.in_threads(count = 2) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
# File 'lib/parallel.rb', line 4

# Runs the given block in +count+ threads, yielding each thread its index
# (0...count), and waits for all of them to finish.
#
# count:: number of threads to start (defaults to 2)
#
# Returns an Array holding each block result at the index of its thread.
def self.in_threads(count = 2)
  results = []

  workers = count.times.map do |index|
    Thread.new { results[index] = yield(index) }
  end

  # join in start order; a failed thread re-raises its exception here
  workers.each { |worker| worker.join }

  results
end

.map(array, options = {}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/parallel.rb', line 54

# Maps +array+ through the given block using several workers and returns the
# results in the same order as the input.
#
# array::   items to process; each one is yielded to the block
# options:: <tt>:in_threads => n</tt> runs every item via in_threads,
#           <tt>:in_processes => n</tt> runs every item via in_processes;
#           with neither given, in_processes is used with processor_count
#           workers
#
# Returns an Array with one block result per item.
def self.map(array, options = {})
  require 'thread' # provides Mutex on old Rubies, harmless on new ones

  if options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    size = options[method] || processor_count
  end

  # work in #{size} threads that use threads/processes
  results = []
  current = -1
  # Thread.exclusive no longer exists on Ruby 3.0+, so guard the shared
  # counter with a dedicated lock instead
  lock = Mutex.new

  in_threads(size) do
    # as long as there are more items, work on one of them
    loop do
      index = lock.synchronize { current += 1 }
      break if index >= array.size
      # the worker runs a single job and returns a one-element array; unwrap
      # it explicitly (a splat on the right-hand side keeps the wrapping
      # array on Ruby 1.9+)
      results[index] = send(method, 1) { yield array[index] }.first
    end
  end

  results
end

.processor_count ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/parallel.rb', line 81

# Returns the number of processors usable by this process as an Integer.
#
# Prefers Etc.nprocessors where the running Ruby provides it. Otherwise
# falls back to a platform-specific probe, and to 1 on platforms with no
# known probe so callers always get a usable worker count instead of nil.
def self.processor_count
  require 'etc' # stdlib; loaded lazily like the other requires in this file
  return Etc.nprocessors if Etc.respond_to?(:nprocessors)

  case RUBY_PLATFORM
  when /darwin/
    # sysctl ships with the OS; hwprefs is not available on a stock install
    `sysctl -n hw.ncpu`.to_i
  when /linux/
    # count only the "processor : N" entries, not other lines containing the word
    File.read('/proc/cpuinfo').scan(/^processor\s*:/).size
  else
    1
  end
end