15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/scout/workflow/deployment/scheduler.rb', line 15
# Submits every batch to the configured batch system, dependencies first.
#
# Each batch is a Hash read through these keys:
#   :top_level - job handed to the batch system's run_job; also the key under
#                which the returned id is remembered for dependent batches
#   :jobs      - jobs whose task_signature values form the :manifest option
#   :rules     - per-batch options; process_options only fills in missing keys
#   :deps      - batches that must be submitted first (nil or empty for none)
#
# The batch system name comes from Scout::Config (env BATCH_SYSTEM, default
# 'SLURM') and must be one of 'SLURM', 'LSF' or 'PBS'.
#
# @param batches [Array<Hash>] batches to submit; the array itself is not modified
# @param process_options [Hash] defaults applied to each batch's :rules
# @return [Array] one entry per batch, in submission order: the id returned by
#   run_job, or the DryRun exception message when run_job raised DryRun
# @raise [ArgumentError] when the batches cannot be ordered because of a
#   dependency cycle or a dependency that is not part of +batches+
# @raise [RuntimeError] when the batch system is nil or unknown
def self.process_batches(batches, process_options = {})
  pending = batches.dup
  sorted = []

  # Repeatedly move every batch whose dependencies are all already ordered.
  until pending.empty?
    ready = pending.select { |batch| (Array(batch[:deps]) - sorted).empty? }

    # No progress means the remaining batches can never become ready;
    # without this check the loop would spin forever.
    if ready.empty?
      stuck = pending.collect { |batch| batch[:top_level].inspect }.join(', ')
      raise ArgumentError, "Cannot order batches, unresolved or circular dependencies: #{stuck}"
    end

    sorted.concat(ready)
    pending -= ready
  end

  batch_system = Scout::Config.get :system, :batch, :scheduler, 'env:BATCH_SYSTEM', default: 'SLURM'

  # Ids of the batches submitted so far, keyed by their top-level job.
  batch_ids = {}

  sorted.collect do |batch|
    # NOTE(review): if add_defaults returns the object it was given, the
    # merge! calls below also modify batch[:rules] - confirm this is intended.
    job_options = IndiferentHash.add_defaults batch[:rules], process_options.dup

    batch_dependencies = Array(batch[:deps]).collect do |dep|
      dep_target = dep[:top_level]
      # NOTE(review): a dependency without a recorded id (e.g. after a
      # DryRun) yields an empty string here.
      id = batch_ids[dep_target].to_s
      dep_target.canfail? ? "canfail:#{id}" : id
    end

    job_options.merge!(:batch_dependencies => batch_dependencies)
    job_options.merge!(:manifest => batch[:jobs].collect { |job| job.task_signature })

    begin
      id, _dir = case batch_system
                 when 'SLURM'
                   SLURM.run_job(batch[:top_level], job_options)
                 when 'LSF'
                   LSF.run_job(batch[:top_level], job_options)
                 when 'PBS'
                   PBS.run_job(batch[:top_level], job_options)
                 when nil
                   raise "No batch system specified"
                 else
                   raise "Unknown batch system #{batch_system}"
                 end
      batch_ids[batch[:top_level]] = id
    rescue DryRun => e
      # A dry run produces no id; report the exception message instead.
      e.message
    end
  end
end
|