Class: MiGA::Parallel

Inherits:
MiGA
  • Object
show all
Defined in:
lib/miga/parallel.rb

Overview

Parallel execution in MiGA.

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary

Attributes included from Common::Net

#remote_connection_uri

Class Method Summary collapse

Methods inherited from MiGA

CITATION, CITATION_ARRAY, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, #advance, debug?, debug_trace?, initialized?, #like_io?, #num_suffix, rc_path, #result_files_exist?, #say

Methods included from Common::Path

#root_path, #script_path

Methods included from Common::Format

#clean_fasta_file, #seqs_length, #tabulate

Methods included from Common::Net

#download_file_ftp, #http_request, #known_hosts, #main_server, #net_method, #normalize_encoding, #remote_connection

Methods included from Common::SystemCall

#run_cmd, #run_cmd_opts

Class Method Details

.assess_success(status) ⇒ Object

Assesses the success of all thread exit codes and raises an error if any of the children status in status failed. It can be used as:

status = MiGA::Parallel.process(3) { |i| 1/i }
MiGA::Parallel.assess_success(status)

Or in conjunction with MiGA::Parallel.distribute

Raises:



43
44
45
46
47
48
49
50
51
# File 'lib/miga/parallel.rb', line 43

def assess_success(status)
  failed = status.map { |i| i[1].success? ? 0 : 1 }.inject(:+)
  return if failed.zero?

  raise MiGA::Error.new(
    "Child threads failed: #{failed}/#{status.size}. " \
    "Maximum exit status: #{status.map { |i| i[1].exitstatus || 0 }.max}"
  )
end

.distribute(enum, threads, &blk) ⇒ Object

Distributes enum across threads and calls the passed block with args:

  1. Unitary object from enum

  2. Index of the unitary object

  3. Index of the acting thread



21
22
23
# File 'lib/miga/parallel.rb', line 21

def distribute(enum, threads, &blk)
  process(threads) { |thr| thread_enum(enum, threads, thr, &blk) }
end

.process(threads) ⇒ Object

Executes the passed block with the thread number as argument (0-numbered) in threads processes



10
11
12
13
14
# File 'lib/miga/parallel.rb', line 10

def process(threads)
  threads.times
    .map { |i| Process.fork { yield(i) } }
    .map { |pid| Process.waitpid2(pid) }
end

.thread_enum(enum, threads, thr) ⇒ Object

Enum through enum executing the passed block only for thread with index thr, one of threads threads. The passed block has the same arguments as the one in #distribute



29
30
31
32
33
# File 'lib/miga/parallel.rb', line 29

def thread_enum(enum, threads, thr)
  enum.each_with_index do |obj, idx|
    yield(obj, idx, thr) if idx % threads == thr
  end
end