Class: QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/autoflow/queue_manager.rb

Direct Known Subclasses

BashManager, SlurmManager, SlurmManager2

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(exec_folder, options, commands, persist_variables) ⇒ QueueManager



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/autoflow/queue_manager.rb', line 4

# Stores the workflow description and the execution options on the manager.
#
# @param exec_folder [String] folder in which the workflow is executed
# @param options [Hash] settings; the keys read here are :verbose,
#   :show_submit_command, :identifier, :remote, :ssh, :write_sh,
#   :external_dependencies, :extended_logging and :comment
# @param commands [Hash] workflow jobs (looked up by key and iterated as
#   name/job pairs by the other methods of this class)
# @param persist_variables [Hash] variable/value pairs exported through the
#   environment file
def initialize(exec_folder, options, commands, persist_variables)
  # Workflow definition
  @exec_folder, @commands, @persist_variables = exec_folder, commands, persist_variables

  # Output / submission behaviour
  @verbose, @show_submit = options[:verbose], options[:show_submit_command]
  @write_sh, @comment = options[:write_sh], options[:comment]
  @extended_logging = options[:extended_logging]

  # Job identification and dependencies declared outside the workflow
  @job_identifier = options[:identifier]
  @external_dependencies = options[:external_dependencies]

  # Remote execution
  @remote, @ssh = options[:remote], options[:ssh]

  # Mutable state: buffered file contents and names of jobs still to be run
  @files = {}
  @active_jobs = []
end

Class Method Details

.available? ⇒ Boolean



328
329
330
# File 'lib/autoflow/queue_manager.rb', line 328

# Reports whether this queue manager can be used. The base class never is.
#
# The optional argument is accepted (and ignored) so that the base class can
# be queried the same way select_manager queries its descendants, i.e.
# `available?(options)`.
#
# @param _options [Hash, nil] execution options; unused here
# @return [Boolean] always false
def self.available?(_options = nil)
  # `false` literal: the FALSE constant no longer exists in Ruby >= 3.2.
  false
end

.descendants ⇒ Object

SELECT AND PREPARE MANAGER



25
26
27
# File 'lib/autoflow/queue_manager.rb', line 25

# Collects every loaded class that is a strict subclass of the receiver.
#
# @return [Array<Class>] classes for which `klass < self` holds
def self.descendants
  subclasses = []
  ObjectSpace.each_object(Class) do |candidate|
    subclasses << candidate if candidate < self
  end
  subclasses
end

.priority ⇒ Object



332
333
334
# File 'lib/autoflow/queue_manager.rb', line 332

# Priority used by select_manager to rank the available managers.
#
# @return [Integer] -1 for the base class, which is below select_manager's
#   starting threshold of 0
def self.priority
  -1
end

.select_manager(options) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/autoflow/queue_manager.rb', line 43

# Picks the available descendant with the highest priority.
#
# Candidates with a priority below 0 are never chosen, and when several
# candidates share the best priority the one visited last wins.
#
# @param options [Hash] forwarded to each descendant's `available?`
# @return [Class, nil] the chosen manager class, or nil when none qualifies
def self.select_manager(options)
  chosen = nil
  best_priority = 0
  descendants.each do |candidate|
    next unless candidate.available?(options)
    next unless best_priority <= candidate.priority
    chosen = candidate
    best_priority = candidate.priority
  end
  chosen
end

.select_queue_manager(exec_folder, options, jobs, persist_variables) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/autoflow/queue_manager.rb', line 29

# Loads the queue manager implementations and instantiates the one to use.
#
# Every entry of the `queue_managers` directory located next to this file is
# required. BashManager is used when options[:batch] is set; otherwise the
# choice is delegated to select_manager.
#
# @param exec_folder [String] forwarded to the manager constructor
# @param options [Hash] execution options (reads :batch)
# @param jobs [Hash] workflow jobs, forwarded to the constructor
# @param persist_variables [Hash] forwarded to the constructor
# @return [QueueManager] instance of the selected manager class
# @raise [RuntimeError] when no queue manager is available
def self.select_queue_manager(exec_folder, options, jobs, persist_variables)
  path_managers = File.join(File.dirname(__FILE__), 'queue_managers')
  Dir.glob(File.join(path_managers, '*')).each do |manager|
    require manager
  end
  queue_manager = options[:batch] ? BashManager : select_manager(options)
  # select_manager returns nil when nothing qualifies; fail with a clear
  # message instead of an opaque NoMethodError on `nil.new`.
  raise 'No available queue manager was found for this system' if queue_manager.nil?
  warn("Selected queue manager: #{queue_manager}")
  queue_manager.new(exec_folder, options, jobs, persist_variables)
end

.system_call(cmd, path = nil, remote = FALSE, ssh = nil) ⇒ Object



260
261
262
263
264
265
266
267
268
# File 'lib/autoflow/queue_manager.rb', line 260

# Runs a shell command, either locally or through an SSH session.
#
# @param cmd [String] command line to execute
# @param path [String, nil] when given, the command is prefixed with
#   `cd <path>; `
# @param remote [Boolean] run through `ssh` instead of the local shell
# @param ssh [#exec!, nil] session used when `remote` is truthy
# @return [String] output of the command (whatever `ssh.exec!` returns in
#   remote mode)
def self.system_call(cmd, path = nil, remote = false, ssh = nil)
  # `false` literal: the FALSE constant no longer exists in Ruby >= 3.2, so
  # the old default raised NameError whenever `remote` was omitted.
  cmd = "cd #{path}; " + cmd unless path.nil?
  return ssh.exec!(cmd) if remote
  %x[#{cmd}]
end

Instance Method Details

#asign_queue_id(ar_jobs, id) ⇒ Object



292
293
294
295
296
# File 'lib/autoflow/queue_manager.rb', line 292

# Stamps the same queue system id on every job of a list.
#
# @param ar_jobs [Array<Array>] (id, job) pairs
# @param id [Object] queue id assigned through `job.queue_id=`
# @return [Array<Array>] the list received
def asign_queue_id(ar_jobs, id)
  ar_jobs.each do |entry|
    _job_key, job = entry
    job.queue_id = id
  end
end

#close_file(file_name, permissions = nil) ⇒ Object

SSH



225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/autoflow/queue_manager.rb', line 225

# Flushes a buffered file (see create_file / write_file) to its destination
# and drops it from the buffer.
#
# In remote mode the content is written with `echo '...' > path` over SSH;
# otherwise it is written to the local filesystem.
#
# @param file_name [String] name used when the file was created
# @param permissions [Integer, nil] file mode (e.g. 0755); left untouched
#   when nil
# @return [Object, nil] result of the last SSH command in remote mode, nil
#   for local writes
def close_file(file_name, permissions = nil) #SSH
  path, content = @files.delete(file_name)
  file_path = File.join(path, file_name)
  if @remote
    # Close the quote, emit an escaped quote and reopen it, so content that
    # itself contains single quotes survives the shell untouched.
    quoted_content = content.gsub("'") { "'\\''" }
    @ssh.exec!("echo '#{quoted_content}' > #{file_path}")
    unless permissions.nil?
      # chmod expects octal digits; interpolating the Integer directly would
      # print it in decimal (0755 => "493").
      mode = permissions.is_a?(Integer) ? permissions.to_s(8) : permissions
      @ssh.exec!("chmod #{mode} #{file_path}")
    end
  else
    # Block form guarantees the handle is closed even if the write fails.
    File.open(file_path, 'w') do |local_file|
      local_file.chmod(permissions) unless permissions.nil?
      local_file.print content
    end
  end
end

#create_file(file_name, path) ⇒ Object



217
218
219
# File 'lib/autoflow/queue_manager.rb', line 217

# Registers an empty in-memory buffer for a file; nothing touches the disk
# until close_file is called.
#
# @param file_name [String] key under which the buffer is stored
# @param path [String] folder where the file will eventually be written
# @return [Array] the stored [path, content] pair
def create_file(file_name, path)
  buffer_entry = [path, '']
  @files[file_name] = buffer_entry
end

#create_folder(folder_name) ⇒ Object



209
210
211
212
213
214
215
# File 'lib/autoflow/queue_manager.rb', line 209

# Creates a folder unless it already exists, locally or over SSH.
#
# Note that the remote branch uses `mkdir -p` (parents are created) while
# the local branch uses Dir.mkdir, which requires the parent to exist.
#
# @param folder_name [String] path of the folder
# @return [Object, nil] result of the SSH command or of Dir.mkdir; nil when
#   the local path already exists
def create_folder(folder_name)
  if @remote
    @ssh.exec!("if ! [ -d  #{folder_name} ]; then mkdir -p #{folder_name}; fi")
  else
    # File.exist?: the File.exists? alias was removed in Ruby 3.2.
    Dir.mkdir(folder_name) unless File.exist?(folder_name)
  end
end

#exec ⇒ Object

EXECUTING WORKFLOW WITH MANAGER



59
60
61
62
63
64
65
66
67
68
# File 'lib/autoflow/queue_manager.rb', line 59

# EXECUTING WORKFLOW WITH MANAGER
#
# Runs the whole workflow: prepares the execution folder, writes the
# environment file (only when there are persisted variables) and the
# `versions` file, then launches every job while recording them in the
# `index_execution` file.
#
# @return [Object] whatever the final close_file call returns
def exec
  create_folder(@exec_folder)
  make_environment_file unless @persist_variables.empty?

  # Record the autoflow version used for this execution
  versions_file = 'versions'
  create_file(versions_file, @exec_folder)
  write_file(versions_file, "autoflow\t#{Autoflow::VERSION}")
  close_file(versions_file)

  # The index is filled in by launch_all_jobs, so it stays open meanwhile
  index_file = 'index_execution'
  create_file(index_file, @exec_folder)
  launch_all_jobs
  close_file(index_file)
end

#get_all_deps(ar_dependencies) ⇒ Object



306
307
308
309
310
311
# File 'lib/autoflow/queue_manager.rb', line 306

# Builds the full dependency list handed to the queue system: the queue ids
# of the given workflow dependencies followed by the external dependencies.
#
# @param ar_dependencies [Array] job names this job depends on
# @return [Array] new array; @external_dependencies itself is not modified
def get_all_deps(ar_dependencies)
  queue_ids = ar_dependencies.empty? ? [] : get_queue_system_dependencies(ar_dependencies)
  [].concat(queue_ids).concat(@external_dependencies)
end

#get_dependencies(job, id = nil) ⇒ Object



285
286
287
288
289
290
# File 'lib/autoflow/queue_manager.rb', line 285

# Returns a copy of a job's dependencies, optionally without the job itself.
#
# @param job [#dependencies] job whose dependency list is read
# @param id [Object, nil] when given, entries equal to it are removed
#   (a job must not depend on itself)
# @return [Array] new array; job.dependencies is left untouched
def get_dependencies(job, id = nil)
  own_dependencies = [].concat(job.dependencies)
  own_dependencies.delete(id) unless id.nil? # Delete autodependency
  own_dependencies
end

#get_queue_system_dependencies(ar_dependencies) ⇒ Object



298
299
300
301
302
303
304
# File 'lib/autoflow/queue_manager.rb', line 298

# Translates workflow job names into the ids given by the queue system.
#
# @param ar_dependencies [Array] job names, each one a key of @commands
# @return [Array] `queue_id` of every named job, in the same order
def get_queue_system_dependencies(ar_dependencies)
  ar_dependencies.map { |job_name| @commands[job_name].queue_id }
end

#get_queue_system_id(shell_output) ⇒ Object



324
325
326
# File 'lib/autoflow/queue_manager.rb', line 324

# Queue dependent hook with no behaviour in this class.
# NOTE(review): presumably overridden by the queue-specific subclasses to
# extract the job id from the submit command output - confirm there.
#
# @param shell_output [String] unused here
# @return [nil]
def get_queue_system_id(shell_output)
  nil
end

#get_relations_and_folders ⇒ Object



90
91
92
93
94
95
96
# File 'lib/autoflow/queue_manager.rb', line 90

# Summarises the workflow as a hash of job name => [exec folder, dependencies].
#
# @return [Hash{Object => Array}] one entry per job of @commands
def get_relations_and_folders
  @commands.each_with_object({}) do |(name, job), relations|
    relations[name] = [job.attrib[:exec_folder], job.dependencies]
  end
end

#init_log ⇒ Object

TODO adapt to remote execution



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/autoflow/queue_manager.rb', line 70

# Prepares the workflow log before launching: reads the current log, adds a
# 'set' timestamp for every active job and writes the log back. When
# @write_sh is set, the job relations are also dumped to `wf.json`.
#
# @return [Object] whatever write_log returns
def init_log #TODO adapt to remote execution
  # Joined with '/' on purpose: the path must follow linux conventions, so
  # File.join cannot be used for windows hosts
  log_path = "#{@exec_folder}/.wf_log"
  log = parse_log(log_path) #TODO modify to folder
  job_relations_with_folders = get_relations_and_folders

  if @write_sh
    relations_file = 'wf.json'
    create_file(relations_file, @exec_folder)
    write_file(relations_file, job_relations_with_folders.to_json)
    close_file(relations_file)
  end

  @active_jobs.each do |task|
    timestamp = Time.now.to_i
    entry = log[task]
    if entry.nil?
      # First time this task shows up in the log
      log[task] = {'set' => [timestamp]}
    else
      entry['set'] << timestamp
    end
  end

  write_log(log, log_path, job_relations_with_folders)
end

#launch2queue_system(job, id, buffered_jobs) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/autoflow/queue_manager.rb', line 155

# Writes the shell script of a job (together with the jobs buffered before
# it) and submits it to the queue system.
#
# The script is only written when @write_sh is set, and the job is only
# submitted when @verbose is NOT set.
#
# @param job [Object] job to launch (reads name, attrib, dependencies)
# @param id [Object] identifier of the job inside the workflow
# @param buffered_jobs [Array<Array>] (id, job) pairs executed in the same
#   script before `job`
# @return [Object, nil] result of asign_queue_id, or nil when @verbose is set
def launch2queue_system(job, id, buffered_jobs)
  sh_name = job.name + '.sh'

  # Script preamble
  #--------------------------------
  if @write_sh
    create_file(sh_name, job.attrib[:exec_folder])
    write_file(sh_name, '#!/usr/bin/env bash')
    write_file(sh_name, '##JOB_GROUP_ID=' + @job_identifier)
    write_header(id, job, sh_name)
  end

  # Dependencies of the job plus those of every buffered job
  #------------------------------------
  ar_dependencies = get_dependencies(job, id)
  buffered_jobs.each do |buffered_id, buffered_job|
    ar_dependencies += get_dependencies(buffered_job, buffered_id)
    next unless @write_sh
    write_job(buffered_job, sh_name)
    # The buffered job runs inside this script, hence in this job's folder
    buffered_job.attrib[:exec_folder] = job.attrib[:exec_folder]
  end
  ar_dependencies.uniq!

  # Script body
  #--------------------------------
  if @write_sh
    write_file(sh_name, 'hostname')
    job_folder_name = File.basename(job.attrib[:exec_folder])
    log_file_path = [@exec_folder, '.wf_log', job_folder_name].join('/')
    write_file(sh_name, "flow_logger -e #{log_file_path} -s #{job.name}")
    unless @persist_variables.empty?
      write_file(sh_name, "source #{File.join(@exec_folder, 'env_file')}")
    end
    write_job(job, sh_name)
    write_file(sh_name, "flow_logger -e #{log_file_path} -f #{job.name}")
    write_file(sh_name, "echo 'General time'")
    write_file(sh_name, "times")
    close_file(sh_name, 0755)
  end

  # Submission
  #-----------------------------------
  return nil if @verbose
  queue_id = submit_job(job, ar_dependencies)
  job.queue_id = queue_id # Id given by the queue system to the submitted job
  asign_queue_id(buffered_jobs, queue_id)
end

#launch_all_jobs ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/autoflow/queue_manager.rb', line 98

# Launches every pending job of the workflow in dependency order.
#
# Jobs not flagged as done are registered as active before the log is
# initialised. Every job (done or not) is listed in `index_execution`; done
# jobs are then skipped, the others get their finished dependencies removed
# and are launched or buffered.
#
# @return [Array<Array>] the sorted (name, job) pairs
def launch_all_jobs
  sorted_jobs = sort_jobs_by_dependencies
  sorted_jobs.each do |_name, job|
    @active_jobs << job.name unless job.attrib[:done]
  end
  init_log

  pending_buffer = []
  sorted_jobs.each do |name, job|
    write_file('index_execution', "#{name}\t#{job.attrib[:exec_folder]}")
    next if job.attrib[:done]
    rm_done_dependencies(job)
    pending_buffer = launch_job_in_folder(job, name, pending_buffer)
  end
end

#launch_job_in_folder(job, id, buffered_jobs) ⇒ Object



143
144
145
146
147
148
149
150
151
152
# File 'lib/autoflow/queue_manager.rb', line 143

# Creates the job folder and either buffers the job or launches it together
# with everything buffered so far.
#
# @param job [Object] job to process (reads attrib[:exec_folder] and
#   attrib[:buffer])
# @param id [Object] identifier of the job inside the workflow
# @param buffered_jobs [Array<Array>] (id, job) pairs waiting to be launched
# @return [Array<Array>] the buffer to use for the next job: the received
#   array with this job appended, or a new empty array after a launch
def launch_job_in_folder(job, id, buffered_jobs)
  create_folder(job.attrib[:exec_folder])
  if job.attrib[:buffer] # Buffer job
    buffered_jobs << [id, job]
    return buffered_jobs
  end
  # Launch with queue_system the job and all buffered jobs
  launch2queue_system(job, id, buffered_jobs)
  [] # Clean buffer
end

#make_environment_file ⇒ Object



201
202
203
204
205
206
207
# File 'lib/autoflow/queue_manager.rb', line 201

# Writes `env_file` in the execution folder with one `export VAR=value` line
# per persisted variable. Values are written as given, without quoting.
#
# @return [Object] whatever close_file returns
def make_environment_file
  env_name = 'env_file'
  create_file(env_name, @exec_folder)
  @persist_variables.each { |var, value| write_file(env_name, "export #{var}=#{value}") }
  close_file(env_name)
end

#read_file(file_path) ⇒ Object



239
240
241
242
243
244
245
246
247
248
# File 'lib/autoflow/queue_manager.rb', line 239

# Reads a whole file, locally or over SSH.
#
# In remote mode the file is read with `cat`; a sentinel message is echoed
# by the remote shell when the file does not exist.
#
# @param file_path [String] path of the file to read
# @return [String, nil] file content, or nil when the file does not exist
def read_file(file_path)
  if @remote
    res = @ssh.exec!("[ ! -f #{file_path} ] && echo 'Autoflow:File Not Found' || cat #{file_path}")
    # Inspect the command output (`res`); the previous code tested the still
    # nil `content` variable and raised NoMethodError on every remote read.
    return nil if res.nil? || res.include?('Autoflow:File Not Found')
    res
  elsif File.exist?(file_path) # File.exists? was removed in Ruby 3.2
    # File.read closes the handle, unlike File.open(path).read
    File.read(file_path)
  end
end

#rm_done_dependencies(job) ⇒ Object



133
134
135
136
137
138
139
140
141
# File 'lib/autoflow/queue_manager.rb', line 133

# Drops from a job the dependencies on jobs that are already done, so the
# queue system is not asked to wait for them.
#
# @param job [#dependencies] job whose dependency list is edited in place
# @return [Array] the dependencies that were flagged as done
def rm_done_dependencies(job)
  finished = job.dependencies.select { |dependency| @commands[dependency].attrib[:done] }
  finished.each { |dependency| job.dependencies.delete(dependency) }
end

#sort_jobs_by_dependencies ⇒ Object

We need job ids from queue system so we ask for each job and we give the previous queue system ids as dependencies if necessary



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/autoflow/queue_manager.rb', line 116

# Orders the jobs so that each one comes after all the jobs it depends on.
#
# We need job ids from the queue system, so jobs are submitted one by one and
# each job receives the queue ids of the previously submitted jobs as
# dependencies when necessary.
#
# Jobs without dependencies come first. The remaining jobs are added in
# passes: a job is added in a pass when all its dependencies were sorted
# before that pass started. Within a pass the order of @commands is kept.
#
# @return [Array<Array>] (name, job) pairs in launch order
# @raise [RuntimeError] when some jobs can never be scheduled because of a
#   circular dependency or a dependency on a job that is not in the workflow
def sort_jobs_by_dependencies
  ar_jobs = @commands.to_a
  sorted_jobs, pending_jobs = ar_jobs.partition { |_name, job| job.dependencies.empty? }
  until pending_jobs.empty?
    sorted_ids = sorted_jobs.map(&:first)
    ready, pending_jobs = pending_jobs.partition do |_name, job|
      (job.dependencies - sorted_ids).empty?
    end
    # A pass that schedules nothing would repeat forever: fail instead of
    # hanging the launcher.
    if ready.empty?
      stuck = pending_jobs.map(&:first).join(', ')
      raise "Circular or unknown dependencies found in jobs: #{stuck}"
    end
    sorted_jobs.concat(ready)
  end
  sorted_jobs
end

#submit_job(job, ar_dependencies) ⇒ Object



320
321
322
# File 'lib/autoflow/queue_manager.rb', line 320

# Queue dependent hook with no behaviour in this class.
# launch2queue_system stores the value returned here as the queue id of the
# job and of its buffered jobs.
# NOTE(review): presumably overridden by the queue-specific subclasses -
# confirm there.
#
# @param job [Object] unused here
# @param ar_dependencies [Array] unused here
# @return [nil]
def submit_job(job, ar_dependencies)
  nil
end

#system_call(cmd, path = nil) ⇒ Object



250
251
252
253
254
255
256
257
258
# File 'lib/autoflow/queue_manager.rb', line 250

# Runs a shell command locally, or through the manager's SSH session when
# the manager works in remote mode.
#
# @param cmd [String] command line to execute
# @param path [String, nil] when given, the command is prefixed with
#   `cd <path>; `
# @return [String] output of the command (whatever `@ssh.exec!` returns in
#   remote mode)
def system_call(cmd, path = nil)
  full_cmd = path.nil? ? cmd : "cd #{path}; " + cmd
  return @ssh.exec!(full_cmd) if @remote
  `#{full_cmd}`
end

#write_file(file_name, content) ⇒ Object



221
222
223
# File 'lib/autoflow/queue_manager.rb', line 221

# Appends one line to the in-memory buffer of a file registered with
# create_file. A newline is added after the content.
#
# @param file_name [String] name used when the file was created
# @param content [String] text of the line, without the trailing newline
# @return [String] the buffer after the append
def write_file(file_name, content)
  _path, buffer = @files[file_name]
  buffer.concat(content + "\n")
end

#write_header(id, node, sh) ⇒ Object

QUEUE DEPENDENT METHODS



316
317
318
# File 'lib/autoflow/queue_manager.rb', line 316

# QUEUE DEPENDENT METHODS
#
# Hook with no behaviour in this class. launch2queue_system calls it right
# after writing the shebang and the job group id of a job script.
# NOTE(review): presumably overridden by the queue-specific subclasses to
# write the scheduler directives - confirm there.
#
# @param id [Object] unused here
# @param node [Object] unused here
# @param sh [String] unused here
# @return [nil]
def write_header(id, node, sh)
  nil
end

#write_job(job, sh_name) ⇒ Object



270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/autoflow/queue_manager.rb', line 270

# Writes the lines of one job into a script buffer: its initialization (when
# present) followed by its command.
#
# The command is commented out with '#' when @comment is set. Otherwise it
# is timed, with `/usr/bin/time -o process_data -v` when @extended_logging
# is set and with `time` when it is not.
#
# @param job [Object] job to write (reads initialization and parameters)
# @param sh_name [String] name of the script buffer
# @return [Object] whatever the last write_file call returns
def write_job(job, sh_name)
  write_file(sh_name, job.initialization) unless job.initialization.nil?
  prefix =
    if @comment
      '#'
    elsif @extended_logging
      '/usr/bin/time -o process_data -v '
    else
      'time '
    end
  write_file(sh_name, prefix + job.parameters)
end