Module: Workflow

Extended by:
Annotation
Defined in:
lib/scout/workflow.rb,
lib/scout/workflow/util.rb,
lib/scout/workflow/usage.rb,
lib/scout/workflow/export.rb,
lib/scout/workflow/persist.rb,
lib/scout/workflow/task/info.rb,
lib/scout/workflow/definition.rb,
lib/scout/workflow/deployment.rb,
lib/scout/workflow/documentation.rb,
lib/scout/workflow/deployment/queue.rb,
lib/scout/workflow/deployment/trace.rb

Defined Under Namespace

Modules: Scheduler Classes: LocalExecutor, Orchestrator

Constant Summary collapse

# True when at least one of the "forget" environment switches (current or
# legacy names) is set to the literal string 'true'; false otherwise.
FORGET_TASK_ALIAS =
  %w(SCOUT_FORGET_TASK_ALIAS SCOUT_FORGET_DEP_TASKS RBBT_FORGET_DEP_TASKS).any? { |env_name|
    ENV[env_name] == 'true'
  }
# Removal policy for aliased dependencies, read from the environment.
#
# The first of the listed variables that is set to anything other than
# 'false' wins, and the constant holds that variable's *value* (task_alias
# compares it against 'true' and 'recursive'). It is false when none is set.
# Previously the variable's name was stored, which matched neither policy.
REMOVE_TASK_ALIAS =
begin
  remove = %w(SCOUT_REMOVE_TASK_ALIAS SCOUT_REMOVE_DEP_TASKS RBBT_REMOVE_DEP_TASKS).find do |var|
    ENV.include?(var) && ENV[var] != 'false'
  end
  remove.nil? ? false : ENV[remove]
end

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Annotation

list_tsv_values, load_info, load_tsv, load_tsv_values, obj_tsv_values, resolve_tsv_array, tsv

Class Attribute Details

.autoinstallObject

Returns the value of attribute autoinstall.



19
20
21
# File 'lib/scout/workflow.rb', line 19

# Reader for the module-level autoinstall setting (listed under class
# attributes). require_workflow consults it to decide whether a missing
# workflow should be installed. Returns nil if it was never assigned.
def autoinstall
  @autoinstall
end

.directoryObject

Returns the value of attribute directory.



8
9
10
# File 'lib/scout/workflow/definition.rb', line 8

# Reader for the module-level directory attribute; returns @directory as
# stored (nil if never assigned). annonymous_workflow indexes it by name.
def directory
  @directory
end

.job_cacheObject

Returns the value of attribute job_cache.



19
20
21
# File 'lib/scout/workflow.rb', line 19

# Reader for the module-level job_cache attribute; returns @job_cache as
# stored (nil if never assigned).
def job_cache
  @job_cache
end

.mainObject

Returns the value of attribute main.



19
20
21
# File 'lib/scout/workflow.rb', line 19

# Reader for the module-level main attribute. require_workflow resets it to
# nil before loading and falls back to it when the workflow constant cannot
# be resolved.
def main
  @main
end

.workflow_dirObject

Returns the value of attribute workflow_dir.



19
20
21
# File 'lib/scout/workflow.rb', line 19

# Reader for the module-level workflow_dir attribute; install_workflow
# changes into this directory before cloning.
def workflow_dir
  @workflow_dir
end

.workflow_repoObject

Returns the value of attribute workflow_repo.



19
20
21
# File 'lib/scout/workflow.rb', line 19

# Reader for the module-level workflow_repo attribute; install_workflow
# uses it as the base repository URL.
def workflow_repo
  @workflow_repo
end

.workflowsObject

Returns the value of attribute workflows.



19
20
21
# File 'lib/scout/workflow.rb', line 19

# Reader for the module-level list of workflows; Workflow.extended appends
# every extending module to it.
def workflows
  @workflows
end

Instance Attribute Details

#descriptionObject

Returns the value of attribute description.



2
3
4
# File 'lib/scout/workflow/documentation.rb', line 2

# Reader for this workflow's description; returns @description as stored
# (nil if never assigned).
def description
  @description
end

#directoryObject

Returns the value of attribute directory.



52
53
54
# File 'lib/scout/workflow/definition.rb', line 52

# Reader for this workflow's directory; task and load_job index it by task
# name to locate job paths.
def directory
  @directory
end

#documentationObject

Returns the value of attribute documentation.



49
50
51
# File 'lib/scout/workflow/documentation.rb', line 49

# Reader for this workflow's documentation; returns @documentation as
# stored (nil if never assigned).
def documentation
  @documentation
end

#knowledge_baseObject

Returns the value of attribute knowledge_base.



61
62
63
# File 'lib/scout/workflow.rb', line 61

# Reader for this workflow's knowledge_base; returns @knowledge_base as
# stored (nil if never assigned).
def knowledge_base
  @knowledge_base
end

#libdirObject

Returns the value of attribute libdir.



61
62
63
# File 'lib/scout/workflow.rb', line 61

# Reader for this workflow's libdir, which Workflow.extended sets from the
# caller's lib directory; documentation_markdown looks up files under it.
def libdir
  @libdir
end

#titleObject

Returns the value of attribute title.



2
3
4
# File 'lib/scout/workflow/documentation.rb', line 2

# Reader for this workflow's title; returns @title as stored (nil if never
# assigned).
def title
  @title
end

Class Method Details

.annonymous_workflow(name = nil, &block) ⇒ Object



2
3
4
5
6
7
8
9
# File 'lib/scout/workflow/util.rb', line 2

# Builds a fresh module extended with Workflow, optionally named.
#
# name  - optional workflow name; when given, the module's directory is
#         taken from Workflow.directory[name].
# block - optional definition body, evaluated with the new module as self.
#         When omitted the bare workflow is returned (previously a missing
#         block made instance_eval raise ArgumentError).
#
# Returns the new module.
def self.annonymous_workflow(name = nil, &block)
  mod = Module.new
  mod.extend Workflow
  mod.name = name
  mod.directory = Workflow.directory[name] if name
  mod.instance_eval(&block) if block
  mod
end

.doc_parse_chunks(str, pattern) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/scout/workflow/documentation.rb', line 22

# Splits +str+ on +pattern+ and pairs each captured header with the text
# that follows it.
#
# str     - documentation text.
# pattern - Regexp with one capture group for the header (e.g. /^## (.*)/).
#
# Returns a Hash of stripped header => stripped body; entries whose body is
# empty are dropped. Returns {} when the pattern never matches.
def self.doc_parse_chunks(str, pattern)
  parts = str.split(pattern)
  return {} if parts.length < 2

  chunks = {}
  # String#split drops trailing empty strings, so a final header without a
  # body yields an odd-length list; each_slice tolerates that (body is nil),
  # whereas Hash[*list] raised ArgumentError.
  parts.drop(1).each_slice(2) do |header, body|
    chunks[header.strip] = body.to_s.strip
  end
  chunks.delete_if { |_header, body| body.empty? }
  chunks
end

.doc_parse_first_line(str) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/scout/workflow/documentation.rb', line 4

# Extracts a title line that is followed by a blank line.
#
# On a match, +str+ is modified in place so that it holds only the text
# after the blank line, and the title line is returned. Without a match,
# +str+ is left untouched and "" is returned.
def self.doc_parse_first_line(str)
  found = str.match(/^([^\n]*)\n\n(.*)/smu)
  return "" unless found

  title, remainder = found[1], found[2]
  str.replace remainder
  title
end

.doc_parse_up_to(str, pattern, keep = false) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/scout/workflow/documentation.rb', line 13

# Splits +str+ at the first occurrence of +pattern+.
#
# str     - text to split.
# pattern - String or Regexp separator.
# keep    - when true, the matched separator is kept at the start of the
#           second element; otherwise it is discarded.
#
# Returns [text_before, text_after]. When the pattern does not occur the
# result is [str, ""]. String#partition always yields three strings, so the
# old nil check on the separator could never fail and has been removed.
def self.doc_parse_up_to(str, pattern, keep = false)
  pre, _pat, _post = str.partition pattern
  [pre, (keep ? _pat + _post : _post)]
end

.extended(base) ⇒ Object



63
64
65
66
67
68
# File 'lib/scout/workflow.rb', line 63

# Hook run when a module extends Workflow: registers the module in
# Workflow.workflows and, when the caller's lib directory can be found,
# stores it as the module's libdir with the module as its resource.
def self.extended(base)
  workflows << base

  caller_lib = Path.caller_lib_dir
  unless caller_lib.nil?
    lib_path = Path.setup(caller_lib)
    lib_path.resource = base
    base.libdir = lib_path
  end
end

.get_SOPT(workflow, task) ⇒ Object



333
334
335
336
337
# File 'lib/scout/workflow/usage.rb', line 333

# Class-level convenience around the instance get_SOPT.
#
# workflow - a workflow, or a String name loaded through require_workflow.
# task     - a task object, or a String/Symbol looked up in workflow.tasks.
#
# Returns whatever workflow.get_SOPT(task) returns.
def self.get_SOPT(workflow, task)
  workflow = Workflow.require_workflow(workflow) if String === workflow

  case task
  when String, Symbol
    task = workflow.tasks[task.to_sym]
  end

  workflow.get_SOPT(task)
end

.install_workflow(workflow, base_repo_url = nil) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/scout/workflow.rb', line 79

# Installs a workflow by cloning its git repository, or updates it when
# +workflow+ is an existing path.
#
# workflow      - workflow name, or a path to an existing checkout.
# base_repo_url - base URL of the repository host; defaults to
#                 Workflow.workflow_repo. (This argument used to be ignored
#                 because the body read a differently named local.)
#
# Raises RuntimeError when no repository is found or the clone fails.
def self.install_workflow(workflow, base_repo_url = nil)
  case
  when Open.exist?(workflow)
    update_workflow_dir(workflow)
  else
    Misc.in_dir(self.workflow_dir) do
      Log.info "Installing: " + workflow

      repo_base_url = base_repo_url || self.workflow_repo

      # A base URL that already mentions the workflow is used verbatim.
      if repo_base_url.include?(workflow) or repo_base_url.include?(Misc.snake_case(workflow))
        repo = repo_base_url
      else
        # Probe <base>/<workflow>.git first, then the snake_case variant.
        begin
          repo = File.join(repo_base_url, workflow + '.git')
          CMD.cmd("wget '#{repo}' -O /dev/null").read
        rescue
          Log.debug "Workflow repo does not exist, trying snake_case: #{ repo }"
          begin
            repo = File.join(repo_base_url, Misc.snake_case(workflow) + '.git')
            CMD.cmd("wget '#{repo}' -O /dev/null").read
          rescue
            raise "Workflow repo does not exist: #{ repo }"
          end
        end
      end

      Log.warn "Cloning #{ repo }"
      # NOTE(review): repo and workflow are interpolated into shell commands;
      # confirm they never come from untrusted input.
      Misc.insist do
        `git clone "#{repo}" #{ Misc.snake_case(workflow) }`
        raise unless $?.success?
      end
      Log.warn "Initializing and updating submodules for #{repo}. You might be prompted for passwords."
      Misc.in_dir(Misc.snake_case(workflow)) do
        `git submodule init`
        `git submodule update`
      end
    end
  end
end

.installed_workflowsObject



11
12
13
# File 'lib/scout/workflow/util.rb', line 11

# Lists the unique base names of every entry found under the "workflows"
# path across all of its locations (glob_all).
def self.installed_workflows
  entries = Path.setup("workflows").glob_all("*")
  entries.map { |entry| File.basename(entry) }.uniq
end

.listObject



24
25
26
# File 'lib/scout/workflow/util.rb', line 24

# Lists the base name of each entry matched by glob('*') on the 'workflows'
# path.
def self.list
  workflow_entries = Path.setup('workflows').glob('*')
  workflow_entries.map { |entry| entry.basename }
end

.name2clean_name(name) ⇒ Object



2
3
4
# File 'lib/scout/workflow/deployment/queue.rb', line 2

# Strips the final "_suffix" from a job name, returning everything before
# the last underscore. Returns "" when the name has no underscore.
def self.name2clean_name(name)
  prefix, _separator, _suffix = name.rpartition("_")
  prefix
end

.parse_workflow_doc(doc) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/scout/workflow/documentation.rb', line 30

# Parses a workflow markdown document into its parts.
#
# doc - String with the markdown; it is modified in place, because the
#       title line is removed by doc_parse_first_line.
#
# Returns a Hash with :title, :description (text before "# Tasks"),
# :task_description (text between "# Tasks" and the first "##" header) and
# :tasks (Hash of task name => description).
def self.parse_workflow_doc(doc)
  title = doc_parse_first_line(doc)
  description, task_section = doc_parse_up_to(doc, /^# Tasks/i)
  task_description, task_chunks = doc_parse_up_to(task_section, /^##/, true)
  tasks = doc_parse_chunks(task_chunks, /^## (.*)/)

  {
    :title => title.strip,
    :description => description.strip,
    :task_description => task_description.strip,
    :tasks => tasks
  }
end

.produce(jobs, ...) ⇒ Object



7
8
9
10
# File 'lib/scout/workflow/deployment.rb', line 7

# Produces +jobs+ with the local executor, using the orchestration rules
# loaded for them. Any further arguments are forwarded unchanged to
# Workflow::LocalExecutor.produce.
def self.produce(jobs, ...)
  orchestration_rules = Workflow::Orchestrator.load_rules_for_job(jobs)
  Workflow::LocalExecutor.produce(jobs, orchestration_rules, ...)
end

.queue_job(file) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/scout/workflow/deployment/queue.rb', line 6

# Builds the job described by a queue entry.
#
# file - path whose last three components are workflow/task/name.
#
# An entry that is a directory or a non-empty file holds the job inputs;
# they are loaded with the task's load_inputs and the job is named after the
# entry name with its final "_suffix" removed (nil for the default name).
# An empty file yields a job with no inputs, named with the full entry name.
#
# Returns the job created by workflow.job.
def self.queue_job(file)
  workflow, task, name = file.split("/").values_at(-3, -2, -1) if file
  workflow = Workflow.require_workflow workflow

  has_inputs = Open.directory?(file) || Open.size(file) > 0
  return workflow.job(task, name) unless has_inputs

  clean_name = name2clean_name name
  clean_name = nil if clean_name == Task::DEFAULT_NAME

  # A non-directory entry that File cannot see locally is copied to a
  # temporary file first so that the inputs can be loaded from it.
  needs_local_copy = ! Open.directory?(file) && ! File.exist?(file)
  unless needs_local_copy
    inputs = workflow.tasks[task].load_inputs(file)
    return workflow.job(task, clean_name, inputs)
  end

  TmpFile.with_file do |tmp|
    Open.cp file, tmp
    inputs = workflow.tasks[task].load_inputs(tmp)
    workflow.job(task, clean_name, inputs)
  end
end

.require_workflow(workflow_name_orig) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/scout/workflow.rb', line 127

# Loads one or more workflows by name and returns the first one.
#
# workflow_name_orig - workflow name; several can be joined with "+". Each
#                      may carry "::"-separated sub-workflow names, of which
#                      only the first component is used to locate the file.
#
# Returns the workflow for the first name in the list.
# Raises RuntimeError when a workflow file is not found and autoinstall is
# off.
def self.require_workflow(workflow_name_orig)
  first = nil
  workflow_name_orig.split("+").each do |complete_workflow_name|
    self.main = nil

    # The load itself goes through Persist.memory, keyed by the complete name.
    Persist.memory(complete_workflow_name, prefix: "Workflow") do
      begin
        workflow_name, *subworkflows = complete_workflow_name.split("::")
        # Candidate locations, tried in order: the name as a file path, then
        # workflows/<name>, workflows/<snake_case> and workflows/<CamelCase>,
        # each with workflow.rb inside.
        workflow_file = workflow_name
        workflow_file = Path.setup('workflows')[workflow_name]["workflow.rb"] unless Open.exists?(workflow_file) && ! Open.directory?(workflow_file)
        workflow_file = Path.setup('workflows')[Misc.snake_case(workflow_name)]["workflow.rb"] unless Open.exists?(workflow_file) && ! Open.directory?(workflow_file)
        workflow_file = Path.setup('workflows')[Misc.camel_case(workflow_name)]["workflow.rb"] unless Open.exists?(workflow_file) && ! Open.directory?(workflow_file)

        if Open.exists?(workflow_file) && ! Open.directory?(workflow_file)
          self.main = nil
          require_workflow_file(workflow_file)
        elsif autoinstall
          # Install, then restart the lookup from the top of the begin block.
          install_workflow(workflow_name)
          raise TryAgain
        else
          raise "Workflow #{workflow_name} not found"
        end
      rescue TryAgain
        # NOTE(review): this retry is unbounded; if install_workflow returns
        # without making the file available, the loop never ends.
        retry
      end
    end

    # Resolve the workflow constant by name; when that fails, fall back to the
    # module flagged as main or to the most recently registered workflow.
    current = begin
                Kernel.const_get(complete_workflow_name)
              rescue
                self.main || workflows.last
              end

    first ||= current
  end
  first
end

.require_workflow_file(file) ⇒ Object



121
122
123
124
125
# File 'lib/scout/workflow.rb', line 121

# Loads a workflow definition file, after putting the 'lib' directory that
# sits next to it at the front of $LOAD_PATH. Path objects are resolved with
# find first.
def self.require_workflow_file(file)
  file = file.find if Path === file
  sibling_lib = File.join(File.dirname(file), 'lib')
  $LOAD_PATH.unshift(sibling_lib)
  load file
end

.trace(seed_jobs, options = {}) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/scout/workflow/deployment/trace.rb', line 164

# Collects timing information for a set of jobs and their dependencies.
#
# seed_jobs - steps to trace; their recursive dependencies and any archived
#             dependency info stored under info[:archived_info] are included.
# options   - Hash; recognised keys:
#             :report_keys - Array, or comma-separated String, of info keys
#                            to add to the report
#             :fix_gap     - passed on to trace_job_times
#             :plot_data   - when truthy, return the per-job table instead of
#                            the per-task summary
#
# Returns the per-task summary, or the per-job table when :plot_data is set.
# Raises RuntimeError when the per-job table is empty.
#
# The unused size/width/height computations and a no-op expression in the
# sort block were removed; results are unchanged.
def self.trace(seed_jobs, options = {})
  jobs = []
  seed_jobs.each do |step|
    jobs += step.rec_dependencies.to_a + [step]
    step.info[:archived_info].each do |path,ainfo|
      next unless Hash === ainfo
      archived_step = Step.new path

      # Archived steps answer #info with the stored hash.
      archived_step.define_singleton_method :info do
        ainfo
      end

      jobs << archived_step
    end if step.info[:archived_info]
  end

  # NOTE(review): the sort key reads info[:started] while the trace_job_*
  # helpers read info[:start]; confirm which key steps actually record.
  jobs = jobs.uniq.sort_by do |job|
    t = job.info[:started] || Open.mtime(job.path) || Time.now
    Time === t ? t : Time.parse(t)
  end

  report_keys = options[:report_keys] || ""
  report_keys = report_keys.split(/,\s*/) if String === report_keys

  data = trace_job_times(jobs, options[:fix_gap], report_keys)

  summary = trace_job_summary(jobs, report_keys)

  raise "No jobs to process" if data.size == 0

  options[:plot_data] ? data : summary
end

.trace_job_summary(jobs, report_keys = []) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/scout/workflow/deployment/trace.rb', line 113

# Summarises finished jobs per task.
#
# jobs        - steps to summarise; those without info[:end] are skipped.
# report_keys - extra info/config keys to report, one column per key.
#
# Returns a TSV keyed by "workflow#task" with Calls, Avg. Time and Total
# Time, plus one humanized column per report key.
def self.trace_job_summary(jobs, report_keys = [])
  tasks_info = {}

  report_keys = report_keys.map { |report_key| report_key.to_s }

  jobs.each do |dep|
    next unless dep.info[:end]

    task = [dep.workflow.name, dep.task_name].compact.map { |part| part.to_s }.join("#")
    info = tasks_info[task] ||= IndiferentHash.setup({})
    dep_info = IndiferentHash.setup(dep.info)

    ddone = dep_info[:end]
    started = dep_info[:start]

    # Timestamps may be stored as strings.
    started = Time.parse started if String === started
    ddone = Time.parse ddone if String === ddone

    elapsed = ddone - started
    info[:time] ||= []
    info[:time] << elapsed

    report_keys.each { |report_key| info[report_key] = dep_info[report_key] }

    # Values recorded under :config_keys override the plain info values for
    # the keys being reported.
    if dep.info[:config_keys]
      dep.info[:config_keys].each do |kinfo|
        config_key, config_value, _tokens = kinfo
        info[config_key.to_s] = config_value if report_keys.include? config_key.to_s
      end
    end
  end

  summary = TSV.setup({}, "Task~Calls,Avg. Time,Total Time#:type=:list")

  tasks_info.each do |task, info|
    times = info[:time]
    summary[task] = [times.length, Misc.mean(times).to_i, Misc.sum(times).to_i]
  end

  if Array === report_keys && report_keys.any?
    report_keys.each do |report_key|
      summary.add_field Misc.humanize(report_key) do |task|
        tasks_info[task][report_key]
      end
    end
  end

  summary
end

.trace_job_times(jobs, fix_gap = false, report_keys = nil) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/scout/workflow/deployment/trace.rb', line 4

# Builds a per-job timing table.
#
# jobs        - steps to tabulate; those without info[:end] are skipped.
# fix_gap     - when truthy, periods during which no job was running are
#               subtracted from the relative start/end offsets.
# report_keys - optional list of info keys added as extra columns.
#
# Returns a TSV keyed by job path with Code, Workflow, Task, Start, End,
# Start.second and End.second (offsets from the earliest start), plus one
# humanized column per report key.
#
# The report-key columns used to be added twice, by two near-identical
# blocks; only the first, which reads info through IndiferentHash, is kept.
def self.trace_job_times(jobs, fix_gap = false, report_keys = nil)
  data = TSV.setup({}, "Job~Code,Workflow,Task,Start,End#:type=:list")
  min_start = nil
  max_done = nil
  jobs.each do |job|
    next unless job.info[:end]
    started = job.info[:start]
    ddone = job.info[:end]

    # Timestamps may be stored as strings.
    started = Time.parse started if String === started
    ddone = Time.parse ddone if String === ddone

    code = [job.workflow.name, job.task_name].compact.collect{|s| s.to_s} * " · "
    code = job.name + " - " + code

    data[job.path] = [code, job.workflow.name, job.task_name, started, ddone]
    if min_start.nil?
      min_start = started
    else
      min_start = started if started < min_start
    end

    if max_done.nil?
      max_done = ddone
    else
      max_done = ddone if ddone > max_done
    end
  end

  data.add_field "Start.second" do |k,value|
    value["Start"] - min_start
  end

  data.add_field "End.second" do |k,value|
    value["End"] - min_start
  end

  if fix_gap
    ranges = []
    data.through do |k,values|
      start, eend = values.values_at "Start.second", "End.second"

      ranges << (start..eend)
    end

    # gaps maps the offset at which an idle period begins to its length.
    gaps = {}
    last = nil
    Misc.collapse_ranges(ranges).each do |range|
      start = range.begin
      eend = range.end
      if last
        gaps[last] = start - last
      end
      last = eend
    end

    # End.second is shifted first, because both shifts are computed from the
    # still unshifted Start.second.
    data.process "End.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    data.process "Start.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    total_gaps = Misc.sum(gaps.collect{|k,v| v})
    Log.info "Total gaps: #{total_gaps} seconds"
  end

  if report_keys && report_keys.any?
    job_keys = {}
    jobs.each do |job|
      job_info = IndiferentHash.setup(job.info)
      report_keys.each do |key|
        job_keys[job.path] ||= {}
        job_keys[job.path][key] = job_info[key]
      end
    end
    report_keys.each do |key|
      data.add_field Misc.humanize(key) do |p,values|
        job_keys[p][key]
      end
    end
  end

  start = data.column("Start.second").values.flatten.collect{|v| v.to_f}.min
  eend = data.column("End.second").values.flatten.collect{|v| v.to_f}.max
  total = eend - start unless eend.nil? || start.nil?
  Log.info "Total time elapsed: #{total} seconds" if total

  data
end

.unqueue(file, &block) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/scout/workflow/deployment/queue.rb', line 28

# Runs the job described by a queue entry and then deletes the entry, all
# while holding a lock on +file+. The job's result is printed with puts.
# The block argument is accepted but not used.
def self.unqueue(file, &block)
  Open.lock(file) do
    queued = queue_job(file)
    puts queued.run
    Open.rm_rf(file)
  end
end

.update_workflow_dir(workflow_dir) ⇒ Object



70
71
72
73
74
75
76
77
# File 'lib/scout/workflow.rb', line 70

# Updates an existing workflow checkout: inside +workflow_dir+ it runs
# git pull, then initialises and updates the git submodules.
def self.update_workflow_dir(workflow_dir)
  Misc.in_dir(workflow_dir) do
    Log.info "Updating: " + workflow_dir
    %x(git pull)
    %x(git submodule init)
    %x(git submodule update)
  end
end

Instance Method Details

#_prov_tasks(tree) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/scout/workflow/usage.rb', line 166

# Flattens a dependency tree into the list of its keys.
#
# tree - nested Hash whose keys are the nodes and whose values are the
#        sub-trees of each node (as produced by dep_tree).
#
# Returns an Array of keys. Sub-trees are taken from the end of a work list,
# so the most recently queued sub-tree is expanded first.
def _prov_tasks(tree)
  tasks = []
  # The list used to be initialised with tree.values and then immediately
  # overwritten; that dead assignment is gone.
  heap = [tree]
  while heap.any?
    subtree = heap.pop
    subtree.each do |node, children|
      tasks << node
      heap << children
    end
  end
  tasks
end

#all_exportsObject Also known as: task_exports



28
29
30
# File 'lib/scout/workflow/export.rb', line 28

# Returns a new Array with every exported task name, concatenated in the
# order: asynchronous, synchronous, exec, stream.
def all_exports
  export_lists = [asynchronous_exports, synchronous_exports, exec_exports, stream_exports]
  export_lists.inject([]) { |combined, exports| combined + exports }
end

#all_tasksObject



20
21
22
# File 'lib/scout/workflow/util.rb', line 20

# Returns the names of all tasks defined in this workflow (the keys of the
# tasks collection).
def all_tasks
  tasks.keys
end

#annotate_next_task(type, obj) ⇒ Object



62
63
64
65
66
# File 'lib/scout/workflow/definition.rb', line 62

# Queues +obj+ under +type+ in the annotations that the next task
# definition will pick up. Several objects may accumulate per type.
# Returns the list stored for +type+.
def annotate_next_task(type, obj)
  @annotate_next_task = {} unless @annotate_next_task
  pending = (@annotate_next_task[type] ||= [])
  pending << obj
end

#annotate_next_task_single(type, obj) ⇒ Object



68
69
70
71
# File 'lib/scout/workflow/definition.rb', line 68

# Sets the single annotation +type+ for the next task definition, replacing
# any previous value for that type. Returns +obj+.
def annotate_next_task_single(type, obj)
  pending = (@annotate_next_task ||= {})
  pending[type] = obj
end

#asynchronous_exportsObject



5
6
7
# File 'lib/scout/workflow/export.rb', line 5

# List of task names exported as asynchronous; created empty on first use
# and then reused.
def asynchronous_exports
  @asynchronous_exports = [] unless @asynchronous_exports
  @asynchronous_exports
end

#clear_exportsObject



21
22
23
24
25
26
# File 'lib/scout/workflow/export.rb', line 21

# Empties the four export lists in place (asynchronous, synchronous, exec,
# stream). Returns the now-empty stream export list.
def clear_exports
  [asynchronous_exports, synchronous_exports, exec_exports].each do |exports|
    exports.clear
  end
  stream_exports.clear
end

#dep(*args, &block) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/scout/workflow/definition.rb', line 73

# Declares a dependency for the next task definition.
#
# Accepted argument shapes:
#   dep workflow, task, options
#   dep task, options          (second argument is a Hash)
#   dep workflow, task
#   dep task
#   dep options                (single Hash argument)
#
# A missing workflow defaults to self, missing options to {}, and the task
# name is converted to a Symbol. The dependency is recorded as
# [workflow, task, options, block, args] under :deps.
def dep(*args, &block)
  workflow = task = options = nil

  if args.length == 3
    workflow, task, options = args
  elsif args.length == 2
    if Hash === args.last
      task, options = args
    else
      workflow, task = args
    end
  elsif args.length == 1
    if Hash === args.first
      options = args.first
    else
      task = args.first
    end
  end

  workflow = self if workflow.nil?
  options = {} if options.nil?
  task = task.to_sym if task

  annotate_next_task :deps, [workflow, task, options, block, args]
end

#dep_tree(task_name, seen = nil, seen_options = nil) ⇒ Object

Raises:

TaskNotFound — when task_name is not defined in this workflow



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/scout/workflow/usage.rb', line 131

# Builds the tree of dependencies of a task.
#
# task_name    - task whose dependencies are explored.
# seen         - optional collection (anything answering <<) that receives
#                every [workflow, task, options] dependency visited;
#                recursive_deps uses it to collect the flat list.
# seen_options - options inherited from the parent dependency; they are
#                merged over each dependency's own options.
#
# Returns a nested Hash: [workflow, task] => sub-tree.
# Raises TaskNotFound when the task is not defined in this workflow.
#
# Results are cached only for top-level calls (no +seen+ given). The cache
# is likewise only consulted for top-level calls: returning a cached tree to
# a caller that passed +seen+ would leave that collection unpopulated.
def dep_tree(task_name, seen = nil, seen_options = nil)
  @dep_tree ||= {}
  key = [self, task_name]

  save = seen.nil?
  return @dep_tree[key] if save && @dep_tree.include?(key)
  seen = Set.new if seen.nil?
  seen_options = {} if seen_options.nil?

  dep_tree = {}
  task = self.tasks[task_name]
  raise TaskNotFound, "Task #{task_name} in #{self.to_s}" if task.nil?

  task.deps.each do |workflow, dep_task, options|
    dep = [workflow, dep_task, options.merge(seen_options)]
    #next if seen.include? dep
    seen << dep
    # Option-only dependencies (no task) are recorded but not expanded.
    next if dep_task.nil?

    dep_key = [workflow, dep_task]

    dep_tree[dep_key] = workflow.dep_tree(dep_task, seen, options.merge(seen_options))
  end if task.deps

  @dep_tree[key] = dep_tree if save

  dep_tree
end

#desc(description) ⇒ Object



99
100
101
# File 'lib/scout/workflow/definition.rb', line 99

# Sets the description annotation for the next task definition.
def desc(description)
  annotate_next_task_single :description, description
end

#documentation_markdownObject



38
39
40
41
42
43
44
45
46
47
# File 'lib/scout/workflow/documentation.rb', line 38

# Returns the workflow's markdown documentation: the content of workflow.md
# under libdir, or of README.md when the former does not exist. Returns ""
# when libdir is unset or neither file exists.
def documentation_markdown
  return "" if @libdir.nil?

  doc_file = @libdir['workflow.md'].find
  doc_file = @libdir['README.md'].find unless doc_file.exists?

  doc_file.exists? ? doc_file.read : ""
end

#exec_exportsObject



13
14
15
# File 'lib/scout/workflow/export.rb', line 13

# List of task names exported as exec; created empty on first use and then
# reused.
def exec_exports
  @exec_exports = [] unless @exec_exports
  @exec_exports
end

#extension(extension) ⇒ Object



107
108
109
# File 'lib/scout/workflow/definition.rb', line 107

# Sets the extension annotation for the next task definition.
def extension(extension)
  annotate_next_task_single :extension, extension
end

#find_in_dependencies(name, dependencies) ⇒ Object



15
16
17
18
# File 'lib/scout/workflow/util.rb', line 15

# Selects from +dependencies+ those whose task_name equals +name+; both
# sides are compared as Symbols. Returns an Array, possibly empty.
def find_in_dependencies(name, dependencies)
  wanted = name.to_sym
  dependencies.find_all do |dependency|
    dependency.task_name.to_sym == wanted
  end
end

#get_SOPT(task) ⇒ Object



328
329
330
331
# File 'lib/scout/workflow/usage.rb', line 328

# Parses command-line options for +task+: builds the option specification
# with SOPT_str and hands it to SOPT.get.
def get_SOPT(task)
  SOPT.get(self.SOPT_str(task))
end

#helper(name, *args, **kwargs, &block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/scout/workflow/definition.rb', line 28

# Defines or invokes a workflow helper.
#
# With a block: registers the block as helper +name+ and returns it.
# Without a block: calls the helper with the given arguments on a scratch
# object extended with step_module, and returns its result.
#
# Raises ScoutException when invoking a helper that is not registered.
def helper(name, *args, **kwargs, &block)
  return helpers[name] = block if block_given?

  unless helpers[name]
    raise ScoutException, "helper #{name} unkown in #{self} workflow"
  end

  runner = Object.new
  runner.extend step_module
  runner.send(name, *args, **kwargs)
end

#helpersObject



24
25
26
# File 'lib/scout/workflow/definition.rb', line 24

# Registry of helper blocks by name; created empty on first use and then
# reused.
def helpers
  @helpers = {} unless @helpers
  @helpers
end

#include_workflow(workflow) ⇒ Object



231
232
233
234
235
236
237
238
239
240
# File 'lib/scout/workflow/definition.rb', line 231

# Imports the exports, tasks and helpers of another workflow into this one.
#
# workflow - the workflow to include; its documentation reader is called
#            first.
#
# The export lists are extended in place with concat. The previous
# `self.x_exports += ...` form needs writer methods for those lists, which
# this module's documented interface does not provide.
def include_workflow(workflow)
  workflow.documentation
  asynchronous_exports.concat(workflow.asynchronous_exports)
  synchronous_exports.concat(workflow.synchronous_exports)
  exec_exports.concat(workflow.exec_exports)
  stream_exports.concat(workflow.stream_exports)
  self.tasks.merge! workflow.tasks
  # NOTE(review): this reassigns the workflow of *every* task held here,
  # including tasks defined before the include; confirm that is intended.
  self.tasks.each{|_,t| t.workflow = workflow }
  self.helpers.merge! workflow.helpers
end

#input(name, type = nil, *rest) ⇒ Object



93
94
95
96
97
# File 'lib/scout/workflow/definition.rb', line 93

# Declares an input for the next task definition.
#
# name - input name, converted to a Symbol.
# type - optional type, converted to a Symbol when given.
# rest - any further values, stored as given after name and type.
def input(name, type = nil, *rest)
  input_name = name.to_sym
  input_type = type ? type.to_sym : type
  annotate_next_task(:inputs, [input_name, input_type, *rest])
end

#job(name, *args) ⇒ Object

Raises:

TaskNotFound — when no task called name is defined in this workflow



165
166
167
168
169
170
171
# File 'lib/scout/workflow.rb', line 165

# Creates a job (step) for task +name+; the remaining arguments are passed
# to the task's job method. The step is extended with this workflow's
# helper module before being returned.
#
# Raises TaskNotFound when the task is not defined in this workflow.
def job(name, *args)
  task = tasks[name]
  raise TaskNotFound, "Task #{name} in #{self.to_s}" if task.nil?

  task.job(*args).tap do |step|
    step.extend step_module
  end
end

#load_job(task_name, name) ⇒ Object



38
39
40
# File 'lib/scout/workflow/util.rb', line 38

# Builds a Step for the path directory[task_name][name] of this workflow.
def load_job(task_name, name)
  job_path = directory[task_name][name]
  Step.new(job_path)
end

#nameObject



20
21
22
# File 'lib/scout/workflow/definition.rb', line 20

# Name of the workflow: the explicitly assigned @name when set, otherwise
# the result of to_s.
def name
  return @name if @name

  to_s
end

#persist(name, type = :serializer, options = {}, &block) ⇒ Object



2
3
4
5
# File 'lib/scout/workflow/persist.rb', line 2

# Persist.persist with a workflow-specific default directory: unless
# options already carry :dir, it defaults to
# Scout.var.workflows[<workflow name>].persist.
def persist(name, type = :serializer, options = {}, &block)
  default_dir = Scout.var.workflows[self.name].persist
  options = IndiferentHash.add_defaults(options, dir: default_dir)
  Persist.persist(name, type, options, &block)
end

#prov_string(tree) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/scout/workflow/usage.rb', line 180

# Renders a dependency tree as a compact one-line string of task names.
#
# Tasks are visited in _prov_tasks order and each [workflow, task] pair is
# used once. A task that belongs to the dependency tree of the previously
# visited task is omitted from the string; any other task is appended,
# separated from what came before by ";".
def prov_string(tree)
  description = ""

  last = nil
  seen = Set.new

  _prov_tasks(tree).each do |workflow, task_name|
    pair = [workflow, task_name]
    next if seen.include?(pair)

    child = last && last.include?(pair)
    first = last.nil?
    last = _prov_tasks(workflow.dep_tree(task_name))

    unless child
      separator = first ? "" : ";"
      description += separator + task_name.to_s
    end

    seen << pair
  end

  description
end

#prov_tree(tree, offset = 0, seen = []) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/scout/workflow/usage.rb', line 208

# Renders a dependency tree as indented text, one "workflow#task" entry per
# node, with one extra space of indentation per level.
#
# tree   - nested Hash of [workflow, task] => sub-tree.
# offset - indentation of this level, in spaces.
# seen   - nodes already printed; it is shared with the recursive calls and
#          each node is printed only once.
#
# Returns the rendered String ("" for an empty tree).
def prov_tree(tree, offset = 0, seen = [])
  return "" if tree.empty?

  indent = " " * offset
  lines = [indent]

  tree.each do |node, subtree|
    next if seen.include?(node)
    seen.push(node)

    workflow, task = node
    label = [workflow.to_s, task.to_s] * "#"
    lines << indent + label + "\n" + workflow.prov_tree(subtree, offset + 1, seen)
  end

  lines * "\n"
end

#rec_input_defaults(task_name) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/scout/workflow/task/info.rb', line 25

# Maps input name => default value over the recursive inputs of a task.
# Inputs without a default are skipped, and the first definition of a name
# wins.
def rec_input_defaults(task_name)
  tasks[task_name].recursive_inputs.each_with_object({}) do |(name, _type, _desc, default, _options), defaults|
    next if default.nil? || defaults.include?(name)
    defaults[name] = default
  end
end

#rec_input_descriptions(task_name) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/scout/workflow/task/info.rb', line 17

# Maps input name => description over the recursive inputs of a task.
# Inputs without a description are skipped, and the first definition of a
# name wins.
def rec_input_descriptions(task_name)
  tasks[task_name].recursive_inputs.each_with_object({}) do |(name, _type, desc, _default, _options), descriptions|
    next if desc.nil? || descriptions.include?(name)
    descriptions[name] = desc
  end
end

#rec_input_options(task_name) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/scout/workflow/task/info.rb', line 33

# Maps input name => options over the recursive inputs of a task. Inputs
# without options are skipped, and the first definition of a name wins.
def rec_input_options(task_name)
  tasks[task_name].recursive_inputs.each_with_object({}) do |(name, _type, _desc, _default, options), input_options|
    next if input_options.include?(name) || options.nil?
    input_options[name] = options
  end
end

#rec_input_types(task_name) ⇒ Object



8
9
10
11
12
13
14
# File 'lib/scout/workflow/task/info.rb', line 8

# Maps input name => type over the recursive inputs of a task; the first
# definition of a name wins.
def rec_input_types(task_name)
  tasks[task_name].recursive_inputs.each_with_object({}) do |(name, type, _desc, _default, _options), types|
    types[name] = type unless types.include?(name)
  end
end

#rec_input_use(task_name) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/scout/workflow/task/info.rb', line 42

# Reports which tasks use each input, following dependencies recursively.
#
# task_name - task to inspect.
#
# Returns a Hash of input name => { workflow => [task names] }, covering the
# task's own inputs and those of all its dependencies.
#
# Option-only dependencies (no task name) are skipped. They used to be
# skipped with a bare `next` inside inject, which set the accumulator to nil
# and made the following dependency fail with NoMethodError.
def rec_input_use(task_name)
  input_use = {}
  task = self.tasks[task_name]
  task.inputs.each do |name,_|
    input_use[name] ||= {}
    input_use[name][self] ||= []
    input_use[name][self] << task_name
  end

  task.deps.each do |dep_workflow, dep_task_name|
    next if dep_task_name.nil?

    dep_workflow.rec_input_use(dep_task_name).each do |name, uses|
      input_use[name] ||= {}
      uses.each do |use_workflow, task_names|
        input_use[name][use_workflow] ||= []
        input_use[name][use_workflow].concat(task_names)
      end
    end
  end if task.deps

  input_use
end

#rec_inputs(task_name) ⇒ Object



4
5
6
# File 'lib/scout/workflow/task/info.rb', line 4

# Names of all recursive inputs of a task, in definition order (duplicates
# are kept).
def rec_inputs(task_name)
  tasks[task_name].recursive_inputs.map { |input_name, *| input_name }
end

#recursive_deps(task_name) ⇒ Object



160
161
162
163
164
# File 'lib/scout/workflow/usage.rb', line 160

# Flat list of the dependencies of a task: the Array handed to dep_tree as
# its "seen" collector, returned after the traversal.
def recursive_deps(task_name)
  collected = []
  dep_tree(task_name, collected)
  collected
end

#returns(type) ⇒ Object



103
104
105
# File 'lib/scout/workflow/definition.rb', line 103

# Sets the return-type annotation for the next task definition.
def returns(type)
  annotate_next_task_single :returns, type
end

#SOPT_str(task) ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
# File 'lib/scout/workflow/usage.rb', line 316

# Builds the option specification string for a task's recursive inputs.
#
# task - key of the task in tasks.
#
# Each input becomes "-<shortcut>--<name>", followed by "*" unless the input
# is boolean; entries are joined with ":". The shortcut is
# options[:shortcut] when present, otherwise the first character of the
# name.
#
# Fixes: the string interpolated an undefined local (`short`), which raised
# NameError; inputs without an options hash are now tolerated.
def SOPT_str(task)
  sopt_options = self.tasks[task].recursive_inputs.map do |name, type, _desc, _default, options|
    shortcut = (options && options[:shortcut]) || name.to_s.slice(0,1)
    boolean = type == :boolean

    "-#{shortcut}--#{name}#{boolean ? "" : "*"}"
  end

  sopt_options * ":"
end

#step_moduleObject



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/scout/workflow/definition.rb', line 39

# Module that exposes every registered helper as an instance method; steps
# are extended with it. It is built once from the helpers registered at that
# moment and then reused.
def step_module
  return @_m if @_m

  helper_module = Module.new
  helpers.each do |helper_name, body|
    helper_module.send(:define_method, helper_name, &body)
  end

  @_m = helper_module
end

#stream_exportsObject



17
18
19
# File 'lib/scout/workflow/export.rb', line 17

# List of task names exported as stream; created empty on first use and
# then reused.
#
# It used to be memoised in @exec_exports, so stream and exec exports shared
# one list; it now has its own instance variable.
def stream_exports
  @stream_exports ||= []
end

#synchronous_exportsObject



9
10
11
# File 'lib/scout/workflow/export.rb', line 9

# List of task names exported as synchronous; created empty on first use
# and then reused.
def synchronous_exports
  @synchronous_exports = [] unless @synchronous_exports
  @synchronous_exports
end

#task(name_and_type, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/scout/workflow/definition.rb', line 111

# Defines a task in this workflow.
#
# name_and_type - either { name => type }, or a bare Symbol/String name, in
#                 which case the type is :binary.
# block         - the task body; when omitted, the workflow method with the
#                 same name as the task is used instead.
#
# The task is created with the annotations accumulated since the previous
# definition (inputs, deps, description, ...). Unless an extension was
# annotated, one is derived from the type for :tsv, :yaml, :marshal and
# :json. The pending annotations are always reset afterwards.
#
# Returns the new task, which is also stored in @tasks under its name.
def task(name_and_type, &block)
  case name_and_type
  when Hash
    name, type = name_and_type.first
  when Symbol, String
    name, type = [name_and_type, :binary]
  end
  type = type.to_sym if String === type
  name = name.to_sym if String === name
  @tasks ||= IndiferentHash.setup({})
  # Method#to_proc already yields a lambda. Passing a non-literal block to
  # Kernel#lambda is deprecated and raises ArgumentError on Ruby 3.3+.
  block = self.method(name).to_proc if block.nil?
  begin
    @annotate_next_task ||= {}
    @annotate_next_task[:extension] ||=
      case type
      when :tsv
        "tsv"
      when :yaml
        "yaml"
      when :marshal
        "marshal"
      when :json
        "json"
      else
        nil
      end

    task = Task.setup(block, @annotate_next_task.merge(name: name, type: type, directory: directory[name], workflow: self))
    @tasks[name] = task
  ensure
    @annotate_next_task = {}
  end
end

#task_alias(name, workflow, oname, *rest, &block) ⇒ Object Also known as: dep_task



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/scout/workflow/definition.rb', line 158

# Defines task +name+ as an alias of task +oname+ in +workflow+.
#
# name     - name of the new task.
# workflow - workflow that holds the aliased task.
# oname    - name of the aliased task.
# rest     - further arguments passed to dep (e.g. dependency options).
# block    - optional block passed to dep.
#
# The alias declares the original task as a dependency and takes over its
# type (and its returns annotation, when none was set). When run, it waits
# for the dependency and then links the dependency's result and files
# directory as its own. If "forget" is enabled, the dependency is archived
# and dropped from the dependency list, and may be cleaned according to the
# "remove" setting.
def task_alias(name, workflow, oname, *rest, &block)
  dep(workflow, oname, *rest, &block) 
  # NOTE(review): @extension and @returns are not assigned anywhere in the
  # code visible here, so these guards look always true; confirm.
  extension :dep_task unless @extension
  task_proc = workflow.tasks[oname] if workflow.tasks
  if task_proc
    returns task_proc.returns if @returns.nil?
    type = task_proc.type 
  end
  task name => type do
    raise ScoutException, "dep_task does not have any dependencies" if dependencies.empty?
    Step.wait_for_jobs dependencies.select{|d| d.streaming? }
    # The aliased job is the last dependency; wait for it and propagate any
    # failure or abort.
    dep = dependencies.last
    dep.join
    raise dep.exception if dep.error?
    raise Aborted, "Aborted dependency #{dep.path}" if dep.aborted?
    merge_info type: dep.info[:type], task_alias: true

    # "forget" comes from the config, with FORGET_TASK_ALIAS as the default.
    forget = config :forget_task_alias, "forget_task_alias"
    forget = config :forget_dep_tasks, "forget_dep_tasks", :default => FORGET_TASK_ALIAS if forget.nil?

    if forget
      # "remove" comes from the config, with REMOVE_TASK_ALIAS as the default.
      remove = config :remove_task_alias, "remove_task_alias"
      remove = config :remove_dep_tasks, "remove_dep_tasks", :default => REMOVE_TASK_ALIAS if remove.nil?

      Log.medium "Forget task_alias (remove: #{remove}): #{short_path}"

      # Archive dependency info, then detach the aliased job from this step.
      self.archive_deps
      self.copy_linked_files_dir
      self.dependencies = self.dependencies - [dep]
      Open.rm_rf self.files_dir if Open.exist? self.files_dir
      Open.link_dir dep.files_dir, self.files_dir if Open.exist?(dep.files_dir)

      if dep.overriden? 
        Open.link dep.path, self.tmp_path
      else
        Open.ln_h dep.path, self.tmp_path

        # 'true' cleans the aliased job only; 'recursive' also cleans its
        # dependencies, unless the remove_dep config says 'false' for them.
        case remove.to_s
        when 'true'
          dep.clean
        when 'recursive'
          (dep.dependencies.to_a + dep.rec_dependencies.to_a).uniq.each do |d|
            # NOTE(review): `overriden` here vs `overriden?` above; confirm
            # both readers exist on the step.
            next if d.overriden
            d.clean unless Scout::Config.get(:remove_dep, "task:#{d.task_signature}", "task:#{d.task_name}", "workflow:#{d.workflow.name}", :default => true).to_s == 'false'
          end
          dep.clean unless Scout::Config.get(:remove_dep, "task:#{dep.task_signature}", "task:#{dep.task_name}", "workflow:#{dep.workflow.name}", :default => true).to_s == 'false'
        end 
      end
    else
      # Not forgetting: link the dependency's files directory and result.
      if Open.exists?(dep.files_dir)
        Open.rm_rf self.files_dir 
        Open.link dep.files_dir, self.files_dir
      end
      # The result of a RemoteStep dependency is copied instead of linked.
      if defined?(RemoteStep) && RemoteStep === dep
        Open.write(self.tmp_path, Open.read(dep.path))
      else
        Open.link dep.path, self.path
      end
    end
    nil
  end
end

#task_info(name) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/scout/workflow/task/info.rb', line 66

# Returns a Hash describing the task called +name+ in this workflow.
#
# The Hash is built on the first request and kept in @task_info under the
# id "<workflow name>/<task name>"; later requests return the stored Hash.
# It carries the task's description, returns, extension and deps, plus the
# recursive input names (de-duplicated), types, descriptions, defaults,
# options and use.
#
# Raises a RuntimeError when the workflow has no task called +name+.
def task_info(name)
  name = name.to_sym
  task = tasks[name]
  raise "No '#{name}' task in '#{self.name}' Workflow" if task.nil?

  id = File.join(self.name, name.to_s)
  @task_info ||= {}

  # Serve the previously built description when there is one.
  cached = @task_info[id]
  return cached if cached

  @task_info[id] = {
    id: id,
    description: task.description,
    inputs: rec_inputs(name).uniq,
    input_types: rec_input_types(name),
    input_descriptions: rec_input_descriptions(name),
    input_defaults: rec_input_defaults(name),
    input_options: rec_input_options(name),
    input_use: rec_input_use(name),
    returns: task.returns,
    dependencies: tasks[name].deps,
    extension: task.extension
  }
end

#task_jobs(task_name) ⇒ Object



34
35
36
# File 'lib/scout/workflow/util.rb', line 34

# Loads a Step for every job file reported by task_jobs_files for
# +task_name+ and returns them as an Array, in the same order.
def task_jobs(task_name)
  job_files = task_jobs_files(task_name)
  job_files.map { |job_file| Step.load(job_file) }
end

#task_jobs_files(task_name) ⇒ Object



28
29
30
31
32
# File 'lib/scout/workflow/util.rb', line 28

# Lists the job paths found by globbing "**" under this workflow's
# directory entry for +task_name+.
#
# Entries whose extension is "info" or "files" are reduced to the path
# without that extension; the resulting list is de-duplicated.
def task_jobs_files(task_name)
  job_paths = self.directory[task_name].glob("**").map do |found|
    if %w(info files).include?(found.get_extension)
      found.unset_extension
    else
      found
    end
  end

  job_paths.uniq
end

#to_s ⇒ Object



16
17
18
# File 'lib/scout/workflow/definition.rb', line 16

# String form of the workflow: @name when it is set (truthy), otherwise
# whatever the inherited to_s returns.
def to_s
  return @name if @name

  super
end

#unexport(*names) ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/scout/workflow/export.rb', line 34

# Removes the given task names from every export list of this workflow.
#
# Each name is matched both as a String and as a Symbol. The exec,
# synchronous, asynchronous and stream export lists are updated in place;
# a list that is not set (nil/false) is left alone.
#
# Returns the updated stream export list, or nil when that list is not set.
def unexport(*names)
  unwanted = (names.map(&:to_s) + names.map(&:to_sym)).uniq

  # In-place removal so that other holders of the same Array see the change.
  prune = lambda do |exports|
    exports.replace(exports - unwanted) if exports
  end

  prune.call(exec_exports)
  prune.call(synchronous_exports)
  prune.call(asynchronous_exports)
  prune.call(stream_exports)
end

#usage(task = nil, abridge = false) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/scout/workflow/usage.rb', line 227

# Builds the usage text for this workflow, or for one of its tasks.
#
# The text always starts with a heading (the workflow name, followed by the
# documented title when there is one) underlined with "=". After that:
#
# * when the workflow has no task table (+tasks+ is nil), a "No tasks" note;
# * when +task+ is nil, the workflow description, a "## TASKS" section and
#   one entry per task with the first paragraph of its description and,
#   when not empty, its provenance string;
# * otherwise, the usage of the selected task and, when the provenance tree
#   spans more than two lines, a colorized "## DEPENDENCY GRAPH" section.
#
# @param task [Task, Symbol, String, nil] task, or task name, to describe;
#   nil produces the workflow overview
# @param abridge [Boolean] in the overview, list only the tasks that do not
#   show up in the dependency tree of any task
# @return [String] the formatted usage text
# @raise [RuntimeError] when +task+ is a name this workflow has no task for
def usage(task = nil, abridge = false)
  out = StringIO.new

  # Heading: workflow name plus optional title, underlined to the same width.
  doc_title = self.documentation[:title]
  heading = self.name
  heading = self.name + " - " + doc_title if doc_title && !doc_title.empty?
  out.puts Log.color(:magenta, heading)
  out.puts Log.color(:magenta, "=" * heading.length)
  out.puts

  if tasks.nil?
    out.puts Log.color(:title, "No tasks")
  elsif task.nil?
    description = self.documentation[:description]
    if description && !description.empty?
      out.puts Misc.format_paragraph(description)
      out.puts
    end

    out.puts Log.color(:magenta, "## TASKS")
    task_description = self.documentation[:task_description]
    if task_description && !task_description.empty?
      out.puts
      out.puts Misc.format_paragraph(task_description)
    end
    out.puts

    # A task is "final" unless it appears in some dependency tree.
    # NOTE(review): tree keys look like [workflow, task_name] pairs, which is
    # why the task name is taken with #last below -- confirm against dep_tree.
    final = Set.new
    not_final = Set.new
    tasks.each do |name, _task_def|
      tree = dep_tree(name)
      not_final += tree.keys
      final << name unless not_final.include?(name)
    end
    not_final.each { |key| final.delete(key.last) }

    tasks.each do |name, task_def|
      next if abridge && !final.include?(name)

      # Only the first paragraph of the description goes in the listing.
      summary = (task_def.description || "").split("\n\n").first
      out.puts Misc.format_definition_list_item(name.to_s, summary, nil, nil, color: :yellow)

      provenance = prov_string(dep_tree(name))
      out.puts Misc.format_paragraph(Log.color(:blue, "->" + provenance)) if provenance && !provenance.empty?
    end
  else
    if Task === task
      task_name = task.name
    else
      task_name = task
      task = self.tasks[task_name]
      # Fail with a clear message instead of a NoMethodError on nil below.
      raise "No '#{task_name}' task in '#{self.name}' Workflow" if task.nil?
    end

    out.puts task.usage(self, self.recursive_deps(task_name))

    graph = prov_tree({ [self, task_name] => dep_tree(task_name) })
    if graph && !graph.empty? && graph.split("\n").length > 2
      out.puts
      out.puts Log.color(:magenta, "## DEPENDENCY GRAPH (abridged)")
      out.puts
      graph.split("\n").each do |line|
        next if line.strip.empty?

        # Lines of the form "<indent><workflow>#<task>" get per-part colors;
        # anything else is printed in a single color.
        if (m = line.match(/^( *)(\w+?)#(\w*)/i))
          offset, workflow, dep_task = m.values_at(1, 2, 3)
          out.puts([offset, Log.color(:magenta, workflow), "#", Log.color(:yellow, dep_task)].join)
        else
          out.puts Log.color(:blue, line)
        end
      end
      out.puts
    end
  end

  out.string
end