Module: BatchExperiment
- Defined in: lib/batch_experiment.rb
Defined Under Namespace
Classes: FilenameSanitizer
Class Method Summary
- .batch(commands, conf) ⇒ String
  Takes a list of commands, executes them only on the designated cores/CPUs, and kills them if the timeout expires; never lets a core/CPU rest for more than conf[:busy_loop_sleep] seconds between one command and the next.
- .experiment(comms_info, batch_conf, conf, files) ⇒ NilClass, Array<String>
  Takes N shell commands and M files/parameters, executes each of the N commands over each of the M files, saves the output of each command/file combination, uses the objects provided with each command to extract relevant information from the output files, and groups that information in a CSV file.
- .gencommff(comm, patt, files) ⇒ Array<String>
  gencommff: GENerate COMMands For Files.
- .intercalate(xss) ⇒ Array<Object>
  Intercalate a variable number of variable sized arrays in one array.
- .update_finished(free_cpus, comms_running, comms_executed) ⇒ Object
  Internal use only.
Class Method Details
.batch(commands, conf) ⇒ String
This procedure was not designed to support equal commands: the last equal command executed will overwrite the ‘.out’, ‘.err’ and ‘.unfinished’ files used by any previous equal command. The conf[:fname_sanitizer] parameter can be used to circumvent this restriction: if the object has state, it can return a different filename each time it is called with the same argument.
This procedure makes use of the following Linux commands: time (not the bash built-in, but the packaged one, i.e. www.archlinux.org/packages/extra/x86_64/time/); timeout (from coreutils); taskset (from util-linux, www.archlinux.org/packages/core/x86_64/util-linux/); sh (the shell).
The command is executed inside a call to “sh -c command”, so it has to be a valid sh command.
The output of “time -f conf[:time_fmt]” is appended to the ‘.out’ file of every command. If you set conf[:time_fmt] to an empty string, only a newline is appended.
Takes a list of commands, executes them only on the designated cores/CPUs (conf[:cpus_available]), and kills any command whose timeout (conf[:timeout]) expires. It never lets a core/CPU rest for more than conf[:busy_loop_sleep] seconds between one command and the next. conf[:fname_sanitizer] is called over the commands to generate partial filenames. Appending ‘.out’ to a partial filename gives the name of the file to which the command's stdout was redirected; the same holds for ‘.err’ and stderr. The first partial filename corresponds to the first command in commands, and so on. Right before a command begins to run, a file named after the partial filename plus conf[:unfinished_ext] is created. When the command ends its execution, this file is removed, including when the command ends because of a timeout. The file remains only if the batch procedure itself is interrupted (not a specific command).
# File 'lib/batch_experiment.rb', line 97

def self.batch(commands, conf)
  # Throw exceptions if required configurations aren't provided.
  fail "conf[:cpus_available] not set" unless conf[:cpus_available]
  fail "conf[:timeout] not set" unless conf[:timeout]

  # Initialize optional configurations with default values if they weren't
  # provided. Don't change the conf argument, only our version of conf.
  conf = conf.clone
  conf[:time_fmt]         ||= 'ext_time: %e\\next_mem: %M\\n'
  conf[:unfinished_ext]   ||= '.unfinished'
  conf[:out_ext]          ||= '.out'
  conf[:err_ext]          ||= '.err'
  conf[:busy_loop_sleep]  ||= 0.1
  conf[:post_timeout]     ||= 5
  conf[:fname_sanitizer]  ||= BatchExperiment::FilenameSanitizer.new
  conf[:skip_done_comms] = true if conf[:skip_done_comms].nil?

  # Initialize main variables
  free_cpus = conf[:cpus_available].clone
  comms_running = []
  cpu = nil
  comms_executed = []

  commands.each do | command |
    commfname = conf[:fname_sanitizer].call(command)
    out_fname = commfname + conf[:out_ext]
    err_fname = commfname + conf[:err_ext]
    lockfname = commfname + conf[:unfinished_ext]

    if conf[:skip_done_comms] && File.exists?(out_fname)
      if File.exists?(lockfname)
        puts "found file #{out_fname}, but a #{lockfname} also exists"
        puts "will execute command '#{command}' anyway"
      else
        puts "found file #{commfname}, skipping command: #{command}"
        STDOUT.flush
        next
      end
    end

    puts "waiting to execute command: #{command}"
    STDOUT.flush

    while free_cpus.empty? do
      sleep conf[:busy_loop_sleep]
      update_finished(free_cpus, comms_running, comms_executed)
    end

    cpu = free_cpus.pop

    cproc = ChildProcess.build(
      'taskset', '-c', cpu.to_s,
      'time', '-f', conf[:time_fmt], '--append', '-o', out_fname,
      'timeout', '--preserve-status', '-k', "#{conf[:post_timeout]}s",
        "#{conf[:timeout]}s",
      'sh', '-c', command
    )

    File.open(lockfname, 'w') {} # empty on purpose
    out = File.open(out_fname, 'w')
    err = File.open(err_fname, 'w')
    cproc.io.stdout = out
    cproc.io.stderr = err

    cproc.start

    comms_running << {
      proc: cproc,
      cpu: cpu,
      lockfname: lockfname,
      command: command
    }

    puts "command assigned to cpu#{cpu}"
    STDOUT.flush
  end

  until comms_running.empty? do
    sleep conf[:busy_loop_sleep]
    update_finished(free_cpus, comms_running, comms_executed)
  end

  comms_executed
end
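The remark about equal commands can be illustrated with a small stateful sanitizer. This is only a sketch: the class name CountingSanitizer and its naming scheme are hypothetical and not part of BatchExperiment. The only thing taken from the listing is that the object passed as conf[:fname_sanitizer] must respond to call(command).

```ruby
# Hypothetical stateful sanitizer (an assumption, NOT shipped with the gem).
# It returns a different partial filename each time the same command is seen.
class CountingSanitizer
  def initialize
    @seen = Hash.new(0) # command string => how many times it was requested
  end

  # batch only needs the object to respond to call(command).
  def call(command)
    @seen[command] += 1
    # Replace every run of characters outside a conservative whitelist.
    base = command.strip.gsub(/[^A-Za-z0-9._-]+/, '_')
    # The first occurrence keeps the plain name; repeats get a numeric suffix.
    @seen[command] == 1 ? base : "#{base}_#{@seen[command]}"
  end
end

sanitizer = CountingSanitizer.new
first  = sanitizer.call('echo hello')
second = sanitizer.call('echo hello')
puts first  # echo_hello
puts second # echo_hello_2
```

One caveat with this design: the experiment listing appears to call the same sanitizer again when it looks for the output files, so a stateful object like this one would have to be reset (or otherwise reproduce the same sequence of names) between the two passes.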
.experiment(comms_info, batch_conf, conf, files) ⇒ NilClass, Array<String>
Takes N shell commands and M files/parameters, executes each of the N commands over each of the M files, saves the output of each command/file combination, uses the objects provided with each command to extract relevant information from the output files, and groups that information in a CSV file. This is easier to understand by seeing the sample_batch.rb example in action.
# File 'lib/batch_experiment.rb', line 270

def self.experiment(comms_info, batch_conf, conf, files)
  # Throw exceptions if required configurations aren't provided.
  fail 'conf[:csvfname] is not defined' unless conf[:csvfname]

  # Initialize optional configurations with default values if they weren't
  # provided. Don't change the conf argument, only our version of conf.
  conf = conf.clone
  conf[:separator] ||= ';'
  conf[:ic_columns] = true if conf[:ic_columns].nil?
  conf[:ic_comms] = true if conf[:ic_comms].nil?
  #conf[:skip_commands] defaults to false/nil

  # Get some of the batch config that we use inside here too.
  out_ext = batch_conf[:out_ext] || '.out'
  unfinished_ext = batch_conf[:unfinished_ext] || '.unfinished'
  fname_sanitizer = batch_conf[:fname_sanitizer]
  fname_sanitizer ||= BatchExperiment::FilenameSanitizer.new

  # Create commands the templates and the file list.
  comms_sets = []
  comms_info.each do | comm_info |
    comms_sets << gencommff(comm_info[:command], comm_info[:pattern], files)
  end

  comm_list = conf[:ic_comm] ? intercalate(comms_sets) : comms_sets.flatten

  # Execute the commands (or not).
  ret = batch(comm_list, batch_conf) unless conf[:skip_commands]

  # Build header (first csv line, column names).
  header = []
  comms_info.each do | comm_info |
    prefixed_names = comm_info[:extractor].names.map do | name |
      (comm_info[:prefix] + ' ') << name
    end
    header << prefixed_names
  end
  header = intercalate(header) if conf[:ic_columns]
  header = ['Filename'].concat(header).join(conf[:separator])

  # Build body (inspect all output files an make csv lines).
  body = [header]
  files.each_with_index do | inst_fname, j |
    line = []
    comms_info.each_with_index do | comm_info, i |
      command = if conf[:ic_comm]
        comm_list[(j * comms_info.size) + i]
      else
        comm_list[(i * files.size) + j]
      end

      partial_fname = fname_sanitizer.call(command)
      out_fname = partial_fname + out_ext
      lockfname = partial_fname + unfinished_ext
      if File.exists?(out_fname)
        f_content = File.open(out_fname, 'r') { | f | f.read }
        line << comm_info[:extractor].extract(f_content)
      else
        # if the file wasn't created insert a empty column set
        # of the same size the true column set would be
        line << comm_info[:extractor].names.map { | _ | '' }
      end
    end
    line = intercalate(line) if conf[:ic_columns]

    body << [inst_fname].concat(line).join(conf[:separator])
  end
  body = body.map! { | line | line << conf[:separator] }.join("\n")

  # Write CSV data into a CSV file.
  File.open(conf[:csvfname], 'w') { | f | f.write(body) }

  return ret
end
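The listing above only requires each comm_info[:extractor] to respond to names (the column names) and extract(content) (one value per column, given the whole ‘.out’ file content). A minimal sketch of such an object follows. The class name TimeMemExtractor, its column names and its regexes are assumptions made for illustration; the regexes assume the default conf[:time_fmt] shown in the batch listing.

```ruby
# Hypothetical extractor (an assumption, not part of the gem). It reads the
# two lines that the default time format appends to every '.out' file.
class TimeMemExtractor
  # Column names; experiment prefixes each one with comm_info[:prefix].
  def names
    ['time', 'mem']
  end

  # Receives the full '.out' content and returns one string per column.
  # A missing field becomes an empty string, so the row keeps its width.
  def extract(content)
    time = content[/^ext_time: (\S+)/, 1]
    mem  = content[/^ext_mem: (\S+)/, 1]
    [time.to_s, mem.to_s]
  end
end

sample = "some program output\next_time: 0.42\next_mem: 2048\n"
row = TimeMemExtractor.new.extract(sample)
puts row.inspect

# Shape of one comms_info entry, using only the keys read by the listing.
comm_info = {
  command: 'sleep X',   # template command
  pattern: 'X',         # substring replaced by each file/parameter
  extractor: TimeMemExtractor.new,
  prefix: 'sleep'       # prepended to each column name in the CSV header
}
```

Returning an array with exactly names.size elements matters because the listing fills missing output files with that many empty columns.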
.gencommff(comm, patt, files) ⇒ Array<String>
gencommff: GENerate COMMands For Files
# File 'lib/batch_experiment.rb', line 191

def self.gencommff(comm, patt, files)
  ret = []
  files.each { | f | ret << comm.gsub(patt, f) }
  ret
end
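To make the substitution concrete, here is a standalone copy of the same logic so that it runs without the gem. The command template, the pattern ‘FILE’ and the file names are made up for the example.

```ruby
# Standalone copy of the logic in the listing above (same behavior, written
# with map). Every occurrence of patt in comm is replaced by each file name.
def gencommff(comm, patt, files)
  files.map { |f| comm.gsub(patt, f) }
end

commands = gencommff('wc -l FILE > /dev/null', 'FILE', ['a.txt', 'b.txt'])
puts commands
# wc -l a.txt > /dev/null
# wc -l b.txt > /dev/null
```

Because gsub is used, a pattern that also appears elsewhere in the template is replaced there too, so it is safer to pick a placeholder that cannot occur by accident.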
.intercalate(xss) ⇒ Array<Object>
Intercalate a variable number of variable sized arrays in one array.
# File 'lib/batch_experiment.rb', line 204

def self.intercalate(xss)
  ret = []
  xss = xss.map { | xs | xs.reverse }
  until xss.empty? do
    xss.delete_if do | xs |
      unless xs.empty?
        ret << xs.pop
      end
      xs.empty?
    end
  end
  ret
end
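A short worked example may help here. The method below is a standalone copy of the listing above so that it runs without the gem; the input arrays are invented for illustration.

```ruby
# Standalone copy of the listing above: take the first remaining element of
# each array in turn, dropping arrays as they run out.
def intercalate(xss)
  ret = []
  xss = xss.map { |xs| xs.reverse } # reversed copies, so pop yields the head
  until xss.empty?
    xss.delete_if do |xs|
      ret << xs.pop unless xs.empty?
      xs.empty? # drop the array once it has been consumed
    end
  end
  ret
end

mixed = intercalate([[1, 4, 6, 7], [], [2, 5], [3]])
puts mixed.inspect # [1, 2, 3, 4, 5, 6, 7]

# With two column sets, as in the CSV header built by experiment:
columns = intercalate([['A time', 'A mem'], ['B time', 'B mem']])
puts columns.inspect # ["A time", "B time", "A mem", "B mem"]
```

The input arrays are not modified, since the method works on reversed copies.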
.update_finished(free_cpus, comms_running, comms_executed) ⇒ Object
Internal use only. DO NOT DEPEND. Removes any finished commands from comms_running, returns the CPUs freed by their termination to free_cpus, and appends the terminated commands to comms_executed.
# File 'lib/batch_experiment.rb', line 22

def self.update_finished(free_cpus, comms_running, comms_executed)
  comms_running.delete_if do | job |
    if job[:proc].exited?
      free_cpus.push(job[:cpu])
      File.delete(job[:lockfname])
      comms_executed << job[:command]
    end
    job[:proc].exited? # bool returned to delete_if
  end
end