Class: Workflow::Orchestrator

Inherits:
Object
  • Object
show all
Defined in:
lib/scout/workflow/deployment/orchestrator/rules.rb,
lib/scout/workflow/deployment/orchestrator/chains.rb,
lib/scout/workflow/deployment/orchestrator/batches.rb,
lib/scout/workflow/deployment/orchestrator/workload.rb

Class Method Summary collapse

Class Method Details

.accumulate_rules(current, new_val) ⇒ Object

Accumulate across multiple rule sources (e.g., across jobs in a batch)



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 41

# Fold one rule set into an accumulated one (e.g. across the jobs of a batch).
#
# Both arguments are duplicated before use, so the caller's hashes are left alone.
#
# Per-key policy:
# * "config_keys" - merged through add_config_keys
# * "cpus"        - the largest value (after to_i) wins
# * "time"        - values are converted with Misc.timespan, summed and
#                   rendered back with Misc.format_seconds
# * "skip"        - set to true when the incoming value is truthy and the
#                   accumulated value is truthy or absent; removed otherwise
# * anything else - an already accumulated value is kept, a missing one is set
#
# @param current [Hash, nil] rules accumulated so far
# @param new_val [Hash, nil] rules to fold in
# @return [Hash] the updated copy of +current+
def self.accumulate_rules(current, new_val)
  accumulated = IndiferentHash.setup((current || {}).dup)
  incoming = IndiferentHash.setup((new_val || {}).dup)
  return accumulated if incoming.nil? || incoming.empty?

  incoming.each do |key, value|
    name = key.to_s
    if name == "config_keys"
      accumulated[key] = add_config_keys(accumulated["config_keys"], value)
    elsif name == "cpus"
      candidates = [accumulated[key], value].compact.map(&:to_i)
      accumulated[key] = candidates.max unless candidates.empty?
    elsif name == "time"
      seconds = 0
      [accumulated[key], value].compact.each { |span| seconds += Misc.timespan(span) }
      accumulated[key] = Misc.format_seconds(seconds)
    elsif name == "skip"
      # An absent entry counts as "skippable so far"
      previous = accumulated.key?(key) ? accumulated[key] : true
      if previous && value
        accumulated[key] = true
      else
        accumulated.delete key
      end
    elsif !accumulated.include?(key)
      accumulated[key] = value
    end
  end

  accumulated
end

.add_batch_deps(batches) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/scout/workflow/deployment/orchestrator/batches.rb', line 42

# Fill in batch[:deps] for every batch with the other batches it depends on.
#
# For each batch the dependencies of its jobs (as reported by job_dependencies)
# that are not members of the batch are collected. Any of them that is itself
# a dependency of another collected dependency is dropped. The remaining ones
# are mapped to the other batches holding a job with the same path.
#
# @param batches [Array<Hash>] batches with a :jobs list; updated in place
# @return [Array<Hash>] the same +batches+ array
def self.add_batch_deps(batches)
  batches.each do |batch|
    members = batch[:jobs]
    external = members.collect { |member| job_dependencies(member) }.flatten.uniq - members

    # Keep only dependencies that are not reachable through another dependency
    direct = external.inject(external.dup) do |remaining, dep|
      remaining - job_dependencies(dep)
    end

    others = batches - [batch]
    batch[:deps] = direct.collect do |dep|
      others.select { |other| other[:jobs].collect(&:path).include?(dep.path) }
    end.flatten.uniq
  end

  batches
end

.add_config_keys(current, new_val) ⇒ Object

Merge config_keys strings preserving order and de-duplicating tokens



4
5
6
7
8
9
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 4

# Merge config_keys preserving order and de-duplicating tokens.
#
# Either argument may be a comma-separated String or an Array of tokens.
# Tokens of +new_val+ come first, followed by those of +current+; when a token
# appears more than once only its last occurrence is kept. Surrounding
# whitespace is stripped and empty tokens are discarded, so a nil or empty
# side never produces a stray leading comma.
#
# @param current [String, Array, nil] keys accumulated so far
# @param new_val [String, Array, nil] keys to add
# @return [String, Array, nil] +new_val+ untouched when +current+ is nil,
#   otherwise the merged comma-separated String
def self.add_config_keys(current, new_val)
  return new_val if current.nil?

  tokens = [new_val, current].flat_map do |keys|
    keys = keys * ',' if Array === keys
    keys.to_s.split(',').map(&:strip).reject(&:empty?)
  end

  # reverse/uniq/reverse keeps the last occurrence of each token
  tokens.reverse.uniq.reverse * ","
end

.add_rules_and_consolidate(rules, batches) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/scout/workflow/deployment/orchestrator/batches.rb', line 62

# Attach rules to every batch and fold batches flagged with a :skip rule into
# one of the batches they depend on.
#
# Step 1: for each batch, the task-specific rules of all its jobs are
# accumulated with accumulate_rules; when the batch has a :chain, the chain's
# rules are merged on top of them via merge_rules.
#
# Step 2: batches whose rules carry :skip are merged into a dependency batch
# (the "target") when one qualifies; the merged batch is tagged with :target
# and removed from the result at the end.
#
# @param rules [Hash] rule definitions
# @param batches [Array<Hash>] batches with :jobs, :top_level and optionally
#   :chain and :deps; modified in place
# @return [Array<Hash>] +batches+ without the batches that were merged away
def self.add_rules_and_consolidate(rules, batches)
  chain_rules = parse_chains(rules)

  batches.each do |batch|
    job_rules_acc = batch[:jobs].inject(nil) do |acc, p|
      # Destructuring assignment; `deps` is not used below
      job, deps = p
      workflow = job.workflow
      task_name = job.task_name
      task_rules = task_specific_rules(rules, workflow, task_name)
      acc = accumulate_rules(acc, task_rules.dup)
    end

    if chain = batch[:chain]
      # Chain rules are passed as the `current` argument of merge_rules
      batch[:rules] = merge_rules(chain_rules[chain][:rules].dup, job_rules_acc)
    else
      batch[:rules] = job_rules_acc
    end
  end

  # Consolidation pass. Whenever a :skip batch has been handled the whole pass
  # is restarted through TryAgain/retry; each restart is preceded by deleting
  # :skip from that batch's rules, so the same batch does not trigger it again.
  # TryAgain is not defined in this method.
  begin
    # Point dependencies at the batch they were merged into, if any
    batches.each do |batch|
      batch[:deps] = batch[:deps].collect do |dep|
        dep[:target] || dep
      end if batch[:deps]
    end

    batches.each do |batch|
      next if batch[:top_level].overriden?
      next unless batch[:rules] && batch[:rules][:skip]
      batch[:rules].delete :skip
      next if batch[:deps].nil?

      if batch[:deps].any?
        batch_dep_jobs = batch[:top_level].rec_dependencies
        # First dependency batch whose top-level job is among the recursive
        # dependencies of this batch's top-level job and whose own deps
        # already cover all the other deps of this batch
        target = batch[:deps].select do |target|
          batch_dep_jobs.include?(target[:top_level]) &&
            (batch[:deps] - [target] - target[:deps]).empty?
        end.first
        # No suitable target: leave the batch as is and do not restart
        next if target.nil?
        # Merge this batch into the target: jobs go first, deps are unioned
        # (minus the target itself) and this batch's top-level job takes over
        target[:jobs] = batch[:jobs] + target[:jobs]
        target[:deps] = (target[:deps] + batch[:deps]).uniq - [target]
        target[:top_level] = batch[:top_level]
        target[:rules] = accumulate_rules(target[:rules], batch[:rules])
        batch[:target] = target
      end
      # Also reached when the batch had an empty (non-nil) deps list
      raise TryAgain
    end
  rescue TryAgain
    retry
  end

  # Drop the batches that were merged into a target
  batches.delete_if{|b| b[:target] }

  batches
end

.chain_batches(rules, chains, workload) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/scout/workflow/deployment/orchestrator/batches.rb', line 7

# Partition a workload into batches, grouping jobs that belong to one chain.
#
# Jobs are taken from the end of workload.keys; a job that is done? is ignored.
# When one or more chains contain the job, the chain scoring highest on
# "number of jobs + 1/number of distinct chain tasks" is chosen and its info
# hash becomes the batch, tagged with :chain. Otherwise the job forms a
# single-job batch. After each batch: chains whose top-level job was absorbed
# are dropped, absorbed jobs are removed from the remaining chains, and chains
# left with fewer than two jobs are discarded.
#
# NOTE(review): the elements of workload.keys are used as job objects here
# (done? is called on them), while job_workload appears to return a hash
# keyed by path -- confirm what callers actually pass.
#
# @param rules [Hash] rule definitions, parsed with parse_chains
# @param chains [Enumerable] name/info pairs with :jobs and :top_level;
#   modified in place
# @param workload [Hash] only its keys are used
# @return [Array<Hash>] the batches, each passed through IndiferentHash.setup
def self.chain_batches(rules, chains, workload)
  parsed_chains = parse_chains(rules)

  batches = []
  pending = workload.keys
  while (job = pending.pop)
    next if job.done?

    candidates = chains.select { |_name, chain_info| chain_info[:jobs].include?(job) }
    if candidates.any?
      ranked = candidates.sort_by do |chain_name, chain_info|
        job_count = chain_info[:jobs].length
        task_count = parsed_chains[chain_name][:tasks].values.flatten.uniq.length
        job_count.to_f + 1.0 / task_count
      end
      name, info = ranked.last
      pending = pending - info[:jobs]
      info[:chain] = name
      batch = info
    else
      batch = { :jobs => [job], :top_level => job }
    end

    chains.delete_if { |_name, chain_info| batch[:jobs].include?(chain_info[:top_level]) }

    chains.each do |_name, chain_info|
      chain_info[:jobs] = chain_info[:jobs] - batch[:jobs]
    end

    chains.delete_if { |_name, chain_info| chain_info[:jobs].length < 2 }

    batches << IndiferentHash.setup(batch)
  end

  batches
end

.check_chains(chains, job) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
# File 'lib/scout/workflow/deployment/orchestrator/chains.rb', line 4

# List the names of the chains that include the task of +job+.
#
# The overriden workflow/task of the job are used when present, otherwise its
# own workflow and task name; both are compared as strings against the
# chain's :tasks table (workflow name => task names). A job whose
# overriden_task is a Symbol matches no chain.
#
# @param chains [Enumerable] name => chain pairs, each chain with a :tasks hash
# @param job [#overriden_task, #overriden_workflow, #workflow, #task_name]
# @return [Array] names of the matching chains, in iteration order
def self.check_chains(chains, job)
  return [] if Symbol === job.overriden_task

  chains.each_with_object([]) do |(name, chain), matches|
    workflow = (job.overriden_workflow || job.workflow).to_s
    task_name = (job.overriden_task || job.task_name).to_s
    tasks = chain[:tasks]
    matches << name if tasks.include?(workflow) && tasks[workflow].include?(task_name)
  end
end

.done_batch?(batch) ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
66
# File 'lib/scout/workflow/deployment/orchestrator/workload.rb', line 63

# Tell whether a batch needs no further handling, judged by its top-level job:
# the job is done, is running, or has an error that is not recoverable.
#
# @param batch [Hash] batch with a :top_level job
# @return [Boolean]
def self.done_batch?(batch)
  top_job = batch[:top_level]
  finished_or_active = top_job.done? || top_job.running?
  finished_or_active || (top_job.error? && !top_job.recoverable_error?)
end

.job_batches(rules, jobs) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/scout/workflow/deployment/orchestrator/batches.rb', line 118

# Build the batches for one or more jobs.
#
# Pipeline: job_workload -> job_chains (per job, concatenated) ->
# chain_batches -> add_batch_deps -> add_rules_and_consolidate.
#
# @param rules [Hash] rule definitions
# @param jobs [Object, Array] a job or a list of jobs
# @return [Array<Hash>] the result of add_rules_and_consolidate
def self.job_batches(rules, jobs)
  jobs = [jobs] unless Array === jobs

  workload = job_workload(jobs)
  chain_pairs = jobs.flat_map { |job| job_chains(rules, job) }

  grouped = chain_batches(rules, chain_pairs, workload)
  with_deps = add_batch_deps(grouped)
  add_rules_and_consolidate(rules, with_deps)
end

.job_chains(rules, job, computed = {}) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/scout/workflow/deployment/orchestrator/chains.rb', line 61

# Collect the chains found in the dependency tree of +job+ as [name, info]
# pairs, where info holds the chained :jobs and their :top_level job.
#
# For every dependency, the chains it shares with +job+ (per check_chains)
# are extended upwards: a chain already reported for the dependency is
# continued with +job+ as new top level, and a shared chain not yet reported
# starts as [job, dep]. Chains of the dependency that +job+ does not take part
# in are passed through unchanged. Results are memoized in +computed+ under a
# fingerprint of the rules, the job path and the job object id.
#
# @param rules [Hash] rule definitions, parsed with parse_chains
# @param job [Object] job to inspect
# @param computed [Hash] memo shared across the recursion
# @return [Array<Array>] [chain name, info] pairs
def self.job_chains(rules, job, computed = {})
  key = Log.fingerprint([rules, job.path, job.object_id])
  return computed[key] if computed.has_key?(key)

  chains = parse_chains(rules)
  matches = check_chains(chains, job)
  dependencies = job_dependencies(job)

  collected = []
  continued = {}
  dependencies.each do |dep|
    dep_matches = check_chains(chains, dep)
    shared = matches & dep_matches

    seen = []
    job_chains(rules, dep, computed).each do |chain_name, chain_info|
      unless shared.include?(chain_name)
        # Chain this job is not part of: report it as is
        collected << [chain_name, chain_info]
        next
      end

      seen << chain_name
      merged = continued[chain_name] ||= {}
      merged[:jobs] ||= []
      merged[:jobs].concat chain_info[:jobs]
      merged[:top_level] = job
    end

    # Shared chains that start at this job/dependency pair
    (shared - seen).each do |chain_name|
      collected << [chain_name, { :jobs => [job, dep], :top_level => job }]
    end
  end

  continued.each do |chain_name, chain_info|
    chain_info[:jobs].prepend job
    collected << [chain_name, chain_info]
  end

  computed[key] = collected
end

.job_dependencies(job) ⇒ Object



59
60
61
# File 'lib/scout/workflow/deployment/orchestrator/workload.rb', line 59

# Dependencies of +job+ that still need attention: its dependencies and input
# dependencies, without duplicates, leaving out those that are done and not
# dirty.
#
# @param job [#dependencies, #input_dependencies]
# @return [Array] the remaining dependency jobs
def self.job_dependencies(job)
  candidates = (job.dependencies + job.input_dependencies).uniq
  candidates.reject { |dep| dep.done? && !dep.dirty? }
end

.job_resources(rules, job) ⇒ Object

Build a numeric-only resources hash for scheduling (parallel orchestrator)



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 97

# Build a numeric-only resources hash for scheduling (parallel orchestrator).
#
# Sources, in order of precedence:
# 1. the nested :resources hash of the job rules
# 2. the top-level :cpus, :IO, :io, :mem and :mem_per_cpu job rules
# 3. rules["default_resources"], or rules["defaults"]["resources"]
# When nothing at all is found, cpus: 1 is used.
#
# Only Numeric values and strings that are entirely a decimal number are
# kept (the latter converted to Integer or Float); the 'size' key is dropped.
#
# +rules+ is normalized with IndiferentHash.setup and may be nil, like in the
# other rule helpers.
#
# @param rules [Hash, nil] rule definitions
# @param job [Object] job whose rules are obtained through job_rules
# @return [Hash] resource name => number
def self.job_resources(rules, job)
  rules = IndiferentHash.setup(rules || {})
  jr = IndiferentHash.setup(job_rules(rules, job) || {})

  resources = IndiferentHash.setup({})
  # Nested resources
  jr[:resources].each { |k, v| resources[k] = v } if jr[:resources].is_a?(Hash)

  # Top-level aliases only fill in what the nested block left unset
  [:cpus, :IO, :io, :mem, :mem_per_cpu].each do |name|
    resources[name] ||= jr[name] if jr.key?(name)
  end

  # Default resources fallback
  default_resources = rules["default_resources"]
  default_resources ||= rules["defaults"]["resources"] if rules["defaults"]
  default_resources ||= {}
  IndiferentHash.setup(default_resources).each do |k, v|
    resources[k] = v if resources[k].nil?
  end

  # If still empty, use cpus:1 as safe default
  resources = {:cpus => 1} if resources.empty?

  # Only keep numeric-like values for the scheduler summations/accounting.
  # \A/\z anchor the whole string, so multi-line values are not mistaken
  # for numbers.
  numeric_resources = {}
  resources.each do |k, v|
    next if k.to_s == 'size'
    if Numeric === v
      numeric_resources[k] = v
    elsif v.to_s.strip.match?(/\A\d+(?:\.\d+)?\z/)
      numeric_resources[k] = v.to_s.include?(".") ? v.to_f : v.to_i
    end
  end

  IndiferentHash.setup(numeric_resources)
end

.job_rules(rules, job, force = false) ⇒ Object

Recursive job rules: accumulate down the dependency tree



87
88
89
90
91
92
93
94
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 87

# Recursive job rules: the task-specific rules of +job+ with the rules of its
# dependencies accumulated on top, walking down the dependency tree.
#
# A job that is done or in error contributes no rules unless +force+ is set;
# +force+ is not passed on to the dependencies.
#
# @param rules [Hash] rule definitions
# @param job [Object] job to compute the rules for
# @param force [Boolean] compute rules even for a done/failed job
# @return [Hash] accumulated rules
def self.job_rules(rules, job, force = false)
  return {} if (job.done? || job.error?) && !force

  own_rules = task_specific_rules(rules, job.workflow.to_s, job.task_name.to_s)
  job.dependencies.inject(own_rules) do |acc, dep|
    accumulate_rules(acc, job_rules(rules, dep))
  end
end

.job_workload(jobs) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/scout/workflow/deployment/orchestrator/workload.rb', line 3

# Index the given jobs and everything they depend on by path.
#
# Starting from +jobs+, dependencies (as reported by job_dependencies) are
# followed for every job that is not done. Each path is expanded only once,
# even when several jobs share the dependency, so job_dependencies is not
# called repeatedly for the same job.
#
# @param jobs [Object, Array] a job or a list of jobs
# @return [Hash] path => job, for the given jobs and the dependencies found
def self.job_workload(jobs)
  jobs = [jobs] unless Array === jobs

  path_jobs = {}
  jobs.each { |job| path_jobs[job.path] = job }

  expanded = {}
  heap = jobs.collect(&:path)
  while job_path = heap.pop
    # A path can be pushed again by a later job that shares the dependency
    next if expanded[job_path]
    expanded[job_path] = true

    j = path_jobs[job_path]
    next if j.done?

    deps = job_dependencies(j)
    deps.each do |d|
      path_jobs[d.path] ||= d
    end

    heap.concat deps.collect(&:path)
    heap.uniq!
  end

  path_jobs
end

.load_rules(rule_files = nil) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 210

# Load and merge rule files.
#
# Each entry is either a readable file name, loaded directly with Open.yaml
# (and required to contain a Hash), or a name looked up as a YAML file under
# Scout.etc.batch; names that cannot be found are logged and skipped. A file
# may list other rule files under :import; those are loaded recursively and
# merged into the file's own rules (this relies on merge_rule_file updating
# its first argument in place). Files are merged in order with
# merge_rule_file.
#
# @param rule_files [Object, Array, nil] rule file(s); defaults to [:default]
# @return [Hash] merged rules
# @raise [RuntimeError] when a file given by name does not contain a Hash
def self.load_rules(rule_files = nil)
  rule_files = [:default] if rule_files.nil?
  rule_files = [rule_files] unless Array === rule_files

  rule_files.inject({}) do |merged, file|
    if Path.is_filename?(file) && Open.exists?(file) && Path.can_read?(file)
      file_rules = Open.yaml(file)
      raise "Unknown rule file #{file}" unless Hash === file_rules
    else
      requested = file
      file = Scout.etc.batch[file].find_with_extension(:yaml)

      unless file.exists?
        Log.debug "Rule file #{requested} not found"
        next merged
      end

      file_rules = Open.yaml(file)
    end

    file_rules = IndiferentHash.setup(file_rules)

    if file_rules[:import]
      imports = file_rules.delete(:import)
      merge_rule_file(file_rules, load_rules(imports))
    end

    merge_rule_file(merged, file_rules)
  end
end

.load_rules_for_job(jobs) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 241

# Load the rules that apply to one or more jobs.
#
# The rule files are named after the workflows of the jobs, followed by the
# workflows of each job's recursive dependencies (de-duplicated per job), and
# finally :default. Jobs and dependencies without a workflow contribute no
# rule file.
#
# @param jobs [Object, Array] a job or a list of jobs
# @return [Hash] the rules returned by load_rules
def self.load_rules_for_job(jobs)
  jobs = [jobs] unless Array === jobs

  # Drop missing workflows before converting to names; nil.to_s would
  # otherwise yield an empty rule file name
  deploy_files = jobs.collect { |job| job.workflow }.compact.collect(&:to_s)

  jobs.each do |job|
    deploy_files.concat job.rec_dependencies.collect { |dep| dep.workflow }.compact.collect(&:to_s).uniq
  end

  deploy_files << :default

  load_rules(deploy_files)
end

.merge_rule_file(current, new) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 195

# Merge the contents of one rule file into the rules loaded so far.
#
# For every key of +new+: a Hash value is combined with an existing entry
# through merge_rules; in every other case (no existing entry, or a non-Hash
# value) the new value is stored as is.
#
# @param current [Hash] rules loaded so far; passed through IndiferentHash.setup
#   and updated
# @param new [Hash] rules to merge in
# @return [Hash] the updated rules
def self.merge_rule_file(current, new)
  merged = IndiferentHash.setup(current)

  new.each do |key, value|
    existing = merged[key]
    merged[key] = if !existing.nil? && Hash === value
                    merge_rules(existing, value)
                  else
                    value
                  end
  end

  merged
end

.merge_rules(current, new_val) ⇒ Object

Prefer current unless new provides config_keys; do not override existing keys by default



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 21

# Merge two rule sets, giving preference to +current+.
#
# Existing keys are never overridden, with two exceptions: "config_keys" are
# combined through add_config_keys, and "defaults" are merged recursively.
# Both arguments are duplicated before use.
#
# @param current [Hash, nil] rules that take precedence
# @param new_val [Hash, nil] rules supplying missing keys
# @return [Hash] the merged copy of +current+
def self.merge_rules(current, new_val)
  merged = IndiferentHash.setup((current || {}).dup)
  additions = IndiferentHash.setup((new_val || {}).dup)
  return merged if additions.nil? || additions.empty?

  additions.each do |key, value|
    name = key.to_s
    if name == "config_keys"
      merged[key] = add_config_keys(merged["config_keys"], value)
    elsif name == 'defaults'
      merged[key] = merge_rules(merged[key], value)
    elsif !merged.include?(key)
      merged[key] = value
    end
  end

  merged
end

.normalize_resources_from_rules(rules_block) ⇒ Object

Helper to extract a resources hash from various rule styles



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 179

# Helper to extract a resources hash from various rule styles.
#
# Starts from the nested :resources entry (or an empty hash) and hands it to
# IndiferentHash.add_defaults with :cpus taken from the top-level :cpus or
# :task_cpus (1 when neither is given) and :time from the top-level :time.
# Entries whose value is nil are removed.
#
# @param rules_block [Hash, nil] rules of a task or batch
# @return [Hash] resources; an empty Hash for a nil or empty +rules_block+
def self.normalize_resources_from_rules(rules_block)
  return {} if rules_block.nil? || rules_block.empty?

  block = IndiferentHash.setup rules_block
  resources = IndiferentHash.setup(block[:resources] || {})

  default_cpus = block[:cpus] || block[:task_cpus] || 1
  resources = IndiferentHash.add_defaults resources, cpus: default_cpus, time: block[:time]

  resources.delete_if { |_name, value| value.nil? }

  IndiferentHash.setup(resources)
end

.parse_chains(rules) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/scout/workflow/deployment/orchestrator/chains.rb', line 17

# Extract the chain definitions from the rules.
#
# Chains may be declared under a workflow section (rules[workflow]["chains"])
# and/or at the top level (rules["chains"]). Each chain lists its :tasks as a
# comma-separated string of "task" or "Workflow#task" entries. A task without
# an explicit workflow is assigned to the chain's :workflow entry or, for
# chains declared under a workflow section, to that workflow. The remaining
# entries of the chain definition are its rules.
#
# @param rules [Hash, nil] rule definitions
# @return [Hash] chain name => { :tasks => { workflow => [task, ...] }, :rules => Hash }
def self.parse_chains(rules)
  rules = IndiferentHash.setup(rules || {})
  chains = IndiferentHash.setup({})

  # Registers one chain definition; default_workflow is used for tasks that
  # name no workflow when the chain itself declares none
  register = lambda do |name, definition, default_workflow|
    cr = IndiferentHash.setup(definition.dup)
    chain_tasks = cr.delete(:tasks).to_s.split(/,\s*/)
    wf = cr.delete(:workflow) || default_workflow

    chain_tasks.each do |task|
      chain_workflow, chain_task = task.split('#')
      # No '#': the only element is the task name
      chain_task, chain_workflow = chain_workflow, wf if chain_task.nil? || chain_task.empty?

      chains[name] ||= IndiferentHash.setup({:tasks => {}, :rules => cr })
      chains[name][:tasks][chain_workflow] ||= []
      chains[name][:tasks][chain_workflow] << chain_task
    end
  end

  # Chains declared under a workflow section
  rules.each do |workflow_name, wf_rules|
    next unless wf_rules.is_a?(Hash)
    next unless wf_rules["chains"]
    wf_rules["chains"].each do |name, definition|
      register.call(name, definition, workflow_name.to_s)
    end
  end

  # Top-level chains
  if rules["chains"]
    rules["chains"].each do |name, definition|
      register.call(name, definition, nil)
    end
  end

  chains
end

.resources_from_rules_hash(rules_hash, global_rules = {}) ⇒ Object

Build resources hash directly from a rules hash (e.g., consolidated batch rules)



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 141

# Build a resources hash directly from a rules hash (e.g. consolidated batch
# rules).
#
# Sources, in order of precedence:
# 1. the nested :resources hash of +rules_hash+
# 2. its top-level :cpus, :IO, :io, :mem and :mem_per_cpu entries
# 3. global_rules["default_resources"], or global_rules["defaults"]["resources"]
#
# Only Numeric values and strings that are entirely a decimal number are
# kept (the latter converted to Integer or Float); the 'size' key is dropped.
#
# +global_rules+ is normalized with IndiferentHash.setup and may be nil.
#
# @param rules_hash [Hash, nil] rules to read the resources from
# @param global_rules [Hash, nil] rules providing the default resources
# @return [Hash] resource name => number
def self.resources_from_rules_hash(rules_hash, global_rules = {})
  rules_hash = IndiferentHash.setup(rules_hash || {})
  global_rules = IndiferentHash.setup(global_rules || {})
  resources = IndiferentHash.setup({})

  # Nested resources
  if rules_hash[:resources].is_a?(Hash)
    rules_hash[:resources].each { |k, v| resources[k] = v }
  end

  # Top-level entries only fill in what the nested block left unset
  [:cpus, :IO, :io, :mem, :mem_per_cpu].each do |name|
    resources[name] ||= rules_hash[name] if rules_hash.key?(name)
  end

  # Default resources fallback from global rules
  default_resources = global_rules["default_resources"]
  default_resources ||= global_rules["defaults"]["resources"] if global_rules["defaults"]
  default_resources ||= {}
  IndiferentHash.setup(default_resources).each do |k, v|
    resources[k] = v if resources[k].nil?
  end

  # Numeric-only for local scheduling. \A/\z anchor the whole string, so
  # multi-line values are not mistaken for numbers.
  numeric_resources = {}
  resources.each do |k, v|
    next if k.to_s == 'size'
    if Numeric === v
      numeric_resources[k] = v
    elsif v.to_s.strip.match?(/\A\d+(?:\.\d+)?\z/)
      numeric_resources[k] = v.to_s.include?(".") ? v.to_f : v.to_i
    end
  end

  IndiferentHash.setup(numeric_resources)
end

.task_specific_rules(rules, workflow, task) ⇒ Object

Compute task-specific rules: defaults -> workflow defaults -> task overrides



74
75
76
77
78
79
80
81
82
83
84
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 74

# Compute the rules for one task: task rules first, then the workflow
# defaults, then the global defaults (each level only fills in what the
# previous one left unset, as implemented by merge_rules).
#
# @param rules [Hash, nil] rule definitions
# @param workflow [#to_s] workflow name
# @param task [#to_s] task name
# @return [Hash] the rules that apply to the task
def self.task_specific_rules(rules, workflow, task)
  rules = IndiferentHash.setup(rules || {})
  global_defaults = IndiferentHash.setup(rules[:defaults] || {})
  wf = workflow.to_s
  tk = task.to_s

  wf_defaults = merge_rules(workflow_rules(rules, wf), global_defaults)

  workflow_section = rules[wf]
  task_rules = workflow_section.nil? ? nil : workflow_section[tk]
  return IndiferentHash.setup(wf_defaults.dup) if task_rules.nil?

  merge_rules(task_rules, wf_defaults)
end

.workflow_rules(rules, workflow) ⇒ Object

Workflow-level defaults



12
13
14
15
16
17
18
# File 'lib/scout/workflow/deployment/orchestrator/rules.rb', line 12

# Workflow-level defaults: a copy of rules[workflow]["defaults"].
#
# @param rules [Hash, nil] rule definitions
# @param workflow [#to_s] workflow name
# @return [Hash] the defaults, or an empty Hash when the workflow has no
#   section or the section has no "defaults"
def self.workflow_rules(rules, workflow)
  indexed = IndiferentHash.setup(rules || {})
  section = indexed[workflow.to_s]
  return {} if section.nil?

  defaults = section["defaults"]
  return {} if defaults.nil?

  IndiferentHash.setup(defaults.dup)
end