Class: QueueManager
- Inherits:
-
Object
- Object
- QueueManager
- Defined in:
- lib/autoflow/queue_manager.rb
Direct Known Subclasses
Class Method Summary collapse
- .available? ⇒ Boolean
-
.descendants ⇒ Object
SELECT AND PREPARE MANAGER.
- .priority ⇒ Object
- .select_manager(options) ⇒ Object
- .select_queue_manager(exec_folder, options, jobs, persist_variables) ⇒ Object
- .system_call(cmd, path = nil, remote = FALSE, ssh = nil) ⇒ Object
Instance Method Summary collapse
- #asign_queue_id(ar_jobs, id) ⇒ Object
-
#close_file(file_name, permissions = nil) ⇒ Object
SSH.
- #create_file(file_name, path) ⇒ Object
- #create_folder(folder_name) ⇒ Object
-
#exec ⇒ Object
EXECUTING WORKFLOW WITH MANAGER.
- #get_all_deps(ar_dependencies) ⇒ Object
- #get_dependencies(job, id = nil) ⇒ Object
- #get_queue_system_dependencies(ar_dependencies) ⇒ Object
- #get_queue_system_id(shell_output) ⇒ Object
- #get_relations_and_folders ⇒ Object
-
#init_log ⇒ Object
TODO adapt to remote execution.
-
#initialize(exec_folder, options, commands, persist_variables) ⇒ QueueManager
constructor
A new instance of QueueManager.
- #launch2queue_system(job, id, buffered_jobs) ⇒ Object
- #launch_all_jobs ⇒ Object
- #launch_job_in_folder(job, id, buffered_jobs) ⇒ Object
- #make_environment_file ⇒ Object
- #read_file(file_path) ⇒ Object
- #rm_done_dependencies(job) ⇒ Object
-
#sort_jobs_by_dependencies ⇒ Object
We need job ids from queue system so we ask for each job and we give the previous queue system ids as dependencies if necessary.
- #submit_job(job, ar_dependencies) ⇒ Object
- #system_call(cmd, path = nil) ⇒ Object
- #write_file(file_name, content) ⇒ Object
-
#write_header(id, node, sh) ⇒ Object
QUEUE DEPENDANT METHODS.
- #write_job(job, sh_name) ⇒ Object
Constructor Details
#initialize(exec_folder, options, commands, persist_variables) ⇒ QueueManager
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/autoflow/queue_manager.rb', line 4 def initialize(exec_folder, , commands, persist_variables) @exec_folder = exec_folder @commands = commands @persist_variables = persist_variables @verbose = [:verbose] @show_submit = [:show_submit_command] @job_identifier = [:identifier] @files = {} @remote = [:remote] @ssh = [:ssh] @write_sh = [:write_sh] @external_dependencies = [:external_dependencies] @active_jobs = [] @extended_logging = [:extended_logging] @comment = [:comment] end |
Class Method Details
.available? ⇒ Boolean
328 329 330 |
# File 'lib/autoflow/queue_manager.rb', line 328 def self.available? return FALSE end |
.descendants ⇒ Object
SELECT AND PREPARE MANAGER
25 26 27 |
# File 'lib/autoflow/queue_manager.rb', line 25 def self.descendants ObjectSpace.each_object(Class).select { |klass| klass < self } end |
.priority ⇒ Object
332 333 334 |
# File 'lib/autoflow/queue_manager.rb', line 332 def self.priority return -1 end |
.select_manager(options) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/autoflow/queue_manager.rb', line 43 def self.select_manager() queue_manager = nil priority = 0 descendants.each do |descendant| if descendant.available?() && priority <= descendant.priority queue_manager = descendant priority = descendant.priority end end return queue_manager end |
.select_queue_manager(exec_folder, options, jobs, persist_variables) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/autoflow/queue_manager.rb', line 29 def self.select_queue_manager(exec_folder, , jobs, persist_variables) path_managers = File.join(File.dirname(__FILE__),'queue_managers') Dir.glob(path_managers+'/*').each do |manager| require manager end if [:batch] queue_manager = BashManager else queue_manager = select_manager() end warn("Selected queue manager: #{queue_manager}") return queue_manager.new(exec_folder, , jobs, persist_variables) end |
.system_call(cmd, path = nil, remote = FALSE, ssh = nil) ⇒ Object
260 261 262 263 264 265 266 267 268 |
# File 'lib/autoflow/queue_manager.rb', line 260 def self.system_call(cmd, path = nil, remote = FALSE, ssh = nil) cmd = "cd #{path}; " + cmd if !path.nil? if remote call = ssh.exec!(cmd) else call = %x[#{cmd}] end return call end |
Instance Method Details
#asign_queue_id(ar_jobs, id) ⇒ Object
292 293 294 295 296 |
# File 'lib/autoflow/queue_manager.rb', line 292 def asign_queue_id(ar_jobs, id) ar_jobs.each do |id_job, job| job.queue_id=id end end |
#close_file(file_name, permissions = nil) ⇒ Object
SSH
225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/autoflow/queue_manager.rb', line 225 def close_file(file_name, = nil) #SSH path, content = @files.delete(file_name) file_path = File.join(path, file_name) if @remote @ssh.exec!("echo '#{content}' > #{file_path}") @ssh.exec!("chmod #{permissions} #{file_path}") if !.nil? else local_file = File.open(file_path,'w') local_file.chmod() if !.nil? local_file.print content local_file.close end end |
#create_file(file_name, path) ⇒ Object
217 218 219 |
# File 'lib/autoflow/queue_manager.rb', line 217 def create_file(file_name, path) @files[file_name] = [path, ''] end |
#create_folder(folder_name) ⇒ Object
209 210 211 212 213 214 215 |
# File 'lib/autoflow/queue_manager.rb', line 209 def create_folder(folder_name) if @remote @ssh.exec!("if ! [ -d #{folder_name} ]; then mkdir -p #{folder_name}; fi") else Dir.mkdir(folder_name) if !File.exists?(folder_name) end end |
#exec ⇒ Object
EXECUTING WORKFLOW WITH MANAGER
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/autoflow/queue_manager.rb', line 59 def exec create_folder(@exec_folder) make_environment_file if !@persist_variables.empty? create_file('versions', @exec_folder) write_file('versions',"autoflow\t#{Autoflow::VERSION}") close_file('versions') create_file('index_execution', @exec_folder) launch_all_jobs close_file('index_execution') end |
#get_all_deps(ar_dependencies) ⇒ Object
306 307 308 309 310 311 |
# File 'lib/autoflow/queue_manager.rb', line 306 def get_all_deps(ar_dependencies) final_dep = [] final_dep.concat(get_queue_system_dependencies(ar_dependencies)) if !ar_dependencies.empty? final_dep.concat(@external_dependencies) return final_dep end |
#get_dependencies(job, id = nil) ⇒ Object
285 286 287 288 289 290 |
# File 'lib/autoflow/queue_manager.rb', line 285 def get_dependencies(job, id = nil) ar_dependencies = [] ar_dependencies += job.dependencies ar_dependencies.delete(id) if !id.nil? #Delete autodependency return ar_dependencies end |
#get_queue_system_dependencies(ar_dependencies) ⇒ Object
298 299 300 301 302 303 304 |
# File 'lib/autoflow/queue_manager.rb', line 298 def get_queue_system_dependencies(ar_dependencies) queue_system_ids=[] ar_dependencies.each do |dependency| queue_system_ids << @commands[dependency].queue_id end return queue_system_ids end |
#get_queue_system_id(shell_output) ⇒ Object
324 325 326 |
# File 'lib/autoflow/queue_manager.rb', line 324 def get_queue_system_id(shell_output) end |
#get_relations_and_folders ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/autoflow/queue_manager.rb', line 90 def get_relations_and_folders relations = {} @commands.each do |name, job| relations[name] = [job.attrib[:exec_folder], job.dependencies] end return relations end |
#init_log ⇒ Object
TODO adapt to remote execution
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/autoflow/queue_manager.rb', line 70 def init_log #TODO adapt to remote execution log_path = [@exec_folder, '.wf_log'].join('/') #Join must assume linux systems so File.join canot be used for windows hosts log = parse_log(log_path) #TODO modify to folder job_relations_with_folders = get_relations_and_folders if @write_sh create_file('wf.json', @exec_folder) write_file('wf.json', job_relations_with_folders.to_json) close_file('wf.json') end @active_jobs.each do |task| query = log[task] if query.nil? log[task] = {'set' => [Time.now.to_i]} else log[task]['set'] << Time.now.to_i end end write_log(log, log_path, job_relations_with_folders) end |
#launch2queue_system(job, id, buffered_jobs) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/autoflow/queue_manager.rb', line 155 def launch2queue_system(job, id, buffered_jobs) sh_name = job.name+'.sh' if @write_sh # Write sh file #-------------------------------- create_file(sh_name, job.attrib[:exec_folder]) write_file(sh_name, '#!/usr/bin/env bash') write_file(sh_name, '##JOB_GROUP_ID='+@job_identifier) write_header(id, job, sh_name) end #Get dependencies #------------------------------------ ar_dependencies = get_dependencies(job, id) buffered_jobs.each do |id_buff_job, buff_job| ar_dependencies += get_dependencies(buff_job, id_buff_job) if @write_sh write_job(buff_job, sh_name) buff_job.attrib[:exec_folder] = job.attrib[:exec_folder] end end ar_dependencies.uniq! if @write_sh #Write sh body #-------------------------------- write_file(sh_name, 'hostname') log_file_path = [@exec_folder, '.wf_log', File.basename(job.attrib[:exec_folder])].join('/') write_file(sh_name, "flow_logger -e #{log_file_path} -s #{job.name}") write_file(sh_name, "source #{File.join(@exec_folder, 'env_file')}") if !@persist_variables.empty? write_job(job, sh_name) write_file(sh_name, "flow_logger -e #{log_file_path} -f #{job.name}") write_file(sh_name, "echo 'General time'") write_file(sh_name, "times") close_file(sh_name, 0755) end #Submit node #----------------------------------- if !@verbose queue_id = submit_job(job, ar_dependencies) job.queue_id = queue_id # Returns id of running tag on queue system asign_queue_id(buffered_jobs, queue_id) end end |
#launch_all_jobs ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/autoflow/queue_manager.rb', line 98 def launch_all_jobs buffered_jobs = [] sorted_jobs = sort_jobs_by_dependencies sorted_jobs.each do |name, job| @active_jobs << job.name if !job.attrib[:done] end init_log sorted_jobs.each do |name, job| write_file('index_execution', "#{name}\t#{job.attrib[:exec_folder]}") if job.attrib[:done] next else rm_done_dependencies(job) end buffered_jobs = launch_job_in_folder(job, name, buffered_jobs) end end |
#launch_job_in_folder(job, id, buffered_jobs) ⇒ Object
143 144 145 146 147 148 149 150 151 152 |
# File 'lib/autoflow/queue_manager.rb', line 143 def launch_job_in_folder(job, id, buffered_jobs) create_folder(job.attrib[:exec_folder]) if !job.attrib[:buffer] # Launch with queue_system the job and all buffered jobs launch2queue_system(job, id, buffered_jobs) buffered_jobs = []#Clean buffer else # Buffer job buffered_jobs << [id, job] end return buffered_jobs end |
#make_environment_file ⇒ Object
201 202 203 204 205 206 207 |
# File 'lib/autoflow/queue_manager.rb', line 201 def make_environment_file create_file('env_file', @exec_folder) @persist_variables.each do |var, value| write_file('env_file', "export #{var}=#{value}") end close_file('env_file') end |
#read_file(file_path) ⇒ Object
239 240 241 242 243 244 245 246 247 248 |
# File 'lib/autoflow/queue_manager.rb', line 239 def read_file(file_path) content = nil if @remote res = @ssh.exec!("[ ! -f #{file_path} ] && echo 'Autoflow:File Not Found' || cat #{file_path}") content = res if !content.include?('Autoflow:File Not Found') else content = File.open(file_path).read if File.exists?(file_path) end return content end |
#rm_done_dependencies(job) ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/autoflow/queue_manager.rb', line 133 def rm_done_dependencies(job) remove=[] job.dependencies.each do |dependency| remove << dependency if @commands[dependency].attrib[:done] end remove.each do |rm| job.dependencies.delete(rm) end end |
#sort_jobs_by_dependencies ⇒ Object
We need job ids from queue system so we ask for each job and we give the previous queue system ids as dependencies if necessary
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/autoflow/queue_manager.rb', line 116 def sort_jobs_by_dependencies # We need job ids from queue system so we ask for each job and we give the previous queue system ids as dependencies if necessary ar_jobs = @commands.to_a sorted_jobs = [] jobs_without_dep = ar_jobs.select{|job| job.last.dependencies.empty?} sorted_jobs.concat(jobs_without_dep) while ar_jobs.length != sorted_jobs.length ids = sorted_jobs.map{|job| job.first} ar_jobs.each do |job| if !sorted_jobs.include?(job) deps = job.last.dependencies - ids sorted_jobs << job if deps.empty? end end end return sorted_jobs end |
#submit_job(job, ar_dependencies) ⇒ Object
320 321 322 |
# File 'lib/autoflow/queue_manager.rb', line 320 def submit_job(job, ar_dependencies) end |
#system_call(cmd, path = nil) ⇒ Object
250 251 252 253 254 255 256 257 258 |
# File 'lib/autoflow/queue_manager.rb', line 250 def system_call(cmd, path = nil) cmd = "cd #{path}; " + cmd if !path.nil? if @remote call = @ssh.exec!(cmd) else call = %x[#{cmd}] end return call end |
#write_file(file_name, content) ⇒ Object
221 222 223 |
# File 'lib/autoflow/queue_manager.rb', line 221 def write_file(file_name, content) @files[file_name].last << content+"\n" end |
#write_header(id, node, sh) ⇒ Object
QUEUE DEPENDANT METHODS
316 317 318 |
# File 'lib/autoflow/queue_manager.rb', line 316 def write_header(id, node, sh) end |
#write_job(job, sh_name) ⇒ Object
270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/autoflow/queue_manager.rb', line 270 def write_job(job, sh_name) write_file(sh_name, job.initialization) if !job.initialization.nil? if @comment cmd = '#' + job.parameters else if @extended_logging log_command = '/usr/bin/time -o process_data -v ' else log_command = 'time ' end cmd = log_command + job.parameters end write_file(sh_name, cmd) end |