Module: Parallax

Defined in:
lib/parallax.rb,
lib/parallax/worker.rb,
lib/parallax/version.rb,
lib/parallax/collector.rb,
lib/parallax/collectable.rb

Defined Under Namespace

Modules: Collectable Classes: Collector, Worker

Constant Summary collapse

VERSION =
'0.4.0'

Class Method Summary collapse

Class Method Details

.execute(elements, options = {}, &block) ⇒ Collector

Divides the given elements into N groups (one per worker process) and executes each group in parallel, in its own forked process, with the given block.

Parameters:

  • elements (Array)

    the elements to process.

  • options (Hash) (defaults to: {})

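    secondary options: `:processes` (number of worker processes; defaults to `Etc.nprocessors`) and `:collector` (an object whose `initialize_collector(processes)` result is used instead of a new `Parallax::Collector`).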

Returns:

  • (Collector)

    the collector that receives the output of all worker processes.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/parallax.rb', line 21

# Splits +elements+ into one group per worker process, forks one child
# process per group, and yields a worker plus its group to the given block
# inside each child. The parent keeps calling +receive+ on the collector
# until the collector reports that every worker has terminated.
#
# @param elements [Array] the elements to process; must respond to
#   +in_groups+.
# @param options [Hash] secondary options.
# @option options [Integer] :processes number of child processes to fork
#   (defaults to +Etc.nprocessors+).
# @option options [Object] :collector custom collector; the result of its
#   +initialize_collector(processes)+ is used instead of a new
#   +Parallax::Collector+.
# @yield [worker, chunk] executed inside each forked child process.
# @yieldparam worker [Parallax::Worker] the worker bound to this child.
# @yieldparam chunk [Array] the group of elements assigned to this child.
# @return [Collector] the collector, after it has been closed.
def execute(elements, options = {}, &block)
  processes = options[:processes] || Etc.nprocessors

  collector =
    if options[:collector].present?
      options[:collector].initialize_collector(processes)
    else
      Parallax::Collector.new(processes)
    end
  elements_chunks = elements.in_groups(processes, false)

  processes.times do |worker_index|
    pid = Process.fork do
      worker = nil
      begin
        worker = Parallax::Worker.new(collector, worker_index)
        yield worker, elements_chunks[worker_index]
      rescue StandardError => error
        # If the worker itself could not be built there is nothing to hand
        # the error to; re-raise it so the real cause is not masked by a
        # NoMethodError on nil.
        # NOTE(review): in that case the collector is never told about this
        # worker, so the parent loop below may not finish - confirm.
        raise unless worker

        worker.rescue error
      ensure
        worker.close if worker
      end
    end
    # Reap the child in the background once it exits so finished workers do
    # not linger as zombie processes for the lifetime of the parent.
    Process.detach(pid)
  end

  collector.receive until collector.all_workers_terminated?
  collector.close
  collector
end