Class: MiGA::Parallel
Overview
Parallel execution in MiGA.
Constant Summary
Constants included from MiGA
CITATION, VERSION, VERSION_DATE, VERSION_NAME
Instance Attribute Summary
Attributes included from Common::Net
Class Method Summary collapse
-
.assess_success(status) ⇒ Object
Assesses the success of all thread exit codes and raises an error if any of the children status in
statusfailed. -
.distribute(enum, threads, &blk) ⇒ Object
Distributes
enumacrossthreadsand calls the passed block with args: 1. -
.process(threads) ⇒ Object
Executes the passed block with the thread number as argument (0-numbered) in
threadsprocesses. -
.thread_enum(enum, threads, thr) ⇒ Object
Enum through
enumexecuting the passed block only for thread with indexthr, one ofthreadsthreads.
Methods inherited from MiGA
CITATION, CITATION_ARRAY, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, #advance, debug?, debug_trace?, initialized?, #like_io?, #num_suffix, rc_path, #result_files_exist?, #say
Methods included from Common::Path
Methods included from Common::Format
#clean_fasta_file, #seqs_length, #tabulate
Methods included from Common::Net
#download_file_ftp, #http_request, #known_hosts, #main_server, #net_method, #normalize_encoding, #remote_connection
Methods included from Common::SystemCall
Class Method Details
.assess_success(status) ⇒ Object
Assesses the success of all thread exit codes and raises an error if any of the children status in status failed. It can be used as:
status = MiGA::Parallel.process(3) { |i| 1/i }
MiGA::Parallel.assess_success(status)
Or in conjunction with MiGA::Parallel.distribute
43 44 45 46 47 48 49 50 51 |
# File 'lib/miga/parallel.rb', line 43 def assess_success(status) failed = status.map { |i| i[1].success? ? 0 : 1 }.inject(:+) return if failed.zero? raise MiGA::Error.new( "Child threads failed: #{failed}/#{status.size}. " \ "Maximum exit status: #{status.map { |i| i[1].exitstatus || 0 }.max}" ) end |
.distribute(enum, threads, &blk) ⇒ Object
Distributes enum across threads and calls the passed block with args:
-
Unitary object from
enum -
Index of the unitary object
-
Index of the acting thread
21 22 23 |
# File 'lib/miga/parallel.rb', line 21 def distribute(enum, threads, &blk) process(threads) { |thr| thread_enum(enum, threads, thr, &blk) } end |
.process(threads) ⇒ Object
Executes the passed block with the thread number as argument (0-numbered) in threads processes
10 11 12 13 14 |
# File 'lib/miga/parallel.rb', line 10 def process(threads) threads.times .map { |i| Process.fork { yield(i) } } .map { |pid| Process.waitpid2(pid) } end |
.thread_enum(enum, threads, thr) ⇒ Object
Enum through enum executing the passed block only for thread with index thr, one of threads threads. The passed block has the same arguments as the one in #distribute
29 30 31 32 33 |
# File 'lib/miga/parallel.rb', line 29 def thread_enum(enum, threads, thr) enum.each_with_index do |obj, idx| yield(obj, idx, thr) if idx % threads == thr end end |