Class: MPThreads::Parallel

Inherits:
Object
  • Object
show all
Defined in:
lib/multiprocess-threads.rb

Overview

Run tasks using multiple processes & threads

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ Parallel

Returns a new instance of Parallel.



30
31
32
33
# File 'lib/multiprocess-threads.rb', line 30

def initialize(&block)
  @channel = Channel.new
  @result_callback = block
end

Class Method Details

.calc_resources(count) ⇒ Object



67
68
69
70
71
72
# File 'lib/multiprocess-threads.rb', line 67

def calc_resources(count)
  kernels_count = Etc.nprocessors
  proc_count = [count, kernels_count].min
  thr_count = [1, count / proc_count].max
  [proc_count, thr_count]
end

Instance Method Details

#spawn_threads(count, proc_i, &block) ⇒ Object



57
58
59
60
61
62
63
64
# File 'lib/multiprocess-threads.rb', line 57

def spawn_threads(count, proc_i, &block)
  count.times.map do |i|
    Thread.new do
      res = @channel.instance_exec(proc_i, i, &block)
      @channel.write(res) if res
    end
  end
end

#spawn_workers(count, &block) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/multiprocess-threads.rb', line 43

def spawn_workers(count, &block)
  proc_count, thr_count = Parallel.calc_resources count
  proc_count.times do |i|
    ::Process.fork do
      workers = spawn_threads(thr_count, i, &block)
      workers.each(&:join)
    rescue Interrupt
      workers.each(&:exit)
    end
  end
rescue Interrupt
  warn 'Exiting'
end

#work(workers_count = 2, &block) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/multiprocess-threads.rb', line 35

def work(workers_count = 2, &block)
  spawn_workers(workers_count, &block)

  while (data = @channel.read)
    @result_callback.call data
  end
end