Class: Braavos::Parallel

Inherits:
Object
  • Object
show all
Defined in:
lib/braavos/parallel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(command = "parallel", parallel_jobs = "100%") ⇒ Parallel

wget ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2 tar -xvjf parallel* cd parallel* ./configure && make && sudo make install



12
13
14
15
16
17
18
# File 'lib/braavos/parallel.rb', line 12

def initialize(command = "parallel", parallel_jobs = "100%")
  @parallel_cmd = command
  @parallel_jobs = parallel_jobs

  validate_parallel_installed
  validate_timeout_installed
end

Instance Attribute Details

#parallel_cmdObject

Returns the value of attribute parallel_cmd.



5
6
7
# File 'lib/braavos/parallel.rb', line 5

def parallel_cmd
  @parallel_cmd
end

#parallel_jobsObject

Returns the value of attribute parallel_jobs.



5
6
7
# File 'lib/braavos/parallel.rb', line 5

def parallel_jobs
  @parallel_jobs
end

Instance Method Details

#execute(script, input, options = {}) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/braavos/parallel.rb', line 20

def execute(script, input, options={})
  script_temp_file = Tempfile.new('brav-parl-scr')
  script_temp_file.write(script)
  script_temp_file.close
  FileUtils.chmod("+rx", script_temp_file.path)

  input_files = generate_input_files(input)

  begin
    input_file_names = input_files.map{|inf| inf.path}.join(' ')
    command = generate_command(script_temp_file, input_file_names, options)
    Braavos.logger.info("Parallel Execute: #{command}")

    rval = execute_command command

    unless rval
      Braavos.logger.info("Retrying failed parallel jobs.")
      retry_command = generate_command(script_temp_file, input_file_names, options, true)
      rval = execute_command retry_command
    end

    raise StandardError, "Failed to run backup due to errored jobs" unless rval
  ensure
    input_files.each do |inf| inf.close! end
  end
end

#execute_command(command) ⇒ Object



82
83
84
85
86
87
# File 'lib/braavos/parallel.rb', line 82

def execute_command(command)
  system(command)
  success = $?.exitstatus == 0
  Braavos.logger.warn("Parallel execution failed with status #{$?.exitstatus}") unless success
  success
end

#generate_command(script_temp_file, input_file_names, options, resume_failed = false) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/braavos/parallel.rb', line 73

def generate_command(script_temp_file, input_file_names, options, resume_failed = false)
  joblog_path = "/tmp/braavos_joblog.txt"
  if resume_failed
    "parallel --no-notice --joblog #{joblog_path} --resume-failed -j #{parallel_jobs} #{options[:parallel_opts] || ''} --xapply #{script_temp_file.path} :::: #{input_file_names} "
  else
    "parallel --no-notice --joblog #{joblog_path} -j #{parallel_jobs} #{options[:parallel_opts] || ''} --xapply #{script_temp_file.path} :::: #{input_file_names} "
  end
end

#generate_input_files(input) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/braavos/parallel.rb', line 47

def generate_input_files(input)
  input_files = []

  if (first = input.first) && first.is_a?(Array)
    (0..first.size).each do |i|
      input_tf = Tempfile.new('brav-parl-inp')
      input_files << input_tf
      File.open(input_tf.path, 'w') do |f|
        input.each do |minput|
          f.puts minput[i]
        end
      end
    end
  else
    input_tf = Tempfile.new('parl-inp')
    input_files << input_tf
    File.open(input_tf.path, 'w') do |f|
      input.each do |minput|
        f.puts minput
      end
    end
  end

  input_files
end

#validate_parallel_installedObject



89
90
91
92
93
# File 'lib/braavos/parallel.rb', line 89

def validate_parallel_installed
  `#{parallel_cmd} --version`
rescue StandardError
  raise ArgumentError, "GNU parallel not available"
end

#validate_timeout_installedObject



95
96
97
98
99
100
101
102
103
# File 'lib/braavos/parallel.rb', line 95

def validate_timeout_installed
  `timeout --version`
rescue StandardError
  begin
    `gtimeout --version`
  rescue StandardError
    raise ArgumentError, "timeout from GNU coreutils is not available"
  end
end