Class: Step

Inherits:
Object
  • Object
show all
Defined in:
lib/scout/workflow/step.rb,
lib/scout/workflow/step/file.rb,
lib/scout/workflow/step/info.rb,
lib/scout/workflow/step/load.rb,
lib/scout/workflow/step/config.rb,
lib/scout/workflow/step/inputs.rb,
lib/scout/workflow/step/status.rb,
lib/scout/workflow/step/archive.rb,
lib/scout/workflow/step/children.rb,
lib/scout/workflow/step/progress.rb,
lib/scout/workflow/step/provenance.rb,
lib/scout/workflow/step/dependencies.rb

Constant Summary collapse

SERIALIZER =
Scout::Config.get(:serializer, :step_info, :info, :step, env: "SCOUT_SERIALIZER", default: :json)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context: nil, &task) ⇒ Step

Returns a new instance of Step.



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/scout/workflow/step.rb', line 19

# Creates a step and stores its wiring.
#
# Every positional argument is kept verbatim in the instance variable of the
# same name; the block becomes @task. A fresh Mutex is stored in @mutex,
# @tee_copies starts at 1, and @exec_context falls back to the step itself
# when no exec_context is supplied.
def initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context: nil, &task)
  # What the job is and what it was given
  @path, @id = path, id
  @inputs, @dependencies = inputs, dependencies
  @non_default_inputs, @provided_inputs = non_default_inputs, provided_inputs
  @compute, @task = compute, task

  # Runtime state
  @mutex = Mutex.new
  @tee_copies = 1
  @exec_context = exec_context || self
end

Instance Attribute Details

#computeObject

Returns the value of attribute compute.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @compute, which the constructor assigns from its `compute` argument.
def compute
  @compute
end

#dependenciesObject

Returns the value of attribute dependencies.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @dependencies, which the constructor assigns from its
# `dependencies` argument (nil when none were given).
def dependencies
  @dependencies
end

#exec_contextObject

Returns the value of attribute exec_context.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @exec_context: the object the task block is instance_exec'd on.
# The constructor defaults it to the step itself.
def exec_context
  @exec_context
end

#idObject

Returns the value of attribute id.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @id, which the constructor assigns from its `id` argument.
def id
  @id
end

#inputsObject

Returns the value of attribute inputs.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @inputs, which the constructor assigns from its `inputs` argument.
def inputs
  @inputs
end

#non_default_inputsObject

Returns the value of attribute non_default_inputs.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @non_default_inputs, which the constructor assigns from its
# `non_default_inputs` argument.
def non_default_inputs
  @non_default_inputs
end

#overridenObject

Returns the value of attribute overriden.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @overriden. Returns the raw ivar (possibly nil); `overriden?`
# is the method that computes and memoizes the value.
def overriden
  @overriden
end

#overriden_taskObject

Returns the value of attribute overriden_task.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @overriden_task.
def overriden_task
  @overriden_task
end

#overriden_workflowObject

Returns the value of attribute overriden_workflow.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @overriden_workflow.
def overriden_workflow
  @overriden_workflow
end

#pathObject

Returns the value of attribute path.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @path, which the constructor assigns from its `path` argument.
def path
  @path
end

#provided_inputsObject

Returns the value of attribute provided_inputs.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @provided_inputs, which the constructor assigns from its
# `provided_inputs` argument.
def provided_inputs
  @provided_inputs
end

#resultObject (readonly)

Returns the value of attribute result.



169
170
171
# File 'lib/scout/workflow/step.rb', line 169

# Read-only access to @result. `exec` stores the task's return value there
# and `clean` resets it to nil.
def result
  @result
end

#taskObject

Returns the value of attribute task.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @task, which the constructor assigns from the block it receives.
def task
  @task
end

#task_nameObject

Returns the value of attribute task_name.



105
106
107
# File 'lib/scout/workflow/step.rb', line 105

# Reader for @task_name.
def task_name
  @task_name
end

#tee_copiesObject

Returns the value of attribute tee_copies.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @tee_copies. The constructor initialises it to 1 and
# `prepare_dependencies` reassigns it on dependencies through the writer.
def tee_copies
  @tee_copies
end

#typeObject

Returns the value of attribute type.



83
84
85
# File 'lib/scout/workflow/step.rb', line 83

# Reader for @type. `load` passes this value to Persist.load when reading
# the stored result.
def type
  @type
end

#workflowObject

Returns the value of attribute workflow.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Reader for @workflow.
def workflow
  @workflow
end

Class Method Details

._load(path) ⇒ Object



218
219
220
# File 'lib/scout/workflow/step/info.rb', line 218

# Marshal hook: rebuilds a Step from the string produced by #_dump, which is
# the step's path.
def self._load(path)
  Step.new path
end

.clean(file) ⇒ Object



53
54
55
# File 'lib/scout/workflow/step/status.rb', line 53

# Convenience wrapper: builds a Step for +file+ and calls #clean on it.
# Returns whatever #clean returns (the step itself).
def self.clean(file)
  Step.new(file).clean
end

.job_path?(path) ⇒ Boolean

Returns:

  • (Boolean)


2
3
4
# File 'lib/scout/workflow/step/provenance.rb', line 2

# Tells whether +path+ looks like a job file: the fourth path segment counting
# from the end (splitting on "/") must be exactly "jobs".
def self.job_path?(path)
  segments = path.split("/")
  segments[-4] == "jobs"
end

.load(path) ⇒ Object



15
16
17
18
19
# File 'lib/scout/workflow/step/load.rb', line 15

# Builds a Step for +path+.
#
# When +path+ does not exist it is first passed through `relocate`, which
# returns an alternative location or the original path unchanged. A missing
# file is deliberately not an error: the Step is created either way.
#
# @param path [String] location of the job result
# @return [Step]
def self.load(path)
  path = relocate(path) unless Open.exists?(path)
  Step.new path
end

.load_info(info_file) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/scout/workflow/step/info.rb', line 14

# Reads the info hash stored in +info_file+.
#
# The file is read with the configured SERIALIZER first; if that raises, it is
# retried as :marshal; if that raises too, a placeholder {status: :noinfo} is
# used. A nil result from either reader becomes an empty hash. The hash is
# passed through IndiferentHash.setup before being returned.
def self.load_info(info_file)
  read_as = lambda { |format| Persist.load(info_file, format) || {} }

  loaded =
    begin
      read_as.call(SERIALIZER)
    rescue
      begin
        read_as.call(:marshal)
      rescue
        {status: :noinfo}
      end
    end

  IndiferentHash.setup(loaded)
end

.prov_indent(step, offset, input_dependencies) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/scout/workflow/step/provenance.rb', line 89

# Builds the indentation prefix for one line of the provenance report.
#
# The result is always +offset+ spaces. The branches that used to follow the
# unconditional return (marker glyphs for alias, overriden and
# input-dependency steps) could never run and have been dropped; the output is
# unchanged.
#
# @param step [Step] unused, kept so existing callers keep working
# @param offset [Integer] nesting depth, i.e. number of leading spaces
# @param input_dependencies [Hash, nil] unused, kept so existing callers keep working
# @return [String]
def self.prov_indent(step, offset, input_dependencies)
  " " * offset
end

.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/scout/workflow/step/provenance.rb', line 104

# Renders the provenance tree of +step+ as a multi-line string, one line per
# step, indented by depth.
#
# Dependencies are listed before the step itself unless the environment
# variable SCOUT_ORIGINAL_STACK is 'true', in which case the step comes first
# and its dependencies are walked in reverse order.
#
# @param step [Step] step to report
# @param offset [Integer] indentation depth of +step+
# @param task [Object] stored as info[:task_name] before rendering
# @param seen [Array<String>] paths already reported; mutated while recursing
# @param expand_repeats [Boolean] when true a repeated dependency is rendered
#   as a full sub-report again instead of a single line
# @param input [Array, nil] [dep, input_name] pairs that consume this step
# @param input_dependencies [Hash, nil] map from input step to its consumers
# @return [String]
def self.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil)
  info = step.info  || {}
  info[:task_name] = task
  path  = step.path
  status = info[:status] || :missing
  status = status.to_sym if String === status
  status = :noinfo if status == :missing && Open.exist?(path)
  status = :remote if Open.remote?(path) || Open.ssh?(path)
  name = info[:name] || File.basename(path)
  status = :unsync if status == :done and not Open.exist?(path)
  status = :notfound if status == :noinfo and not Open.exist?(path)

  this_step_msg = prov_report_msg(status, name, path, info, input)

  # Record, for every step used as an input by one of our dependencies, which
  # dependency consumes it and under which input name.
  input_dependencies ||= {}
  step.dependencies.each do |dep|
    if dep.input_dependencies.any?
      dep.input_dependencies.each do |id|
        input_name, _dep = dep.recursive_inputs.select{|f,d| 
          d == id || (String === d && d.start_with?(id.files_dir)) || (Array === d && d.include?(id))
        }.keys.last
        if input_name
          input_dependencies[id] ||= []
          input_dependencies[id] << [dep, input_name]
        end
      end
    end
  end if step.dependencies

  str = []

  indent = prov_indent(step, offset, input_dependencies)
  str << indent + this_step_msg if ENV["SCOUT_ORIGINAL_STACK"] == 'true'

  step.dependencies.dup.tap{|l| 
    l.reverse! if ENV["SCOUT_ORIGINAL_STACK"] == 'true'
  }.each do |dep|
    path = dep.path
    new = ! seen.include?(path)
    if new
      seen << path
      str.concat(prov_report(dep, offset + 1, task, seen, expand_repeats, input_dependencies[dep], input_dependencies.dup).split("\n"))
    else
      if expand_repeats
        # Pass seen and expand_repeats explicitly: leaving them out shifted
        # the input list into `seen` and the hash into `expand_repeats`.
        str << Log.color(Step.status_color(dep.status), Log.uncolor(prov_report(dep, offset+1, task, seen, expand_repeats, input_dependencies[dep], input_dependencies.dup)))
      else
        info = dep.info  || {}
        status = info[:status] || :missing
        status = "remote" if Open.remote?(path) || Open.ssh?(path)
        name = info[:name] || File.basename(path)
        status = :unsync if status == :done and not Open.exist?(path)
        status = :notfound if status == :noinfo and not Open.exist?(path)

        dep_indent = prov_indent(dep, offset+1, input_dependencies)
        str << dep_indent + Log.color(Step.status_color(status), Log.uncolor(prov_report_msg(status, name, path, info, input_dependencies[dep])))
      end
    end
  end if step.dependencies

  str << indent + this_step_msg unless ENV["SCOUT_ORIGINAL_STACK"] == 'true'

  str * "\n"
end

.prov_report_msg(status, name, path, info, input = nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/scout/workflow/step/provenance.rb', line 32

# Builds the text of one provenance-report line for the job stored at +path+.
#
# The line is made of the coloured status, workflow, task and path, followed
# optionally by the inputs that consume this job, a warning when the file is
# newer than $main_mtime, and tab-separated "name=value" pairs for the inputs
# named in $inputs and the info fields named in $info_fields.
#
# status - status to display (replaced by "file" when path is not a job path)
# name   - accepted but not used in the body
# path   - location of the job result
# info   - info hash of the job; read for :task_name and the $info_fields
# input  - list of [dep, input_name] pairs, or nil
#
# Returns the assembled String.
def self.prov_report_msg(status, name, path, info, input = nil)
  # Drop anything from the first "{" on, then peel the path from the right:
  # file name, task directory, workflow directory.
  parts = path.sub(/\{.*/,'').split "/"

  parts.pop

  task = Log.color(:yellow, parts.pop)
  workflow = Log.color(:magenta, parts.pop)

  # Not a job path: report it as a plain file, taking the task name from info
  if ! Step.job_path?(path)
    task, status, workflow = Log.color(:yellow, info[:task_name]), Log.color(:green, "file"), Log.color(:magenta, "-")
  end

  # Any failure reading the mtime (including non-StandardError ones) is
  # treated as "unknown"
  path_mtime = begin
                 Open.mtime(path)
               rescue Exception
                 nil
               end

  if input.nil? || input.empty?
    input_str = nil
  else
    # Drop pairs whose dep already reaches another listed dep, under the same
    # input name, through its dependencies or input dependencies
    input = input.reject{|dep,name| (input & dep.dependencies.collect{|d| [d,name]}).any? }
    input = input.reject{|dep,name| (input & dep.input_dependencies.collect{|d| [d,name]}).any? }
    input_str = Log.color(:magenta, "-> ") + input.collect{|dep,name| Log.color(:yellow, dep.task_name.to_s) + ":" + Log.color(:yellow, name) }.uniq * " "
  end

  # Local existing file more than 2 seconds newer than $main_mtime gets the
  # "Mtime out of sync" suffix
  str = if ! (Open.remote?(path) || Open.ssh?(path)) && (Open.exists?(path) && $main_mtime && path_mtime && ($main_mtime - path_mtime) < -2)
          prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " << " (#{Log.color(:red, "Mtime out of sync") })"
        else
          prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " 
        end

  # Append the values of the inputs requested through the $inputs global
  if $inputs and $inputs.any? 
    job_inputs = Step.new(path).recursive_inputs.to_hash
    IndiferentHash.setup(job_inputs)

    $inputs.each do |input|
      value = job_inputs[input]
      next if  value.nil?
      value_str = Log.fingerprint(value)
      str << "\t#{Log.color :magenta, input}=#{value_str}"
    end
  end

  # Append the info fields requested through the $info_fields global
  if $info_fields and $info_fields.any?
    $info_fields.each do |field|
      IndiferentHash.setup(info)
      value = info[field]
      next if value.nil?
      value_str = Log.fingerprint(value)
      str << "\t#{Log.color :magenta, field}=#{value_str}"
    end
  end

  str
end

.prov_status_msg(status) ⇒ Object



27
28
29
30
# File 'lib/scout/workflow/step/provenance.rb', line 27

# Returns the status text wrapped by Log.color in the colour that
# status_color assigns to it.
def self.prov_status_msg(status)
  Log.color(status_color(status), status.to_s)
end

.relocate(path) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
# File 'lib/scout/workflow/step/load.rb', line 2

# Looks for an existing alternative location for +path+.
#
# Tries, in order: the path itself, the result of Path#relocate, and the last
# three path segments under "var/jobs". The first candidate that exists is
# returned; if none does, +path+ is returned.
#
# Note that a plain String argument is turned into a Path in place by
# Path.setup.
def self.relocate(path)
  return path if Open.exists?(path)
  Path.setup(path) unless Path === path
  relocated = path.relocate
  return relocated if Open.exists?(relocated)
  # Needs at least two "/" so that three trailing segments can be taken
  if path.scan("/").length >= 2
    subpath = path.split("/")[-3..-1] * "/"
    relocated = Path.setup("var/jobs")[subpath]
    return relocated if Open.exists?(relocated)
  end
  path
end

.status_color(status) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/scout/workflow/step/provenance.rb', line 6

# Maps a step status to the colour used for it in reports.
#
# Any status not listed falls back to :cyan. The previous fallback tested for
# ">" in the status but returned :cyan on both sides, so the test was removed.
#
# @param status [Symbol, String] anything responding to to_sym
# @return [Symbol] :red, :cyan, :green, :yellow or :blue
def self.status_color(status)
  case status.to_sym
  when :error, :aborted, :dead, :unsync
    :red
  when :streaming, :started
    :cyan
  when :done, :noinfo
    :green
  when :dependencies, :waiting, :setup
    :yellow
  when :notfound, :cleaned, :missing
    :blue
  else
    :cyan
  end
end

.wait_for_jobs(jobs, canfail = false) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/scout/workflow/step/dependencies.rb', line 106

# Joins every job concurrently, one thread per job, and waits for all threads.
#
# canfail controls which job failures are tolerated: `true` tolerates any
# exception; otherwise an exception is tolerated only when `canfail === error`
# holds (for example when canfail is an exception class). Anything else is
# re-raised in the worker and surfaces when that worker is joined.
def self.wait_for_jobs(jobs, canfail=false)
  workers = jobs.collect do |job|
    Thread.new do
      # Failures are reported through join, not through the thread's own report
      Thread.current.report_on_exception = false
      begin
        job.join
      rescue Exception => error
        tolerated = TrueClass === canfail || canfail === error
        raise error unless tolerated
      end
    end
  end

  workers.each(&:join)
end

Instance Method Details

#_dump(level) ⇒ Object

Marshal Step



214
215
216
# File 'lib/scout/workflow/step/info.rb', line 214

# Marshal hook: a Step is serialized as its path only (+level+ is ignored).
# The class-level _load rebuilds a Step from that path.
def _dump(level)
  @path
end

#abort(exception = nil) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/scout/workflow/step/status.rb', line 2

# Aborts the step.
#
# When info records a pid that is alive and is not the current process, that
# process is aborted through Misc.abort_child; a failure to do so is only
# logged. Otherwise the step's streams are aborted in this process, passing
# +exception+ along.
def abort(exception = nil)
  if (pid = info[:pid]) && pid != Process.pid && Misc.pid_alive?(pid)
    Log.debug "Kill process #{pid} to abort step #{Log.fingerprint self}"
    begin
      s = Misc.abort_child pid, true
      Log.medium "Aborted pid #{path} #{s}"
    rescue 
      Log.debug("Aborted job #{pid} was not killed: #{$!.message}")
    end
  else
    # Keep asking for streams and aborting them while the step is streaming
    while @result && streaming? && stream = self.stream
      stream.abort(exception)
    end
    # NOTE(review): assumes @take_stream is set whenever streaming? is true — confirm
    @take_stream.abort(exception) if streaming?
  end
end

#abort_dependenciesObject



102
103
104
# File 'lib/scout/workflow/step/dependencies.rb', line 102

# Calls abort on every entry of all_dependencies that reports running?.
# Returns the list that was walked.
def abort_dependencies
  all_dependencies.each do |dep|
    next unless dep.running?
    dep.abort
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


179
180
181
# File 'lib/scout/workflow/step/info.rb', line 179

# True when the step's status is aborted; the status is accepted both as a
# Symbol and as a String.
def aborted?
  status == :aborted || status == 'aborted'
end

#alias?Boolean

Returns:

  • (Boolean)


445
446
447
# File 'lib/scout/workflow/step.rb', line 445

# Delegates to task.alias?; returns nil when the step has no task.
def alias?
  task.alias? if task
end

#all_dependenciesObject



67
68
69
70
71
72
73
74
# File 'lib/scout/workflow/step/dependencies.rb', line 67

# Direct dependencies followed by input dependencies, as one array.
# Either list is skipped when falsy. The result is memoized in
# @all_dependencies.
def all_dependencies
  return @all_dependencies if @all_dependencies

  @all_dependencies = [dependencies, input_dependencies].inject([]) do |combined, list|
    list ? combined + list : combined
  end
end

#archive_deps(jobs = nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/scout/workflow/step/archive.rb', line 29

# Stores a snapshot of the dependencies' info inside this step's own info.
#
# For each job (the step's dependencies when none are given) whose info file
# exists, its info is recorded under its path, together with whatever that job
# had archived itself. The result is saved as :archived_info and the current
# info[:dependencies] is copied to :archived_dependencies.
def archive_deps(jobs = nil)
  jobs = dependencies if jobs.nil?

  snapshot = {}
  jobs.each do |dep|
    next unless Open.exists?(dep.info_file)
    snapshot[dep.path] = dep.info
    snapshot.merge!(dep.archived_info)
  end

  set_info :archived_info, snapshot
  set_info :archived_dependencies, info[:dependencies]
end

#archived_infoObject



2
3
4
5
# File 'lib/scout/workflow/step/archive.rb', line 2

# The :archived_info entry of the step's info, or an empty hash when the info
# file does not exist or holds no such entry.
def archived_info
  if Open.exists?(info_file)
    info[:archived_info] || {}
  else
    {}
  end
end

#archived_inputsObject



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/scout/workflow/step/archive.rb', line 7

# Collects the inputs of all archived dependencies into one NamedArray.
#
# Starting from info[:archived_dependencies], each path is looked up in
# archived_info; for every entry that is a Hash its :inputs are appended and
# the paths listed in its :dependencies and :archived_dependencies are queued.
# Returns an empty Array when nothing was archived.
def archived_inputs
  return [] unless info[:archived_dependencies]
  archived_info = self.archived_info

  all_inputs = NamedArray.setup([],[])
  deps = info[:archived_dependencies].dup
  seen = []
  # deps works as a stack of paths still to visit
  while path = deps.pop
    dep_info = archived_info[path]
    if Hash === dep_info
      dep_inputs = dep_info[:inputs]
      NamedArray.setup(dep_inputs, dep_info[:input_names])
      all_inputs.concat(dep_inputs)
      # Dependency entries are lists whose last element is the path
      deps.concat(dep_info[:dependencies].collect{|p| p.last } - seen) if dep_info[:dependencies]
      deps.concat(dep_info[:archived_dependencies].collect{|p| p.last } - seen) if dep_info[:archived_dependencies]
    end
    seen << path
  end

  all_inputs
end

#bundle_filesObject



33
34
35
# File 'lib/scout/workflow/step/file.rb', line 33

def bundle_files
  [path, info_file, files_dir.glob("**/*")].flatten.select{|f| Open.exist?(f) }
end

#canfail?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/scout/workflow/step/status.rb', line 65

# Truthy when @compute holds an entry for this step's path that includes
# :canfail. Returns the falsy value found along the way otherwise.
def canfail?
  return @compute unless @compute
  flags = @compute[self.path]
  flags && flags.include?(:canfail)
end

#child(&block) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
# File 'lib/scout/workflow/step/children.rb', line 2

# Forks a child process running +block+ and records its pid in the step's
# info under :children_pids (appending to the existing list, or starting one).
# Returns the child's pid.
def child(&block)
  child_pid = Process.fork &block
  children_pids = info[:children_pids]
  if children_pids.nil?
    children_pids = [child_pid]
  else
    children_pids << child_pid
  end
  set_info :children_pids, children_pids
  child_pid
end

#cleanObject



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/scout/workflow/step/status.rb', line 39

# Resets the step: clears the cached stream, result, info and done state, and
# removes the result file, the temporary file, the info file and the files
# directory when they exist (or are links). Returns the step.
def clean
  Log.debug "Cleaning job files: #{path}"
  @take_stream = nil 
  @result = nil
  @info = nil
  @info_load_time = nil
  @done = nil
  Open.rm path if Open.exist_or_link?(path)
  Open.rm tmp_path if Open.exist_or_link?(tmp_path)
  Open.rm info_file if Open.exist_or_link?(info_file)
  Open.rm_rf files_dir if Open.exist_or_link?(files_dir)
  self
end

#clean_nameObject



96
97
98
99
100
101
102
103
# File 'lib/scout/workflow/step.rb', line 96

# Human-facing name of the job.
#
# Precedence: the explicit @id, then info[:clean_name] when that key exists,
# then the file name with an optional "_" + 32 lowercase-alphanumeric suffix
# and any extension removed. If the pattern does not match, the text before
# the first "." is used.
def clean_name
  return @id if @id
  return info[:clean_name] if info.include? :clean_name

  stripped = name.match(/(.+?)(?:_[a-z0-9]{32})?(?:\..*)?$/)
  stripped ? stripped[1] : name.split(".").first
end

#cmd(*args) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/scout/workflow/step/children.rb', line 14

# Runs an external command through CMD.cmd, registers its pid and relays its
# output.
#
# The last argument may be an options Hash; one is appended when missing. Its
# :log entry selects the log level used for relaying (true -> 0, false -> 10,
# anything else -> to_i) and is then overwritten, together with :pipe, with
# true before the call. Returns nil.
def cmd(*args)
  all_args = *args

  all_args << {} unless Hash === all_args.last

  level = all_args.last[:log] || 0
  level = 0 if TrueClass === level
  level = 10 if FalseClass === level
  level = level.to_i

  # These writes land on the options hash object the caller passed in
  all_args.last[:log] = true
  all_args.last[:pipe] = true

  io = CMD.cmd(*all_args)
  child_pid = io.pids.first

  # Record the command's pid under :children_pids in the step info
  children_pids = info[:children_pids]
  if children_pids.nil?
    children_pids = [child_pid]
  else
    children_pids << child_pid
  end
  set_info :children_pids, children_pids

  # Relay output one character at a time; after each newline emit the
  # "STDOUT [pid]: " prefix through Log.logn
  while c = io.getc
    STDERR << c if Log.severity <= level
    if c == "\n"
      Log.logn "STDOUT [#{child_pid}]: ", level
    end
  end 

  io.join

  nil
end

#config(key, *tokens) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/scout/workflow/step/config.rb', line 4

# Looks up +key+ through Scout::Config.get, adding tokens derived from this
# step to the caller's tokens.
#
# A trailing Hash in +tokens+ is taken as the options. The tokens appended
# are, in order: "workflow:<wf>" and "task:<wf>#<task>" (only when the step
# has a workflow), "task:<task>", "<task>", the workflow name (nil when the
# step has no workflow) and "task".
def config(key, *tokens)
  options = Hash === tokens.last ? tokens.pop : {}
  options ||= {}

  task_label = task_name.to_s
  workflow_name = nil
  step_tokens = []

  if workflow
    workflow_name = workflow.name
    step_tokens.push("workflow:" + workflow_name)
    step_tokens.push("task:" + workflow_name + "#" + task_label)
  end

  step_tokens.push("task:" + task_label, task_label, workflow_name, "task")

  Scout::Config.get(key, tokens + step_tokens, options)
end

#consume_all_streamsObject



346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/scout/workflow/step.rb', line 346

# Drains every stream the step currently offers.
#
# While the step has a result and is streaming, each stream is handed to
# Open.consume_stream and the returned threads (nil entries dropped) are
# joined. If joining one fails, Aborted is raised in every collected thread
# and the original exception is re-raised.
def consume_all_streams
  threads = [] 
  while @result && streaming? && stream = self.stream
    threads << Open.consume_stream(stream, true)
  end

  threads.compact!

  threads.each do |t| 
    begin
      t.join 
    rescue Exception
      threads.compact.each{|t| t.raise(Aborted); t.join }
      raise $!
    end
  end
end

#copy_linked_files_dirObject



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/scout/workflow/step/file.rb', line 37

# When files_dir is a symlink, removes the link and rebuilds files_dir from
# the link's real target using Open.link_dir. Failures are logged as a warning
# and not raised. Does nothing when files_dir is not a symlink.
def copy_linked_files_dir
  if File.symlink?(self.files_dir)
    begin
      # Resolve the target before the link is removed
      realpath = Open.realpath(self.files_dir)
      Open.rm self.files_dir
      Open.link_dir realpath, self.files_dir
    rescue
      Log.warn "Copy files_dir for #{self.workflow_short_path} failed: " + $!.message
    end
  end
end

#digest_strObject



432
433
434
# File 'lib/scout/workflow/step.rb', line 432

# String used to represent the step in digests: "Step: " followed by the
# identified path.
def digest_str
  "Step: " + identify_path
end

#dirty?Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/scout/workflow/step/status.rb', line 81

# True when the step is done but updated? reports it is not up to date.
def dirty?
  done? && ! updated?
end

#done?Boolean

Returns:

  • (Boolean)


311
312
313
# File 'lib/scout/workflow/step.rb', line 311

# True when the result file exists. Only a truthy answer is cached in @done
# (||= re-checks while it is falsy); `clean` resets the cache.
def done?
  @done ||= Open.exist?(path)
end

#error?Boolean

Returns:

  • (Boolean)


175
176
177
# File 'lib/scout/workflow/step/info.rb', line 175

# True when the step's status is error; the status is accepted both as a
# Symbol and as a String.
def error?
  status == :error || status == 'error'
end

#exceptionObject



203
204
205
206
207
208
209
210
211
# File 'lib/scout/workflow/step/info.rb', line 203

# Rebuilds the exception stored in info[:exception], which is expected to be
# Base64-encoded Marshal data. Returns nil when nothing is stored. If decoding
# fails the failure is logged and a generic Exception carrying the last
# message of the step is returned instead.
#
# NOTE(review): Marshal.load runs on the contents of the info file, so this is
# only safe while info files come from a trusted source.
def exception
  return nil unless info[:exception]
  begin
    Marshal.load(Base64.decode64(info[:exception]))
  rescue
    Log.exception $!
    return Exception.new messages.last
  end
end

#execObject



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/scout/workflow/step.rb', line 127

# Runs the task block in-process and stores its return value in @result.
#
# Inputs that are Steps are resolved first: for a Task they are joined and
# replaced by their path formatted through Task.format_input; otherwise they
# are replaced by their loaded value. The block is evaluated with
# instance_exec on @exec_context, and @in_exec is true for the duration.
def exec

  # `inputs` is still the reader method here; the assignment further down
  # turns it into a local variable for the rest of the method
  if inputs
    if Task === task
      types = task.inputs.collect{|name,type| type }
      new_inputs = inputs.zip(types).collect{|input,info|
        type, desc, default, options = info
        next input unless Step === input
        input.join if input.streaming?
        Task.format_input(input.join.path, type, options)
      }
    else
      if Hash === inputs
        new_inputs = inputs.values
      else
        new_inputs = inputs
      end
      new_inputs = new_inputs.collect{|input|
        Step === input ? input.load : input
      }
    end
    inputs = new_inputs
  end

  @result = begin
              @in_exec = true
              # When the step had no inputs the local is nil and the splat
              # passes no arguments
              @exec_context.instance_exec(*inputs, &task)
            ensure
              @in_exec = false
            end
end

#file(file = nil) ⇒ Object



20
21
22
23
24
25
# File 'lib/scout/workflow/step/file.rb', line 20

# Path of +file+ inside the step's files directory, or the directory itself
# when no file is given. The directory is passed through Path.setup first when
# it is not already a Path.
def file(file = nil)
  dir = files_dir
  Path.setup(dir) unless Path === dir
  file.nil? ? dir : dir[file]
end

#filesObject



27
28
29
30
31
# File 'lib/scout/workflow/step/file.rb', line 27

# Every non-directory entry found recursively under files_dir, expressed
# relative to files_dir via Misc.path_relative_to.
def files
  entries = files_dir.glob("**/*").reject { |entry| File.directory? entry }
  entries.collect { |entry| Misc.path_relative_to(files_dir, entry) }
end

#files_dirObject



2
3
4
5
6
7
8
9
10
11
12
13
14
# File 'lib/scout/workflow/step/file.rb', line 2

# Directory holding the step's auxiliary files: the result path plus ".files".
#
# The value is made a Path (inheriting the annotations of @path when that is
# already a Path), resolved with #find, tagged with this step as its pkgdir,
# and memoized in @files_dir.
def files_dir
  @files_dir ||= begin
                   dir = @path + ".files"
                   if Path === @path
                     @path.annotate(dir)
                   else
                     Path.setup(dir)
                   end
                   dir = dir.find
                   # input_dependencies uses pkgdir to trace a Path input back to its step
                   dir.pkgdir = self
                   dir
                 end
end

#files_dir=(dir) ⇒ Object



16
17
18
# File 'lib/scout/workflow/step/file.rb', line 16

# Overrides the memoized files directory; later calls to files_dir return
# +dir+ as given.
def files_dir=(dir)
  @files_dir = dir
end

#fingerprintObject



436
437
438
# File 'lib/scout/workflow/step.rb', line 436

# Short representation of the step; same as digest_str.
def fingerprint
  digest_str
end

#fork(noload = false, semaphore = nil) ⇒ Object



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/scout/workflow/step.rb', line 290

# Runs the step in a forked, detached process and returns the step once the
# job is present on disk (see grace).
#
# In the child: a TERM signal raises Aborted; unless the job is already
# present its info is reset to status :queue with the child's pid; the step is
# run (inside ScoutSemaphore.synchronize when a semaphore is given), joined,
# and the child exits with status 0 through exit!.
def fork(noload = false, semaphore = nil)
  pid = Process.fork do
    Signal.trap(:TERM) do
      raise Aborted, "Recieved TERM Signal on forked process #{Process.pid}"
    end
    reset_info status: :queue, pid: Process.pid unless present?
    if semaphore
      ScoutSemaphore.synchronize(semaphore) do
        run(noload)
      end
    else
      run(noload)
    end
    join
    exit! 0
  end
  Process.detach pid
  # Block until the child has left some trace of the job on disk
  grace
  self
end

#full_task_nameObject



121
122
123
124
125
# File 'lib/scout/workflow/step.rb', line 121

# "<workflow>#<task_name>" for the step. Without a workflow only the task name
# (as a String) is returned; without a task name the result is nil.
def full_task_name
  if task_name.nil?
    nil
  elsif workflow.nil?
    task_name.to_s
  else
    [workflow, task_name].join("#")
  end
end

#graceObject



370
371
372
373
374
375
# File 'lib/scout/workflow/step.rb', line 370

# Blocks, polling every 0.1 seconds, until present? is true, then returns
# the step. There is no timeout.
def grace
  sleep 0.1 until present?
  self
end

#identify_pathObject



428
429
430
# File 'lib/scout/workflow/step.rb', line 428

# The step's path as returned by Resource.identify.
def identify_path
  Resource.identify path
end

#infoObject



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/scout/workflow/step/info.rb', line 45

# The step's info hash, reloaded from disk when needed.
#
# The cached copy is refreshed when nothing is cached yet, when the info file
# has a modification time later than the last load, or when checking that
# modification time raises.
def info
  stale =
    begin
      if @info_load_time
        modified = Open.mtime(info_file)
        modified && modified > @info_load_time
      else
        @info_load_time
      end
    rescue
      true
    end

  load_info if @info.nil? || stale

  @info
end

#info_fileObject



5
6
7
8
9
10
11
12
# File 'lib/scout/workflow/step/info.rb', line 5

# Location of the info file: the result path plus ".info". Returns nil when
# the step has no path. When @path is a Path its annotations are copied onto
# the new string. Memoized in @info_file.
def info_file
  return nil if @path.nil?
  @info_file ||= begin
                   info_file = @path + ".info"
                   @path.annotate info_file if Path === @path
                   info_file
                 end
end

#init_info(status = :waiting) ⇒ Object



41
42
43
# File 'lib/scout/workflow/step/info.rb', line 41

# Logs an initial status for the step, but only when it has an info file
# location and that file does not exist yet. Returns nil when nothing is
# logged.
def init_info(status=:waiting)
  return if info_file.nil? || Open.exists?(info_file)
  log status
end

#input_dependenciesObject



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/scout/workflow/step/dependencies.rb', line 24

def input_dependencies
  return [] unless inputs
  inputs.collect do |d|
    if Step === d
      d
    elsif (Path === d) && (Step === d.pkgdir)
      d.pkgdir
    else
      nil
    end
  end.compact.uniq
end

#joinObject



381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/scout/workflow/step.rb', line 381

# Waits for the step to finish and returns it.
#
# Streams are consumed first; then, while there is no in-memory result and the
# job is present but neither terminated nor done, the method polls every 0.1
# seconds. It then waits for the pid recorded in info, if any. Raises the
# stored exception when there is one, or a RuntimeError when the step ended in
# error or was aborted.
def join
  consume_all_streams
  while @result.nil? && (present? && ! (terminated? || done?))
    sleep 0.1
  end

  Misc.wait_child info[:pid] if info[:pid]

  raise self.exception if self.exception

  raise "Error in job #{self.path}" if self.error? or self.aborted? 

  self
end

#loadObject



407
408
409
410
411
# File 'lib/scout/workflow/step.rb', line 407

# Returns the step's result.
#
# An in-memory result is returned directly unless the step is streaming.
# Otherwise the step is joined and the result is read from the result file
# with Persist.load when the step is done, or computed in-process with exec
# when it is not.
def load
  return @result unless @result.nil? || streaming?
  join
  done? ? Persist.load(path, type) : exec
end

#load_infoObject



27
28
29
30
# File 'lib/scout/workflow/step/info.rb', line 27

# Reads the info file through Step.load_info into @info and records the time
# of the read in @info_load_time, which `info` compares against the file's
# modification time.
def load_info
  @info = Step.load_info(info_file)
  @info_load_time = Time.now
end

#log(status, message = nil, &block) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/scout/workflow/step/info.rb', line 149

# Records a status, and optionally a message, in the step's info.
#
# With a block, the block is timed through Misc.exec_time and the formatted
# time is used as the message, or prefixed to the given message with " - ".
# The :message key is only sent to merge_info when there is a message.
def log(status, message = nil, &block)
  if block_given?
    elapsed = Misc.format_seconds_short(Misc.exec_time(&block))
    stamp = Log.color(:time, elapsed)
    message = message.nil? ? stamp : "#{stamp} - #{ message }"
  end

  update = {:status => status}
  update[:message] = message if message
  merge_info update
end

#marshal_load(path) ⇒ Object



222
223
224
# File 'lib/scout/workflow/step/info.rb', line 222

# Builds and returns a new Step for +path+; the receiver itself is not
# modified.
# NOTE(review): Marshal's marshal_load protocol expects the receiver to be
# initialized in place and ignores the return value — confirm whether this
# method is still used, given that _dump/_load are also defined.
def marshal_load(path)
  Step.new path
end

#merge_info(new_info) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/scout/workflow/step/info.rb', line 68

# Merges +new_info+ into the step's info and saves it.
#
# Special keys:
# * :status  - the change is reported through report_status. For the terminal
#              statuses :done, :error and :aborted without an explicit
#              message, the elapsed times are computed from info[:start],
#              info[:issued] and new_info[:end], stored as :time_elapsed and
#              :total_time_elapsed, and used as the reported message.
# * :message - appended to info[:messages] instead of being stored under its
#              own key.
#
# Other values: exceptions that cannot be marshalled are replaced by a plain
# copy carrying the same message and backtrace; values for keys that already
# hold an Array or a Hash are concatenated / merged, anything else overwrites.
#
# @param new_info [Hash] entries to merge
# @return whatever save_info returns
def merge_info(new_info)
  info = self.info
  new_info.each do |key,value|
    value = Annotation.purge(value)
    if key == :status
      message = new_info[:message]
      if message.nil? && (value == :done || value == :error || value == :aborted)
        issued = info[:issued]
        start = info[:start]
        eend = new_info[:end]

        # Times may come back from the serialized info as Strings
        start = Time.parse start if String === start
        eend = Time.parse eend if String === eend
        issued = Time.parse issued if String === issued

        if start && eend
          time = eend - start
          # Without a recorded issue time the total falls back to the run
          # time instead of failing on `eend - nil`
          total_time = issued ? eend - issued : time
          if total_time - time > 1
            time_str = "#{Misc.format_seconds_short(time)} (#{Misc.format_seconds_short(total_time)})"
          else
            time_str = Misc.format_seconds_short(time)
          end
          info[:time_elapsed] = time
          info[:total_time_elapsed] = total_time
          message = Log.color(:time, time_str)
        end
      end
      report_status value, message 
    end

    if key == :message
      messages = info[:messages] || []
      messages << value
      info[:messages] = messages
      next
    end

    if Exception === value
      begin
        # Probe only: checks that the exception can be serialized
        Marshal.dump(value)
      rescue TypeError
        if ScoutException === value
          new = ScoutException.new value.message
        else
          new = Exception.new value.message
        end
        new.set_backtrace(value.backtrace)
        value = new
      end
    end

    if info.include?(key)
      case info[key]
      when Array
        info[key].concat(Array === value ? value : [value])
      when Hash
        info[key].merge! value
      else
        info[key] = value
      end
    else
      info[key] = value
    end
  end
  save_info(info)
end

#message(message) ⇒ Object



167
168
169
# File 'lib/scout/workflow/step/info.rb', line 167

# Adds +message+ to the step's info through merge_info, which appends it to
# info[:messages].
def message(message)
  merge_info :message => message
end

#messagesObject



163
164
165
# File 'lib/scout/workflow/step/info.rb', line 163

# The list stored under info[:messages]; nil when no message has been recorded.
def messages
  info[:messages]
end

#monitor_stream(stream, options = {}, &block) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/scout/workflow/step/progress.rb', line 29

# Wraps +stream+ so that each line passing through it can be observed, with an
# optional progress bar and optional compression of the output.
#
# options[:bar] selects the bar: true for a default bar, a Hash of bar
# options, a Numeric used as :max, or an existing bar object (nil for none).
# options[:compress] / options[:gzip] / options[:bgzip] select compression;
# bgzip is used when :bgzip is set or when :compress/:gzip equals 'bgzip'.
#
# Returns the monitored (and possibly compressed) stream.
def monitor_stream(stream, options = {}, &block)
  case options[:bar] 
  when TrueClass
    bar = progress_bar 
  when Hash
    bar = progress_bar options[:bar]
  when Numeric
    bar = progress_bar :max => options[:bar]
  else
    bar = options[:bar]
  end

  # How the block is called depends on its arity:
  #   no bar       -> block passed straight to the line monitor
  #   0 / no block -> bar ticks on every line
  #   1            -> bar ticks, block receives the line
  #   2            -> block receives line and bar, and no tick is issued here
  # Any other arity leaves `out` nil.
  out = if bar.nil?
          Open.line_monitor_stream stream, &block
        elsif (block.nil? || block.arity == 0)
          Open.line_monitor_stream stream do
            bar.tick
          end
        elsif block.arity == 1
          Open.line_monitor_stream stream do |line|
            bar.tick
            block.call line
          end
        elsif block.arity == 2
          Open.line_monitor_stream stream do |line|
            block.call line, bar
          end
        end

  # Finish and remove the bar both on normal completion and on abort
  if bar
    ConcurrentStream.setup(out, :abort_callback => Proc.new{
      bar.done
      Log::ProgressBar.remove_bar(bar, true)
    }, :callback => Proc.new{
      bar.done
      Log::ProgressBar.remove_bar(bar)
    })
  end

  bgzip = (options[:compress] || options[:gzip]).to_s == 'bgzip'
  bgzip = true if options[:bgzip]

  gzip = true if options[:compress] || options[:gzip]
  if bgzip
    Open.bgzip(out)
  elsif gzip
    Open.gzip(out)
  else
    out
  end
end

#nameObject



88
89
90
# File 'lib/scout/workflow/step.rb', line 88

# File name of the result path (File.basename of @path), memoized in @name.
def name
  @name ||= File.basename(@path)
end

#newer_dependenciesObject



23
24
25
26
27
28
# File 'lib/scout/workflow/step/status.rb', line 23

# Dependencies selected by Path.newer?(self.path, dep.path), gathered from
# three sources in this order: the recursive dependencies, this step's input
# dependencies, and the input dependencies of the recursive dependencies.
# The same step may appear more than once.
def newer_dependencies
  newer = rec_dependencies.select{|dep| Path.newer?(self.path, dep.path) }
  newer += input_dependencies.select{|dep| Path.newer?(self.path, dep.path) }
  newer += rec_dependencies.collect{|dep| dep.input_dependencies }.flatten.select{|dep| Path.newer?(self.path, dep.path) }
  newer
end

#overriden?Boolean

Returns:

  • (Boolean)


187
188
189
190
# File 'lib/scout/workflow/step/info.rb', line 187

# Truthy when the step has an overriden task or workflow, or when any of its
# dependencies is overriden. The answer is computed while @overriden is nil
# and cached there.
def overriden?
  @overriden = overriden_task || overriden_workflow || overriden_deps.any? if @overriden.nil?
  @overriden
end

#overriden_depsObject



192
193
194
# File 'lib/scout/workflow/step/info.rb', line 192

# Direct dependencies for which overriden? is truthy.
# NOTE(review): calls select on `dependencies` without a nil check, while the
# constructor defaults dependencies to nil — confirm it is always set here.
def overriden_deps
  dependencies.select{|d| d.overriden? }
end

#pidObject



59
60
61
# File 'lib/scout/workflow/step/info.rb', line 59

# The process id recorded in the step's info under :pid, if any.
def pid
  info[:pid]
end

#pid=(pid) ⇒ Object



63
64
65
# File 'lib/scout/workflow/step/info.rb', line 63

# Records +pid+ in the step's info under :pid through set_info.
def pid=(pid)
  set_info :pid, pid
end

#prepare_dependenciesObject



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/scout/workflow/step/dependencies.rb', line 37

# Cleans outdated dependencies and sets tee_copies on upstream steps.
#
# For each direct dependency that is present but not updated, the dependency
# is cleaned. Dependencies that are then done, or in a non-recoverable error,
# are skipped. For the rest, each of the dependency's own dependencies and
# each of this step's input dependencies is recorded as consumed by it.
# Finally every recorded upstream step gets tee_copies set to the number of
# recorded consumers.
def prepare_dependencies
  inverse_dep = {}

  (dependencies || []).each do |dep|
    if dep.present? && ! dep.updated?
      Log.medium "Clean outdated #{dep.path}"
      dep.clean
    end

    next if dep.done? || (dep.error? && ! dep.recoverable_error?)

    # Appends the current dependency to the consumer list of an upstream step
    register = ->(upstream) { (inverse_dep[upstream] ||= []) << dep }

    dep.dependencies.each(&register) if dep.dependencies
    input_dependencies.each(&register)
  end

  inverse_dep.each do |upstream, consumers|
    upstream.tee_copies = consumers.length
  end
end

#present? ⇒ Boolean

Returns:

  • (Boolean)


364
365
366
367
368
# File 'lib/scout/workflow/step.rb', line 364

# Whether anything belonging to this step exists: the result path, the info
# file or the files directory. They are checked in that order and checking
# stops at the first one that exists.
def present?
  found = Open.exist?(path)
  found ||= Open.exist?(info_file)
  found ||= Open.exist?(files_dir)
  found
end

#produce(with_fork: false) ⇒ Object



396
397
398
399
400
401
402
403
404
405
# File 'lib/scout/workflow/step.rb', line 396

# Produces the step result without loading it and returns the step itself.
#
# A step in a recoverable error state is cleaned first. With +with_fork+ the
# step is forked and then joined; otherwise it is run in :no_load mode.
def produce(with_fork: false)
  clean if error? && recoverable_error?

  unless with_fork
    run(:no_load)
    return self
  end

  self.fork
  self.join
  self
end

#progress_bar(msg = "Progress", options = nil, &block) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/scout/workflow/step/progress.rb', line 2

# Creates a progress bar for this step through Log::ProgressBar.new_bar.
#
# msg     - description of the bar; a Hash passed here (with no +options+) is
#           taken as the options and the description becomes nil.
# options - Hash merged over the defaults (:desc and :file); :max is also
#           passed as the first argument to new_bar.
#
# The files directory is created first. Unless @exec is set, the bar is given
# the step's :progress file.
#
# With a block: the bar is initialised, yielded, removed, and the block's
# value is returned. Without a block the bar itself is returned.
def progress_bar(msg = "Progress", options = nil, &block)
  msg, options = nil, msg if Hash === msg && options.nil?
  options = {} if options.nil?

  Open.mkdir files_dir

  defaults = {:desc => msg, :file => (@exec ? nil : file(:progress))}
  bar = Log::ProgressBar.new_bar(options[:max], defaults.merge(options))

  return bar unless block_given?

  bar.init
  outcome = yield bar
  bar.remove
  outcome
end

#rec_dependencies(connected = false, seen = Set.new) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
# File 'lib/scout/workflow/step/dependencies.rb', line 2

# Set of all dependencies of this step, followed recursively.
#
# connected - when truthy, direct dependencies that are done and updated are
#             left out (and so are not descended into).
# seen      - Set of paths already visited; direct dependencies whose path is
#             in it are skipped. The caller's set is not modified.
#
# NOTE: @rec_dependencies is replaced with a fresh hash on every call, so the
# value stored under +connected+ is never reused; it is recomputed each time.
def rec_dependencies(connected = false, seen = Set.new)
  @rec_dependencies = {}

  direct_deps = (dependencies || []).reject do |dep|
    seen.include?(dep.path) || (connected && dep.done? && dep.updated?)
  end

  visited = seen + direct_deps.collect { |d| d.path }

  @rec_dependencies[connected] = direct_deps.inject(Set.new(direct_deps)) do |acc, d|
    acc + d.rec_dependencies(connected, visited)
  end
end

#recoverable_error? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/scout/workflow/step/status.rb', line 19

# True when the step is in error and its exception is not a ScoutException.
# When the step is not in error, the (falsy) value of error? is returned.
def recoverable_error?
  failed = self.error?
  failed && ! (ScoutException === self.exception)
end

#recursive_clean ⇒ Object



58
59
60
61
62
63
# File 'lib/scout/workflow/step/status.rb', line 58

# Cleans every dependency recursively and then this step.
#
# A step with no dependencies set (nil) is simply cleaned, instead of raising
# NoMethodError before clean is reached.
#
# Returns whatever clean returns.
def recursive_clean
  (dependencies || []).each do |dep|
    dep.recursive_clean
  end
  clean
end

#recursive_inputs ⇒ Object



16
17
18
19
20
21
22
# File 'lib/scout/workflow/step/dependencies.rb', line 16

# Hash of this step's inputs merged with the recursive inputs of all its
# dependencies.
#
# The step's own inputs are only used when they are a NamedArray (through
# to_hash); otherwise the starting hash is empty. Values already collected
# win over values coming from a later dependency with the same key.
def recursive_inputs
  own_inputs = NamedArray === inputs ? inputs.to_hash : {}
  return own_inputs if dependencies.nil?

  dependencies.inject(own_inputs) do |collected, dep|
    dep.recursive_inputs.merge(collected)
  end
end

#recursive_overriden_deps ⇒ Object



196
197
198
199
200
201
# File 'lib/scout/workflow/step/info.rb', line 196

# Overriden direct dependencies followed by the overriden dependencies found
# recursively below them, flattened and without duplicates.
#
# Only overriden dependencies are descended into. Returns an empty array when
# the step has no dependencies set (nil), instead of raising NoMethodError.
def recursive_overriden_deps
  direct = (dependencies || []).select { |dep| dep.overriden? }
  nested = direct.collect { |dep| dep.recursive_overriden_deps }
  (direct + nested).flatten.uniq
end

#report_status(status, message = nil) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/scout/workflow/step/info.rb', line 140

# Logs a one-line status report at info level.
#
# The line is made of the coloured status, the coloured task name, the
# optional message and the coloured path, separated by single spaces.
# Only the first line of +message+ is used; it goes through Log.fingerprint
# and a leading and a trailing single quote are stripped from the outcome.
def report_status(status, message = nil)
  summary = nil
  unless message.nil?
    summary = Log.fingerprint(message.split("\n").first).sub(/^'/,'').sub(/'$/,'')
  end

  parts = [Log.color(:status, status, true), Log.color(:task, task_name, true)]
  parts << summary unless message.nil?
  parts << Log.color(:path, path)

  Log.info parts * " "
end

#reset_info(info = {}) ⇒ Object



37
38
39
# File 'lib/scout/workflow/step/info.rb', line 37

# Replaces the step info with +info+ (an empty hash by default): the value is
# stored in @info and handed to save_info.
def reset_info(info = {})
  @info = info
  save_info(info)
end

#run(stream = false) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/scout/workflow/step.rb', line 170

# Runs the step and returns its result.
#
# The +stream+ argument selects how the result is handed back:
# * +true+ or +:stream+ -> no_load is :stream (a finished step returns self.stream)
# * +:no_load+          -> no_load is true (a finished step returns self.path)
# * anything else       -> no_load is false (a finished step returns @result or self.load)
#
# When the step is not done, dependencies are prepared and the task is
# executed inside Persist.persist. Any exception is recorded in the step info
# with status :error, dependencies are aborted, and the exception is
# re-raised.
def run(stream = false)
  # Translate the argument into the no_load mode used below
  case stream
  when TrueClass, :stream
    no_load = :stream
  when :no_load
    no_load = true
  else
    no_load = false
  end

  # Already finished: answer according to the requested mode without running
  if done?
    if no_load
      if no_load == :stream
        return self.stream
      else
        return self.path
      end
    else
      return @result || self.load
    end
  end


  # Every branch of the done? block above returns, so this repeated check only
  # fires if done? turned true in between; it ignores the no_load mode
  return @result || self.load if done?
  prepare_dependencies
  begin


    @result = Persist.persist(name, type, :path => path, :tee_copies => tee_copies, no_load: no_load) do
      # Names of the task inputs, or an empty list when the task has none
      input_names = (task.respond_to?(:inputs) && task.inputs) ? task.inputs.collect{|name,_| name} : []


      # Start from a fresh info record describing this execution
      reset_info :status => :setup, :issued => Time.now,
        :pid => Process.pid, :pid_hostname => Misc.hostname, 
        :task_name => task_name, :workflow => workflow.to_s,
        :provided_inputs => IndiferentHash.serializable(provided_inputs),
        :non_default_inputs => non_default_inputs,
        :inputs => IndiferentHash.serializable(inputs), :input_names => input_names, :type => type,
        :dependencies => (dependencies || []) .collect{|d| d.path }

      run_dependencies

      set_info :start, Time.now
      log :start
      @exec_result = exec

      # No return value but a file at tmp_path and none at path: move it into
      # place. Otherwise unwrap results that expose a stream (except TSV)
      if @exec_result.nil? && Open.exist?(self.tmp_path) && ! Open.exist?(self.path)
        Open.mv self.tmp_path, self.path
      else
        @exec_result = @exec_result.stream if @exec_result.respond_to?(:stream) && ! (TSV === @exec_result)
      end

      # This bare expression has no effect; the value of the block is decided
      # by the conditional below
      @exec_result

      # Stream results are written straight to path when streaming was not
      # requested or SCOUT_NO_STREAM is "true"; the block then yields nil
      if (IO === @exec_result || StringIO === @exec_result) && (ENV["SCOUT_NO_STREAM"] == "true" || ! stream)
        Open.sensible_write(self.path, @exec_result)
        @exec_result = nil
      else
        @exec_result
      end
    end

    # In :no_load mode (no_load == true) any streams are consumed and nothing
    # is returned; in :stream mode only an IO result is kept
    if TrueClass === no_load
      consume_all_streams if streaming?
      @result = nil
    elsif no_load && ! (IO === @result)
      @result = nil
    end

    @result
  rescue Exception => e
    begin
      begin
        # The exception is stored Marshal-dumped and Base64-encoded. For
        # ConcurrentStreamProcessFailed the concurrent_stream is detached while
        # dumping and restored afterwards (presumably because the stream
        # cannot be marshalled; TODO confirm)
        if ConcurrentStreamProcessFailed === e
          s = e.concurrent_stream
          e.concurrent_stream = nil
          exception_encoded = Base64.encode64(Marshal.dump(e))
          e.concurrent_stream = s
        else
          exception_encoded = Base64.encode64(Marshal.dump(e))
        end
        merge_info :status => :error, :exception => exception_encoded, :end => Time.now, :backtrace => e.backtrace, :message => "#{e.class}: #{e.message}"
      rescue Exception
        # Recording failed: fall back to a plain Exception carrying only the message
        exception_encoded = Base64.encode64(Marshal.dump(Exception.new(e.message)))
        merge_info :status => :error, :exception => exception_encoded, :end => Time.now, :backtrace => e.backtrace, :message => "#{e.class}: #{e.message}"
      end

      abort_dependencies
    ensure
      # The original exception is always re-raised, even if the bookkeeping
      # above failed
      raise e
    end
  ensure
    # Finalisation, skipped when the step ended in error or was aborted
    if ! (error? || aborted?)
      if @result && streaming?
        # Streaming result: status :done is set from the stream callback
        ConcurrentStream.setup(@result) do
          merge_info :status => :done, :end => Time.now
        end

        # On abort the result file is removed and the info is marked :aborted
        # (no exception, Aborted or Interrupt) or :error (anything else)
        @result.abort_callback = proc do |exception|
          Open.rm self.path
          if exception.nil? || Aborted === exception || Interrupt === exception
            merge_info :status => :aborted, :end => Time.now
          else
            begin
              merge_info :status => :error, :exception => exception, :end => Time.now
            rescue Exception
              Log.exception $!
            end
          end
        end


        log :streaming
      else
        merge_info :status => :done, :end => Time.now
      end
    end
  end
end

#run_dependencies ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/scout/workflow/step/dependencies.rb', line 76

# Runs every dependency that still needs to run.
#
# Dependencies that are running, done, or in a non-recoverable error are
# skipped, as are those whose compute options include +false+. Compute options
# are looked up by dependency path in +compute+ (when set) and default to an
# empty list.
#
# Run mode passed to the dependency:
# * :no_load when the options include :produce
# * otherwise +true+, unless SCOUT_EXPLICIT_STREAMING is 'true', in which case
#   it is whether the options include :stream
#
# A ScoutException raised by a dependency is tolerated (and logged) only when
# its options include :canfail; otherwise it is re-raised.
def run_dependencies
  all_dependencies.each do |dep|
    next if dep.running? || dep.done?
    next if dep.error? && ! dep.recoverable_error?

    dep_options = compute ? compute[dep.path] : nil
    dep_options = [] if dep_options.nil?

    next if dep_options.include?(false)

    mode =
      if dep_options.include?(:produce)
        :no_load
      elsif ENV["SCOUT_EXPLICIT_STREAMING"] == 'true'
        dep_options.include?(:stream)
      else
        true
      end

    begin
      dep.run(mode)
    rescue ScoutException => failure
      raise failure unless dep_options.include?(:canfail)

      Log.medium "Allow failing of #{dep.path}"
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


183
184
185
# File 'lib/scout/workflow/step/info.rb', line 183

# Whether the step looks like it is currently running: it is not both done
# and in status :done, and the info holds a :pid that Misc.pid_alive? accepts.
# Returns nil when no pid is recorded.
def running?
  return false if done? && status == :done

  info[:pid] && Misc.pid_alive?(info[:pid])
end

#save_info(info = nil) ⇒ Object



32
33
34
35
# File 'lib/scout/workflow/step/info.rb', line 32

# Stores +info+ in @info, saves it to the info file through Persist.save
# using SERIALIZER, and records the current time in @info_load_time.
def save_info(info = nil)
  @info = info
  Persist.save(info, info_file, SERIALIZER)
  @info_load_time = Time.now
end

#save_input_bundle(input_bundle) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/scout/workflow/step/inputs.rb', line 14

# Saves the step inputs as an archive at +input_bundle+.
#
# The inputs are written into a temporary directory, the directory holding
# +input_bundle+ is created, the temporary directory is archived with
# Misc.tarize into a temporary file, and that file is moved to +input_bundle+.
def save_input_bundle(input_bundle)
  TmpFile.with_dir do |staging_dir|
    TmpFile.with_file do |staging_tar|
      save_inputs(staging_dir)
      Open.mkdir(File.dirname(input_bundle))
      Misc.tarize(staging_dir, staging_tar)
      Open.mv(staging_tar, input_bundle)
    end
  end
end

#save_inputs(inputs_dir) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
# File 'lib/scout/workflow/step/inputs.rb', line 2

# Saves the provided inputs of this step under +inputs_dir+.
#
# When clean_name equals name, only +inputs_dir+ is touched (logged as a
# "no input job"). Otherwise the task saves the provided inputs there.
def save_inputs(inputs_dir)
  if clean_name == name
    Log.medium "Saving no input job: #{Log.fingerprint inputs_dir}"
    Open.touch(inputs_dir)
  else
    #hash = name[clean_name.length..-1]
    #inputs_dir += hash
    Log.medium "Saving job inputs to: #{Log.fingerprint inputs_dir}"
    self.task.save_inputs(inputs_dir, provided_inputs)
  end
end

#set_info(key, value) ⇒ Object



136
137
138
# File 'lib/scout/workflow/step/info.rb', line 136

# Sets a single info entry by merging { key => value } through merge_info.
def set_info(key, value)
  merge_info key => value
end

#short_path ⇒ Object



92
93
94
# File 'lib/scout/workflow/step.rb', line 92

# Short identifier of the step: workflow, task name and step name joined
# with "/".
def short_path
  [workflow.to_s, task_name, name].join("/")
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
72
73
74
75
# File 'lib/scout/workflow/step/status.rb', line 69

# Whether the step has started: true when it is done; false when there is no
# info file or no :pid in the info; otherwise whatever Misc.pid_alive? says
# about the recorded pid.
def started?
  return true if done?
  return false unless Open.exist?(info_file)

  recorded_pid = info[:pid]
  recorded_pid ? Misc.pid_alive?(recorded_pid) : false
end

#status ⇒ Object



171
172
173
# File 'lib/scout/workflow/step/info.rb', line 171

# Status recorded in the step info, as a Symbol, or nil when none is recorded.
#
# The conversion is applied to the returned value: the previous +tap+ form
# discarded the result of +to_sym+, so a String status was returned unchanged
# and comparisons such as +status == :done+ could not match.
def status
  current = info[:status]
  current.nil? ? current : current.to_sym
end

#step(task_name) ⇒ Object



413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'lib/scout/workflow/step.rb', line 413

# Finds the dependency that corresponds to +task_name+.
#
# Direct dependencies are checked first: one matches when its task_name or
# its overriden_task equals +task_name+ (compared as symbols). If none
# matches, each dependency is searched recursively, in order, and the first
# hit is returned.
#
# Returns nil when nothing matches, including when the step has no
# dependencies set (nil), which previously raised NoMethodError.
def step(task_name)
  task_name = task_name.to_sym
  deps = dependencies || []

  direct = deps.find do |dep|
    (dep.task_name && dep.task_name.to_sym == task_name) ||
      (dep.overriden_task && dep.overriden_task.to_sym == task_name)
  end
  return direct if direct

  # No direct match: the direct checks need not be repeated, only recurse
  deps.each do |dep|
    nested = dep.step(task_name)
    return nested if nested
  end

  nil
end

#stream ⇒ Object



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/scout/workflow/step.rb', line 319

# Returns a stream with the step result.
#
# Under the step mutex: when the step is streaming and holds a result, that
# result is taken (kept in @take_stream), @result advances to the result's
# +next+, and the taken stream is returned.
#
# Otherwise: a done step is opened from its path; a running or waiting step
# is joined first and then opened; any other step is executed with exec.
def stream
  synchronize do
    next unless streaming? && ! @result.nil?

    if @result.next
      Log.debug "Taking result #{Log.fingerprint @result} next #{Log.fingerprint @result.next}"
    else
      Log.debug "Taking result #{Log.fingerprint @result}"
    end

    @take_stream, @result = @result, @result.next

    return @take_stream
  end

  return Open.open(self.path) if done?
  return exec unless running? || waiting?

  join
  Open.open(self.path)
end

#streaming? ⇒ Boolean

Returns:

  • (Boolean)


315
316
317
# File 'lib/scout/workflow/step.rb', line 315

# Whether the step is streaming: @take_stream when it is set, otherwise
# whether @result is an IO or a StringIO.
def streaming?
  return @take_stream if @take_stream

  case @result
  when IO, StringIO then true
  else false
  end
end

#synchronize(&block) ⇒ Object



33
34
35
# File 'lib/scout/workflow/step.rb', line 33

# Runs the given block while holding the step's mutex and returns the block's
# value.
def synchronize(&block)
  step_lock = @mutex
  step_lock.synchronize(&block)
end

#task_signature ⇒ Object



440
441
442
443
# File 'lib/scout/workflow/step.rb', line 440

# Signature of the task as "workflow#task_name". The workflow is used as is
# when it is a String; otherwise its +name+ is used.
def task_signature
  workflow_name =
    case workflow
    when String then workflow
    else workflow.name
    end

  [workflow_name, task_name].join("#")
end

#terminated? ⇒ Boolean

Returns:

  • (Boolean)


377
378
379
# File 'lib/scout/workflow/step.rb', line 377

# Whether the step has finished in any way (done, error or aborted).
# Always false while @in_exec is set.
def terminated?
  return false if @in_exec

  done? || error? || aborted?
end

#tmp_path ⇒ Object



159
160
161
162
163
164
165
166
167
# File 'lib/scout/workflow/step.rb', line 159

# Temporary counterpart of the step path: same directory, with the basename
# prefixed by a dot. Computed once and cached in @tmp_path.
#
# When @path is a Path, @path.setup is called with the new path.
def tmp_path
  return @tmp_path if @tmp_path

  hidden = File.join(File.dirname(@path), '.' + File.basename(@path))
  @path.setup(hidden) if Path === @path
  @tmp_path = hidden
end

#to_json ⇒ Object



21
22
23
# File 'lib/scout/workflow/step/load.rb', line 21

# JSON form of the step: the JSON encoding of its path. Any arguments are
# accepted and ignored.
def to_json(...)
  path.to_json
end

#traverse(obj, desc: nil, **kwargs, &block) ⇒ Object



23
24
25
26
27
# File 'lib/scout/workflow/step/progress.rb', line 23

# Traverses +obj+ with TSV.traverse, attaching a progress bar of this step
# unless the caller supplied a :bar option.
#
# desc - description for the progress bar; defaults to
#        "Processing <short_path>".
# All other keyword arguments and the block are forwarded to TSV.traverse.
def traverse(obj, desc: nil , **kwargs, &block)
  label = desc.nil? ? "Processing #{self.short_path}" : desc
  kwargs[:bar] = self.progress_bar(label) unless kwargs.key?(:bar)
  TSV.traverse(obj, **kwargs, &block)
end

#updated? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
33
34
35
36
37
# File 'lib/scout/workflow/step/status.rb', line 30

# Whether the step result is up to date.
#
# * A step in a recoverable error is never updated.
# * A step that is done, or in a non-recoverable error, counts as updated
#   unless the SCOUT_UPDATE environment variable is set.
# * Otherwise the step is updated only when newer_dependencies is empty; any
#   newer dependencies found are logged.
def updated?
  return false if self.error? && self.recoverable_error?

  settled = self.done? || (self.error? && ! self.recoverable_error?)
  return true if settled && ! ENV["SCOUT_UPDATE"]

  outdated_by = newer_dependencies
  if outdated_by.any?
    Log.low "Newer deps found for #{Log.fingerprint self}: #{Log.fingerprint outdated_by}"
  end

  outdated_by.empty?
end

#waiting? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/scout/workflow/step/status.rb', line 77

# Whether the step is present but has not started.
def waiting?
  present? && ! started?
end