Module: Task

Extended by:
Annotation
Defined in:
lib/scout/workflow/task.rb,
lib/scout/workflow/usage.rb,
lib/scout/workflow/task/inputs.rb,
lib/scout/workflow/task/dependencies.rb

Constant Summary collapse

DEFAULT_NAME =
"Default"

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Annotation

list_tsv_values, load_info, load_tsv, load_tsv_values, obj_tsv_values, resolve_tsv_array, tsv

Class Attribute Details

.default_directory ⇒ Object

Returns the value of attribute default_directory.



15
16
17
# File 'lib/scout/workflow/task.rb', line 15

# Reader for the +default_directory+ attribute.
#
# @return [Object, nil] the value currently held in +@default_directory+,
#   or nil when nothing has been assigned yet
def default_directory
  instance_variable_get(:@default_directory)
end

Class Method Details

.format_input(value, type, options = {}) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/scout/workflow/task/inputs.rb', line 3

# Normalizes a raw input +value+ according to its declared +type+.
#
# * IO, StringIO and Step values are returned untouched.
# * A String whose type is not :path, :file, :folder, :binary or :tsv is
#   handed to Persist, unless +options+ sets :noload, :stream, :nofile or
#   :asfile: Persist.load when the string names an existing non-directory
#   file, Persist.deserialize otherwise.
# * For a "<base>_array" type an Array is formatted element by element using
#   the "<base>" type; any other value produces nil.
# * Everything else is returned as is.
#
# @param value [Object] the raw value
# @param type [Symbol, String] the declared input type
# @param options [Hash, nil] input options
# @return [Object, nil] the formatted value
def self.format_input(value, type, options = {})
  return value if IO === value || StringIO === value || Step === value

  verbatim_types = [:path, :file, :folder, :binary, :tsv]
  if String === value && ! verbatim_types.include?(type) &&
      ! (options && (options[:noload] || options[:stream] || options[:nofile] || options[:asfile]))
    names_a_file = Open.exists?(value) && ! Open.directory?(value)
    return names_a_file ? Persist.load(value, type) : Persist.deserialize(value, type)
  end

  array_match = type.to_s.match(/(.*)_array/)
  return value unless array_match
  return nil unless Array === value

  value.collect { |element| self.format_input(element, array_match[1].to_sym, options) }
end

.load_input_from_file(filename, type, options = nil) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/scout/workflow/task/inputs.rb', line 149

# Loads a saved input from +filename+.
#
# When +filename+ itself does not exist, the first match of "<filename>.*"
# is used instead; nil is returned if there is none. The suffix of the
# resolved file decides how it is read:
#
# * ".as_file": the stripped content is returned; a leading "." of a "./"
#   path is replaced by the directory of the input file.
# * ".as_step": the stripped content is passed to Step.load.
# * ".as_path": the stripped content is passed to Path.setup.
# * otherwise the filename itself is returned when +type+ is "file" or
#   +options+ sets :noload, :stream, :nofile or :asfile; else the file is
#   loaded with Persist.load.
#
# @param filename [String] path of the saved input, possibly without suffix
# @param type [Symbol, String] declared input type
# @param options [Hash, nil] input options
# @return [Object, nil] the loaded value, or nil when no file was found
def self.load_input_from_file(filename, type, options = nil)
  unless Open.exists?(filename)
    filename = Dir.glob(File.join(filename + ".*")).first
    return nil unless filename
  end

  marker = ['.as_file', '.as_step', '.as_path'].find { |suffix| filename.end_with?(suffix) }

  case marker
  when '.as_file'
    target = Open.read(filename).strip
    target.sub!(/^\./, File.dirname(filename)) if target.start_with?("./")
    target
  when '.as_step'
    Step.load(Open.read(filename).strip)
  when '.as_path'
    Path.setup(Open.read(filename).strip)
  else
    keep_filename = type.to_s == 'file' || (options &&  (options[:noload] || options[:stream] || options[:nofile] || options[:asfile]))
    keep_filename ? filename : Persist.load(filename, type)
  end
end

.save_file_input(orig_file, directory) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/scout/workflow/task/inputs.rb', line 89

# Links a file input into "<directory>/saved_input_files" under a name that
# embeds a digest of the original location.
#
# The digest is inserted before the extension(s) when the basename contains
# a dot ("reads.fastq.gz" -> "reads-<digest>.fastq.gz") and appended
# otherwise. A Step is replaced by its path first.
#
# @param orig_file [String, Step] file to save
# @param directory [String] directory where inputs are being saved
# @return [String] the location of the link relative to +directory+
#   ("./saved_input_files/<name>")
def self.save_file_input(orig_file, directory)
  source = Step === orig_file ? orig_file.path : orig_file
  file_name = File.basename(source)
  digest = Misc.digest(source)

  tagged_name =
    if file_name.include?('.')
      file_name.sub(/(\.[^.]+(?:\.[^.]+)*)?$/, "-#{digest}\\1")
    else
      file_name + "-#{digest}"
    end

  link_target = File.join(directory, 'saved_input_files', tagged_name)
  relative_location = File.join('.', 'saved_input_files', tagged_name)
  Open.link source, link_target
  relative_location
end

.save_input(directory, name, type, value) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/scout/workflow/task/inputs.rb', line 104

# Persists a single input +value+ under "<directory>/<name>".
#
# The way the value is written depends on what it is:
#
# * a filename (per Path.is_filename?) is recorded in "<name>.as_path" when
#   the type is :path or it is a step file; otherwise the file is saved with
#   save_file_input and its relative location recorded in "<name>.as_file";
# * a Step has its short_path recorded in "<name>.as_step";
# * :file and :file_array values are saved with save_file_input and the
#   resulting location(s) stored through Persist.save;
# * streams (or objects holding one) are written with Open.sensible_write;
# * anything else goes through Persist.save with the declared type.
#
# @param directory [String] directory where inputs are being saved
# @param name [Symbol, String] input name
# @param type [Symbol] declared input type
# @param value [Object] value to save
# @return [Object] whatever the underlying write call returns
def self.save_input(directory, name, type, value)
  target = File.join(directory, name.to_s)

  if Path.is_filename?(value)
    return Open.write(target + ".as_path", value) if type == :path || Path.step_file?(value)

    saved_location = save_file_input(value, directory)
    return Open.write(target + ".as_file", saved_location)
  end

  return Open.write(target + ".as_step", value.short_path) if Step === value

  if type == :file
    saved_location = save_file_input(value, directory)
    return Persist.save(saved_location, target, :file)
  end

  if type == :file_array
    saved_locations = value.collect { |orig_file| save_file_input(orig_file, directory) }
    return Persist.save(saved_locations, target, type)
  end

  return Open.sensible_write(target, value) if Open.is_stream?(value)
  return Open.sensible_write(target, value.stream) if Open.has_stream?(value)

  Persist.save(value, target, type)
end

Instance Method Details

#alias? ⇒ Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/scout/workflow/task.rb', line 127

# Tells whether this task carries the +:dep_task+ extension marker.
#
# @return [Boolean] true when +@extension+ equals +:dep_task+
def alias?
  marker = @extension
  marker == :dep_task
end

#assign_inputs(provided_inputs = {}, id = nil) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/scout/workflow/task/inputs.rb', line 23

def assign_inputs(provided_inputs = {}, id = nil)
  if self.inputs.nil? || (self.inputs.empty? && Array === provided_inputs)
    case provided_inputs
    when Array
      return [provided_inputs, provided_inputs]
    else
      return [[], []]
    end
  end

  IndiferentHash.setup(provided_inputs) if Hash === provided_inputs

  input_array = []
  input_names = []
  non_default_inputs = []
  jobname_input = nil
  self.inputs.each_with_index do |p,i|
    name, type, desc, value, options = p
    input_names << name
    provided = Hash === provided_inputs ? provided_inputs[name] : provided_inputs[i]
    provided = Task.format_input(provided, type, options || {})

    if provided == value
      same_as_default = true
    elsif String === provided && Symbol === value && provided == value.to_s
      same_as_default = true
    elsif String === value && Symbol === provided && provided.to_s == value
      same_as_default = true
    else
      same_as_default = false
    end

    if options && options[:jobname] && id == provided
      same_as_jobname = true
    end

    jobname_input = name if same_as_jobname

    final = if ! provided.nil? && ! same_as_default && ! same_as_jobname
              non_default_inputs << name.to_sym
              provided
            elsif options && options[:jobname] && id
              non_default_inputs << name.to_sym
              provided || id
            else
              value
            end

    final = Path.setup(final.dup) if String === final && ! (Path === final) && (type == :file || type == :path || (options && options[:asfile]))

    final = final.find if (Path === final) && (type == :file)

    input_array << final
  end

  NamedArray.setup(input_array, input_names)

  [input_array, non_default_inputs, jobname_input]
end

#dependencies(id, provided_inputs, non_default_inputs = [], compute = {}) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/scout/workflow/task/dependencies.rb', line 2

# Instantiates the dependency jobs declared for this task.
#
# Each entry of +deps+ is (workflow, task, definition_options, block). For
# every entry one or more dependency jobs are appended to the returned list.
# As side effects the method appends names to +non_default_inputs+ and fills
# the +compute+ hash (job path => list of compute options).
#
# @param id [String, nil] job name of the job being built
# @param provided_inputs [Hash, nil] inputs given by the caller; may contain
#   "Workflow#task" keys that override a dependency
# @param non_default_inputs [Array] accumulator, modified in place
# @param compute [Hash] accumulator, modified in place
# @return [Array] the dependency jobs, in declaration order
def dependencies(id, provided_inputs, non_default_inputs = [], compute = {})
  return [] if deps.nil?
  dependencies = []
  
  provided_inputs ||= {}

  # Helper: builds one dependency job from its step options.
  # Returns [job, step_inputs].
  load_dep = proc do |id, workflow, task, step_options, definition_options, dependencies|
    # :task, :workflow and :jobname in the step options override the declared ones
    task = step_options.delete(:task) if step_options.include?(:task)
    workflow = step_options.delete(:workflow) if step_options.include?(:workflow)

    step_id = id
    step_id = step_options.delete(:jobname) if step_options.include?(:jobname)
    step_id = nil if step_id == Task::DEFAULT_NAME

    # Inputs come from the :inputs key when present, otherwise from the
    # step options themselves; definition options fill in the gaps
    step_inputs = step_options.include?(:inputs)? step_options.delete(:inputs) : step_options
    step_inputs = IndiferentHash.add_defaults step_inputs.dup, definition_options

    # Symbol values are references: first to an already built dependency
    # with that task name, then to a provided or step input with that name
    # (only if it is a declared input of this task); otherwise the symbol
    # is passed through unchanged
    resolved_inputs = {}
    step_inputs.each do |k,v|
      if Symbol === v
        input_dep = dependencies.select{|d| d.task_name == v }.first
        resolved_inputs[k] = if input_dep
                               input_dep
                             elsif provided_inputs.include?(v) && self.inputs.collect(&:first).include?(v)
                               provided_inputs[v]
                             elsif step_inputs.include?(v) && self.inputs.collect(&:first).include?(v)
                               step_inputs[v]
                             else
                               v
                             end
      else
        resolved_inputs[k] = v
      end
    end

    job = workflow.job(task, step_id, resolved_inputs)
    # Collect the compute options (:compute plus the :canfail, :produce and
    # :stream flags) and register them under the job path
    compute_options = definition_options[:compute] || []
    compute_options = [compute_options] unless Array === compute_options
    compute_options << :canfail if definition_options[:canfail]
    compute_options << :produce if definition_options[:produce]
    compute_options << :stream if definition_options[:stream]
    compute[job.path] = compute_options if compute_options.any?

    job.overriden = false if definition_options[:not_overriden]
    job.compute = job.compute.nil? ? compute : job.compute.merge(compute)

    [job, step_inputs]
  end

  # Helper: drops from the dependency's non-default inputs those names fixed
  # by the definition options. Note that reject! modifies the array returned
  # by dep.non_default_inputs in place.
  filter_dep_non_default_inputs = proc do |dep,definition_options|
    dep_non_default_inputs = dep.non_default_inputs
    dep_non_default_inputs.reject! do |name|
      definition_options.include?(name)
    end

    dep_non_default_inputs
  end

  deps.each do |workflow,task,definition_options,block=nil|
    definition_options = definition_options.nil? ? {} : definition_options.dup

    dep_id = definition_options.include?(:jobname) ? definition_options.delete(:jobname) : id

    # A "Workflow#task" key in the provided inputs overrides this dependency
    # with the given step (or the step built from the given value)
    if provided_inputs.include?(overriden = [workflow.name, task] * "#")
      dep = provided_inputs[overriden]
      dep = Step.new dep unless Step === dep
      dep = dep.dup
      dep.type = workflow.tasks[task].type
      dep.overriden_task = task
      dep.overriden_workflow = workflow
      dependencies << dep
      non_default_inputs << overriden
      next
    end


    if block
      # The block receives the job id, the options (definition options
      # completed with this task's assigned, default and provided inputs)
      # and the dependencies built so far
      fixed_provided_inputs = self.assign_inputs(provided_inputs, dep_id).first.to_hash
      self.inputs.each do |name,type,desc,value,options|
        fixed_provided_inputs[name] = value unless fixed_provided_inputs.include?(name)
      end
      fixed_provided_inputs = IndiferentHash.add_defaults fixed_provided_inputs, provided_inputs
      block_options = IndiferentHash.add_defaults definition_options.dup, fixed_provided_inputs

      res = block.call dep_id, block_options, dependencies

      # The block may return a Step (used as is), a Hash of step options, or
      # an Array mixing both; any other result adds no dependency
      case res
      when Step
        dep = res
        dependencies << dep
        dep_non_default_inputs = filter_dep_non_default_inputs.call(dep, definition_options)
        non_default_inputs.concat(dep_non_default_inputs)
      when Hash
        step_options = block_options.merge(res)
        dep, step_inputs = load_dep.call(dep_id, workflow, task, step_options.dup, block_options, dependencies)
        dependencies << dep
        dep_non_default_inputs = filter_dep_non_default_inputs.call(dep, definition_options)
        non_default_inputs.concat(dep_non_default_inputs)
      when Array
        res.each do |_res|
          if Hash === _res
            step_options = block_options.merge(_res)
            dep, step_inputs = load_dep.call(dep_id, workflow, task, step_options.dup, block_options, dependencies)
            dependencies << dep
            dep_non_default_inputs = filter_dep_non_default_inputs.call(dep, definition_options)
            non_default_inputs.concat(dep_non_default_inputs)
          else
            dep = _res
            dependencies << dep
            dep_non_default_inputs = filter_dep_non_default_inputs.call(dep, definition_options)
            non_default_inputs.concat(dep_non_default_inputs)
          end
        end
      end
    else
      # No block: the definition options completed with the provided inputs
      # are the step options
      step_options = IndiferentHash.add_defaults definition_options.dup, provided_inputs
      dep, step_inputs = load_dep.call(dep_id, workflow, task, step_options, definition_options, dependencies)
      dependencies << dep
      dep_non_default_inputs = filter_dep_non_default_inputs.call(dep, definition_options)
      non_default_inputs.concat(dep_non_default_inputs)
    end
  end

  dependencies
end

#directory ⇒ Object



26
27
28
# File 'lib/scout/workflow/task.rb', line 26

# Directory used for this task, memoized in +@directory+.
#
# @return [Object] the stored directory, or Task.default_directory (which
#   is then remembered) when none has been set
def directory
  return @directory if @directory

  @directory = Task.default_directory
end

#exec_on(binding = self, *inputs) ⇒ Object



30
31
32
# File 'lib/scout/workflow/task.rb', line 30

# Runs this object, used as a block, in the context of +binding+.
#
# @param binding [Object] object on which the block is instance_exec'ed;
#   defaults to the task itself
# @param inputs [Array] arguments handed to the block
# @return [Object] the value returned by the block
def exec_on(binding = self, *inputs)
  task_block = self
  binding.instance_exec(*inputs, &task_block)
end

#get_SOPT ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/scout/workflow/usage.rb', line 115

# Parses the command-line options described by SOPT_str and splits the
# values of array-typed inputs on ",".
#
# A value is split when its input type mentions "array" and either the value
# does not name an existing file or the type also mentions "file" or "path".
#
# @return [Hash] the parsed options, as returned by SOPT.get, with array
#   values split
def get_SOPT
  parsed = SOPT.get(self.SOPT_str)

  recursive_inputs.uniq.each do |name, type|
    type_name = type.to_s
    next unless type_name.include?('array')
    next unless parsed.include?(name)

    raw = parsed[name]
    if ! Open.exist?(raw) || type_name.include?('file') || type_name.include?('path')
      parsed[name] = raw.split(",")
    end
  end

  parsed
end

#inputs ⇒ Object



22
23
24
# File 'lib/scout/workflow/task.rb', line 22

# Declared inputs of the task, memoized in +@inputs+.
#
# @return [Array] the stored list; an empty Array is created and remembered
#   when none has been set
def inputs
  return @inputs if @inputs

  @inputs = []
end

#job(id = nil, provided_inputs = nil) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/scout/workflow/task.rb', line 34

# Builds the Step for this task from a job +id+ and the +provided_inputs+.
#
# The construction runs inside Persist.memory (repo Workflow.job_cache),
# keyed by the task name, the id and the provided values of the recursive
# inputs, so equal requests can be served from that repo.
#
# Inside the block the method:
# 1. accepts inputs passed in place of the id (+job(inputs)+);
# 2. takes the id from the input flagged :jobname when no id is given;
# 3. checks that inputs flagged :required are present;
# 4. merges inputs loaded from +provided_inputs[:load_inputs]+;
# 5. processes inputs and instantiates dependencies;
# 6. names the job "<id>" or "<id>_<digest>" when there are non-default
#    inputs other than the jobname input itself;
# 7. for +:dep_task+ tasks, borrows the extension of the last dependency.
#
# @param id [String, Hash, Array, nil] job name; a Hash or Array is taken as
#   the inputs when +provided_inputs+ is nil or empty
# @param provided_inputs [Hash, Array, nil] input values
# @return [Object] the result of Persist.memory for the block, whose value
#   is the new Step
# @raise [ParameterException] when inputs flagged :required are missing
def job(id = nil, provided_inputs = nil)

  # Values used to key the in-memory cache: the recursive inputs, any
  # "Workflow#task" overrides and the :load_inputs location
  if Hash === provided_inputs
    IndiferentHash.setup provided_inputs
    memory_inputs = provided_inputs.values_at(*self.recursive_inputs.collect{|t| t.first }.uniq)
    memory_inputs += provided_inputs.select{|k,v| k.to_s.include?("#") }.collect{|p| p * "=" }
    memory_inputs << provided_inputs[:load_inputs]
  else
    memory_inputs = provided_inputs
  end

  memory_inputs = nil if Array === memory_inputs && memory_inputs.compact.empty?
  memory_inputs = nil if Hash === memory_inputs && memory_inputs.empty?
  Persist.memory("Task job #{self.name}", repo: Workflow.job_cache, other: {task: self.name, id: id, provided_inputs: memory_inputs}) do
    # Allow job(inputs) as a shorthand for job(nil, inputs)
    provided_inputs, id = id, nil if (provided_inputs.nil? || provided_inputs.empty?) && (Hash === id || Array === id)
    provided_inputs = {} if provided_inputs.nil?
    IndiferentHash.setup(provided_inputs)

    # The last input flagged :jobname provides the id when none is given
    jobname_input = nil
    inputs.each do |name,type,desc,default,input_options|
      next unless input_options && input_options[:jobname]
      jobname_input = name
    end

    id = provided_inputs[jobname_input] if jobname_input && id.nil?
    #id = provided_inputs[:id] if provided_inputs.include?(:id)

    missing_inputs = []
    self.inputs.each do |input,type,desc,val,options|
      next unless options && options[:required]
      missing_inputs << input unless provided_inputs.include?(input)
    end if self.inputs

    if missing_inputs.length == 1
      raise ParameterException, "Input '#{missing_inputs.first}' is required but was not provided or is nil"
    end

    if missing_inputs.length > 1
      raise ParameterException, "Inputs #{Misc.humanize_list(missing_inputs)} are required but were not provided or are nil"
    end

    # Explicitly provided values take precedence over the loaded ones
    provided_inputs = load_inputs(provided_inputs.delete(:load_inputs)).merge(provided_inputs) if Hash === provided_inputs && provided_inputs[:load_inputs]

    job_inputs, non_default_inputs, input_digest_str = process_inputs provided_inputs, id

    compute = {}
    dependencies = dependencies(id, provided_inputs, non_default_inputs, compute)

    non_default_inputs.uniq!

    # "Workflow#task" entries only matter when some dependency is overriden
    non_default_inputs.delete_if{|k| k.to_s.include? "#" } unless dependencies.select{|d| d.overriden? }.any?

    id = DEFAULT_NAME if id.nil?

    sanitized_id = Path.sanitize_filename(id, 150)
    if non_default_inputs.any? && !(non_default_inputs == [jobname_input] && provided_inputs[jobname_input] == id)
      hash = Misc.digest(:inputs => input_digest_str, :dependencies => dependencies)
      name = [sanitized_id, hash] * "_"
    else
      name = sanitized_id
    end

    extension = self.extension
    if extension == :dep_task
      extension = nil
      if dependencies.any?
        dep_basename = File.basename(dependencies.last.path)
        if dep_basename.include? "."
          parts = dep_basename.split(".")
          extension = [parts.pop]
          # Gather short trailing parts ("tar" in "x.tar.gz") but always
          # leave the leading name part: without the length guard a short
          # basename such as "test.txt" empties +parts+ and fails on nil
          while parts.length > 1 && parts.last.length <= 4
            extension << parts.pop
          end
          extension = extension.reverse * "."
        end
      end
    end

    path = directory[name]

    path = path.set_extension(extension) if extension

    if hash
      Log.debug "ID #{self.name} #{id} - #{hash}: #{Log.fingerprint(:input_digest => input_digest_str, :non_default_inputs => non_default_inputs, :dependencies => dependencies)}"
    else
      Log.debug "ID #{self.name} #{id} - Clean"
    end
    NamedArray.setup(job_inputs, @inputs.collect{|i| i[0] }) if @inputs
    step_provided_inputs = Hash === provided_inputs ? provided_inputs.slice(*non_default_inputs) : provided_inputs
    Step.new path.find, job_inputs, dependencies, id, non_default_inputs, step_provided_inputs, compute, &self
  end
end

#load_inputs(directory) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/scout/workflow/task/inputs.rb', line 171

# Loads the inputs previously saved in +directory+.
#
# When +directory+ is actually a non-empty regular file it is treated as a
# gzipped tarball: it is extracted into a temporary directory and the inputs
# are loaded from there.
#
# Every recursive input (first occurrence of each name) is looked up as
# "<directory>/<name>" through Task.load_input_from_file; nil results are
# skipped. Files whose name contains "#" are loaded as dependency overrides,
# keyed by the basename up to its first ".".
#
# @param directory [String, Path] directory (or tarball) holding the inputs
# @return [Hash] input name => loaded value
def load_inputs(directory)
  # The emptiness check needs explicit parentheses: "! size == 0" parses as
  # "(! size) == 0", which is never true and left this branch unreachable.
  if Open.exists?(directory) && ! Open.directory?(directory) && ! (Open.size(directory) == 0)
    TmpFile.with_file do |tmp_directory|
      # NOTE(review): tar runs from inside tmp_directory, so a relative
      # tarball location would not resolve there — confirm callers pass
      # absolute paths.
      Misc.in_dir tmp_directory do
        CMD.cmd("tar xvfz '#{directory}'")
      end
      return load_inputs(tmp_directory)
    end
  end

  inputs = IndiferentHash.setup({})
  seen = []
  self.recursive_inputs.each do |p|
    name, type, desc, value, options = p
    next if seen.include?(name)
    filename = File.join(directory, name.to_s)
    value = Task.load_input_from_file(filename, type, options)
    inputs[name] = value unless value.nil?
    seen << name
  end

  # Dependency overrides are stored in files named "Workflow#task.<suffix>"
  directory = Path.setup(directory.dup) unless Path === directory
  directory.glob("*#*").each do |file|
    override_dep = File.basename(file).partition(".").first

    inputs[override_dep] = Task.load_input_from_file(file, :file)
  end

  inputs
end

#process_inputs(provided_inputs = {}, id = nil) ⇒ Object



83
84
85
86
87
# File 'lib/scout/workflow/task/inputs.rb', line 83

# Assigns the inputs and adds a digest string of the resulting values.
#
# @param provided_inputs [Hash, Array] values given by the caller
# @param id [String, nil] job name
# @return [Array] +[input_array, non_default_inputs, digest_str,
#   jobname_input]+, where the first, second and last elements come from
#   assign_inputs and +digest_str+ is Misc.digest_str of the input array
def process_inputs(provided_inputs = {}, id = nil)
  assigned = assign_inputs(provided_inputs, id)
  input_array, non_default_inputs, jobname_input = assigned
  [input_array, non_default_inputs, Misc.digest_str(input_array), jobname_input]
end

#recursive_inputs(overriden = []) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/scout/workflow/task/inputs.rb', line 202

# Lists the inputs of this task followed by those of its dependencies,
# recursively.
#
# Dependencies without workflow or task, and those named in +overriden+
# ("Workflow#task"), are skipped. Inputs fixed by the dependency options
# are left out, and option keys containing "#" are added to +overriden+
# (the array is modified in place) before descending.
#
# @param overriden [Array<String>] "Workflow#task" names to skip
# @return [Array] input definitions; the task's own list is not modified
def recursive_inputs(overriden = [])
  return inputs.dup if deps.nil?

  deps.each_with_object(inputs.dup) do |dep, acc|
    workflow, task, options = dep
    next if workflow.nil? || task.nil?
    next if overriden.include?([workflow.name, task.to_s] * "#")

    # A dependency may be declared without options; treat that as "nothing
    # fixed" instead of failing on nil below
    options ||= {}
    overriden.concat options.keys.select{|k| k.to_s.include?("#") }

    workflow.tasks[task].recursive_inputs(overriden).dup.each do |info|
      name, _ = info
      next if options.include?(name.to_sym) || options.include?(name.to_s)
      acc << info
    end
  end
end

#save_inputs(directory, provided_inputs = {}) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/scout/workflow/task/inputs.rb', line 135

# Saves into +directory+ every recursive input present in +provided_inputs+.
#
# @param directory [String] directory where inputs are written
# @param provided_inputs [Hash] input name => value
# @return [Array] names of the inputs saved, in declaration order (a name
#   declared more than once is saved and listed once per declaration)
def save_inputs(directory, provided_inputs = {})
  self.recursive_inputs.each_with_object([]) do |(name, type, _desc, _default, _options), saved|
    next unless provided_inputs.include?(name)

    saved << name
    Task.save_input(directory, name, type, provided_inputs[name])
  end
end

#SOPT_str ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
# File 'lib/scout/workflow/usage.rb', line 103

# Builds the option specification string for the recursive inputs.
#
# Each input contributes "-<shortcut>--<name>", followed by "*" unless the
# input is a :boolean; entries are joined with ":". The shortcut is the
# :shortcut option of the input or, failing that, the first character of
# its name.
#
# @return [String] the option specification
def SOPT_str
  specs = self.recursive_inputs.collect do |name, type, _desc, _default, options|
    shortcut = (options && options[:shortcut]) || name.to_s.slice(0,1)
    value_marker = type == :boolean ? "" : "*"

    "-#{shortcut}--#{name}#{value_marker}"
  end

  specs * ":"
end

#usage(workflow = nil, deps = nil) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/scout/workflow/usage.rb', line 4

# Renders the usage text of the task.
#
# The text contains, in order: a title (task name plus the first paragraph
# of the description when it fits in Misc::MAX_TTY_LINE_WIDTH), the task's
# own inputs, the inputs contributed by each dependency, a note on how
# lists/files are given on the command line, the return type and the
# options of :select inputs.
#
# Dependency inputs are listed only when they are not already an input of
# the task itself and are not fixed by the dependency options.
#
# @param workflow [Object, nil] workflow owning the task; when given it
#   provides the recursive dependencies and dependencies of the same
#   workflow are labelled by task name only
# @param deps [Array, nil] dependency definitions (workflow, task, options);
#   computed from +workflow+ or the task when nil
# @return [String] the usage text
def usage(workflow = nil, deps = nil)
  str = StringIO.new

  if description
    title, paragraph = description.split("\n\n")
    if title.length < Misc::MAX_TTY_LINE_WIDTH
      title = self.name.to_s + " - " + title
      str.puts Log.color :yellow, title
      str.puts Log.color :yellow, "-" * title.length
      if paragraph
        str.puts "\n" << Misc.format_paragraph(paragraph)
      end
    else
      # Title too long for one line: show the whole description as text
      title = self.name.to_s
      str.puts Log.color :yellow, title
      str.puts Log.color :yellow, "-" * title.length
      str.puts "\n" << Misc.format_paragraph(description)
    end
  else
    title = self.name.to_s
    str.puts Log.color :yellow, title
    str.puts Log.color :yellow, "-" * title.length
  end


  selects = []

  if inputs && inputs.any?
    str.puts
    str.puts Log.color(:magenta, "Inputs")
    str.puts
    str.puts SOPT.input_array_doc(inputs)

    inputs.select{|name,type, _| type == :select }.each do |name,_,_,_,options|
      next unless options
      selects << [name, options[:select_options]] if options[:select_options]
    end
  end

  deps = workflow ? workflow.recursive_deps(self.name) : self.deps if deps.nil?
  if deps and deps.any?
    seen = inputs.collect{|name,_| name }
    dep_inputs = {}
    deps.each do |dep_workflow,task_name,options|
      next if task_name.nil?
      task = dep_workflow.tasks[task_name]

      next if task.inputs.nil?

      # Apply both filters: skip inputs already listed for this task and
      # those fixed by the dependency options (which may be absent)
      options ||= {}
      shown_inputs = task.inputs.reject{|name, _| seen.include?(name) || options.include?(name) }
      next unless shown_inputs.any?
      input_names = shown_inputs.collect{|name,_| name }
      task.inputs.select{|name,_| input_names.include? name }.each do |name,_,_,_,input_options|
        selects << [name, input_options[:select_options]] if input_options && input_options[:select_options]
      end

      dep = workflow.nil? || dep_workflow.name != workflow.name ? ["#{dep_workflow.name}", task_name.to_s] *"#" : task_name.to_s
      dep_inputs[dep] = shown_inputs
    end

    str.puts
    str.puts Log.color(:magenta, "Inputs from dependencies:") if dep_inputs.any?
    dep_inputs.each do |dep,dep_input_list|
      str.puts
      str.puts Log.color :yellow, dep + ":"
      str.puts
      str.puts SOPT.input_array_doc(dep_input_list)
    end
  end

  case
  when inputs && inputs.select{|name,type| type == :array }.any?
    str.puts
    str.puts Log.color(:green, Misc.format_paragraph("Lists are specified as arguments using ',' or '|'. When specified as files the '\\n'
    also works in addition to the others. You may use the '--array_separator' option
    the change this default. Whenever a file is specified it may also accept STDIN using
    the '-' character."))

  when inputs && inputs.select{|name,type| type == :file || type == :tsv }.any?
    str.puts
    str.puts Log.color(:green, Misc.format_paragraph("Whenever a file is specified it may also accept STDIN using the '-' character."))
  end

  str.puts
  str.puts Log.color(:magenta, "Returns: ") << Log.color(:blue, type.to_s) << "\n"

  if selects.any?
    str.puts
    str.puts Log.color(:magenta, "Input select options")
    selects.collect{|p| p}.uniq.each do |input,options|
      str.puts
      str.puts Log.color(:blue, input.to_s + ": ") << Misc.format_paragraph(options.collect{|o| Array === o ? o.first.to_s : o.to_s} * ", ") << "\n"
    end
  end
  str.rewind
  str.read
end