Class: Step
- Inherits:
-
Object
- Object
- Step
- Defined in:
- lib/scout/workflow/step.rb,
lib/scout/workflow/step/file.rb,
lib/scout/workflow/step/info.rb,
lib/scout/workflow/step/load.rb,
lib/scout/workflow/step/config.rb,
lib/scout/workflow/step/inputs.rb,
lib/scout/workflow/step/status.rb,
lib/scout/workflow/step/archive.rb,
lib/scout/workflow/step/children.rb,
lib/scout/workflow/step/progress.rb,
lib/scout/workflow/step/provenance.rb,
lib/scout/workflow/step/dependencies.rb
Constant Summary collapse
- SERIALIZER =
Scout::Config.get(:serializer, :step_info, :info, :step, env: "SCOUT_SERIALIZER", default: :json)
Instance Attribute Summary collapse
-
#compute ⇒ Object
Returns the value of attribute compute.
-
#dependencies ⇒ Object
Returns the value of attribute dependencies.
-
#exec_context ⇒ Object
Returns the value of attribute exec_context.
-
#id ⇒ Object
Returns the value of attribute id.
-
#inputs ⇒ Object
Returns the value of attribute inputs.
-
#non_default_inputs ⇒ Object
Returns the value of attribute non_default_inputs.
-
#overriden ⇒ Object
Returns the value of attribute overriden.
-
#overriden_task ⇒ Object
Returns the value of attribute overriden_task.
-
#overriden_workflow ⇒ Object
Returns the value of attribute overriden_workflow.
-
#path ⇒ Object
Returns the value of attribute path.
-
#provided_inputs ⇒ Object
Returns the value of attribute provided_inputs.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#task ⇒ Object
Returns the value of attribute task.
-
#task_name ⇒ Object
Returns the value of attribute task_name.
-
#tee_copies ⇒ Object
Returns the value of attribute tee_copies.
-
#type ⇒ Object
Returns the value of attribute type.
-
#workflow ⇒ Object
Returns the value of attribute workflow.
Class Method Summary collapse
- ._load(path) ⇒ Object
- .clean(file) ⇒ Object
- .job_path?(path) ⇒ Boolean
- .load(path) ⇒ Object
- .load_info(info_file) ⇒ Object
- .prov_indent(step, offset, input_dependencies) ⇒ Object
- .prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) ⇒ Object
- .prov_report_msg(status, name, path, info, input = nil) ⇒ Object
- .prov_status_msg(status) ⇒ Object
- .relocate(path) ⇒ Object
- .status_color(status) ⇒ Object
- .wait_for_jobs(jobs, canfail = false) ⇒ Object
Instance Method Summary collapse
-
#_dump(level) ⇒ Object
Marshal Step.
- #abort(exception = nil) ⇒ Object
- #abort_dependencies ⇒ Object
- #aborted? ⇒ Boolean
- #alias? ⇒ Boolean
- #all_dependencies ⇒ Object
- #archive_deps(jobs = nil) ⇒ Object
- #archived_info ⇒ Object
- #archived_inputs ⇒ Object
- #bundle_files ⇒ Object
- #canfail? ⇒ Boolean
- #child(&block) ⇒ Object
- #clean ⇒ Object
- #clean_name ⇒ Object
- #cmd(*args) ⇒ Object
- #config(key, *tokens) ⇒ Object
- #consume_all_streams ⇒ Object
- #copy_linked_files_dir ⇒ Object
- #digest_str ⇒ Object
- #dirty? ⇒ Boolean
- #done? ⇒ Boolean
- #error? ⇒ Boolean
- #exception ⇒ Object
- #exec ⇒ Object
- #file(file = nil) ⇒ Object
- #files ⇒ Object
- #files_dir ⇒ Object
- #files_dir=(dir) ⇒ Object
- #fingerprint ⇒ Object
- #fork(noload = false, semaphore = nil) ⇒ Object
- #full_task_name ⇒ Object
- #grace ⇒ Object
- #identify_path ⇒ Object
- #info ⇒ Object
- #info_file ⇒ Object
- #init_info(status = :waiting) ⇒ Object
-
#initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context: nil, &task) ⇒ Step
constructor
A new instance of Step.
- #input_dependencies ⇒ Object
- #join ⇒ Object
- #load ⇒ Object
- #load_info ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #marshal_load(path) ⇒ Object
- #merge_info(new_info) ⇒ Object
- #message(message) ⇒ Object
- #messages ⇒ Object
- #monitor_stream(stream, options = {}, &block) ⇒ Object
- #name ⇒ Object
- #newer_dependencies ⇒ Object
- #overriden? ⇒ Boolean
- #overriden_deps ⇒ Object
- #pid ⇒ Object
- #pid=(pid) ⇒ Object
- #prepare_dependencies ⇒ Object
- #present? ⇒ Boolean
- #produce(with_fork: false) ⇒ Object
- #progress_bar(msg = "Progress", options = nil, &block) ⇒ Object
- #rec_dependencies(connected = false, seen = Set.new) ⇒ Object
- #recoverable_error? ⇒ Boolean
- #recursive_clean ⇒ Object
- #recursive_inputs ⇒ Object
- #recursive_overriden_deps ⇒ Object
- #report_status(status, message = nil) ⇒ Object
- #reset_info(info = {}) ⇒ Object
- #run(stream = false) ⇒ Object
- #run_dependencies ⇒ Object
- #running? ⇒ Boolean
- #save_info(info = nil) ⇒ Object
- #save_input_bundle(input_bundle) ⇒ Object
- #save_inputs(inputs_dir) ⇒ Object
- #set_info(key, value) ⇒ Object
- #short_path ⇒ Object
- #started? ⇒ Boolean
- #status ⇒ Object
- #step(task_name) ⇒ Object
- #stream ⇒ Object
- #streaming? ⇒ Boolean
- #synchronize(&block) ⇒ Object
- #task_signature ⇒ Object
- #terminated? ⇒ Boolean
- #tmp_path ⇒ Object
- #to_json ⇒ Object
- #traverse(obj, desc: nil, **kwargs, &block) ⇒ Object
- #updated? ⇒ Boolean
- #waiting? ⇒ Boolean
Constructor Details
#initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context: nil, &task) ⇒ Step
Returns a new instance of Step.
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/scout/workflow/step.rb', line 19
# Builds a step around a result path, its inputs and its dependencies.
# The block passed as +task+ is stored and later run by #exec inside
# +exec_context+ (the step itself when no context is given).
def initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context: nil, &task)
  # Identity of the job.
  @path, @id = path, id

  # What the job consumes.
  @inputs, @non_default_inputs, @provided_inputs = inputs, non_default_inputs, provided_inputs
  @dependencies = dependencies
  @compute = compute

  # How the job is produced.
  @task = task
  @mutex = Mutex.new
  @tee_copies = 1
  @exec_context = exec_context ? exec_context : self
end
Instance Attribute Details
#compute ⇒ Object
Returns the value of attribute compute.
18 19 20 |
# Reader for @compute, which #initialize assigns from its `compute` argument.
# File 'lib/scout/workflow/step.rb', line 18 def compute @compute end |
#dependencies ⇒ Object
Returns the value of attribute dependencies.
18 19 20 |
# Reader for @dependencies; nil unless a value was given to #initialize.
# File 'lib/scout/workflow/step.rb', line 18 def dependencies @dependencies end |
#exec_context ⇒ Object
Returns the value of attribute exec_context.
18 19 20 |
# Reader for @exec_context; #initialize defaults it to the step itself.
# File 'lib/scout/workflow/step.rb', line 18 def exec_context @exec_context end |
#id ⇒ Object
Returns the value of attribute id.
18 19 20 |
# Reader for @id, which #initialize assigns from its `id` argument.
# File 'lib/scout/workflow/step.rb', line 18 def id @id end |
#inputs ⇒ Object
Returns the value of attribute inputs.
18 19 20 |
# Reader for @inputs, which #initialize assigns from its `inputs` argument.
# File 'lib/scout/workflow/step.rb', line 18 def inputs @inputs end |
#non_default_inputs ⇒ Object
Returns the value of attribute non_default_inputs.
18 19 20 |
# Reader for @non_default_inputs, assigned by #initialize.
# File 'lib/scout/workflow/step.rb', line 18 def non_default_inputs @non_default_inputs end |
#overriden ⇒ Object
Returns the value of attribute overriden.
18 19 20 |
# Reader for @overriden (the same variable #overriden? memoizes into).
# File 'lib/scout/workflow/step.rb', line 18 def overriden @overriden end |
#overriden_task ⇒ Object
Returns the value of attribute overriden_task.
18 19 20 |
# Reader for @overriden_task; where it is assigned is not visible in this page.
# File 'lib/scout/workflow/step.rb', line 18 def overriden_task @overriden_task end |
#overriden_workflow ⇒ Object
Returns the value of attribute overriden_workflow.
18 19 20 |
# Reader for @overriden_workflow; where it is assigned is not visible in this page.
# File 'lib/scout/workflow/step.rb', line 18 def overriden_workflow @overriden_workflow end |
#path ⇒ Object
Returns the value of attribute path.
18 19 20 |
# Reader for @path, which #initialize assigns from its `path` argument.
# File 'lib/scout/workflow/step.rb', line 18 def path @path end |
#provided_inputs ⇒ Object
Returns the value of attribute provided_inputs.
18 19 20 |
# Reader for @provided_inputs, assigned by #initialize.
# File 'lib/scout/workflow/step.rb', line 18 def provided_inputs @provided_inputs end |
#result ⇒ Object (readonly)
Returns the value of attribute result.
169 170 171 |
# Read-only access to @result, which #exec assigns and #clean resets to nil.
# File 'lib/scout/workflow/step.rb', line 169 def result @result end |
#task ⇒ Object
Returns the value of attribute task.
18 19 20 |
# Reader for @task, the block handed to #initialize.
# File 'lib/scout/workflow/step.rb', line 18 def task @task end |
#task_name ⇒ Object
Returns the value of attribute task_name.
105 106 107 |
# Reader for @task_name; where it is assigned is not visible in this page.
# File 'lib/scout/workflow/step.rb', line 105 def task_name @task_name end |
#tee_copies ⇒ Object
Returns the value of attribute tee_copies.
18 19 20 |
# Reader for @tee_copies; starts at 1 in #initialize and is reassigned by
# #prepare_dependencies.
# File 'lib/scout/workflow/step.rb', line 18 def tee_copies @tee_copies end |
#type ⇒ Object
Returns the value of attribute type.
83 84 85 |
# Reader for @type; #load passes this value to Persist.load.
# File 'lib/scout/workflow/step.rb', line 83 def type @type end |
#workflow ⇒ Object
Returns the value of attribute workflow.
18 19 20 |
# Reader for @workflow; where it is assigned is not visible in this page.
# File 'lib/scout/workflow/step.rb', line 18 def workflow @workflow end |
Class Method Details
._load(path) ⇒ Object
218 219 220 |
# File 'lib/scout/workflow/step/info.rb', line 218
# Marshal counterpart of #_dump: the dumped payload is the step path, so
# loading simply builds a Step for that path.
def self._load(path)
  Step.new(path)
end
.clean(file) ⇒ Object
53 54 55 |
# File 'lib/scout/workflow/step/status.rb', line 53
# Convenience form of #clean for a bare path: wraps +file+ in a Step,
# cleans it and returns what #clean returns.
def self.clean(file)
  step = Step.new(file)
  step.clean
end
.job_path?(path) ⇒ Boolean
2 3 4 |
# File 'lib/scout/workflow/step/provenance.rb', line 2
# True when the fourth "/"-separated segment counted from the end of +path+
# is the literal "jobs".
def self.job_path?(path)
  segments = path.split("/")
  "jobs" == segments[-4]
end
.load(path) ⇒ Object
15 16 17 18 19 |
# File 'lib/scout/workflow/step/load.rb', line 15
# Builds a Step for +path+. When the path does not exist it is first passed
# through Step.relocate, which returns the original path when no relocated
# candidate exists either.
def self.load(path)
  path = relocate(path) unless Open.exists?(path)
  #raise "Could not load #{path}" unless Open.exists?(path)
  # The step is the return value; the former throwaway local was never read.
  Step.new path
end
.load_info(info_file) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/scout/workflow/step/info.rb', line 14
# Reads an info file, trying the configured SERIALIZER first and :marshal
# second. A load that yields nothing becomes {}; if both loads raise, the
# result is {status: :noinfo}. The hash is passed through
# IndiferentHash.setup before being returned.
def self.load_info(info_file)
  loaded =
    begin
      Persist.load(info_file, SERIALIZER) || {}
    rescue
      nil
    end

  # Only reached when the primary serializer raised.
  loaded ||=
    begin
      Persist.load(info_file, :marshal) || {}
    rescue
      {status: :noinfo}
    end

  IndiferentHash.setup(loaded)
end
.prov_indent(step, offset, input_dependencies) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/scout/workflow/step/provenance.rb', line 89
# Indentation prefix for one line of the provenance report: +offset+ spaces.
#
# The original body returned this value unconditionally on its first
# statement, which left the marker selection after it unreachable
# ("🡵" for alias steps, "🡇" for overriden tasks, "┝" for steps listed in
# +input_dependencies+, "╰" for steps with input dependencies). That dead
# code is removed here; +step+ and +input_dependencies+ stay in the
# signature so callers are unaffected.
def self.prov_indent(step, offset, input_dependencies)
  " " * offset
end
.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# Builds the provenance report for +step+ as a newline-joined string: one
# line for the step itself (via prov_report_msg, prefixed by prov_indent)
# plus, recursively, the lines of its dependencies. The step line comes
# first when ENV["SCOUT_ORIGINAL_STACK"] == 'true' (dependencies are then
# walked in reverse) and last otherwise.
# The status shown is derived from info[:status] and adjusted to :noinfo,
# :remote, :unsync or :notfound depending on Open.exist?/remote?/ssh?.
# Side effect: writes info[:task_name] = task into the hash returned by
# step.info. +seen+ accumulates dependency paths already reported.
# NOTE(review): this rendering has lost an identifier in several places
# (`seen = [], = false`, `seen, , input_dependencies[dep]`, `else if str <<`);
# the method summary names that parameter `expand_repeats` - confirm against
# the real source.
# NOTE(review): in the repeated-dependency branch the recursive call passes
# `input_dependencies[dep]` where the signature expects `seen` - looks like
# shifted positional arguments; confirm.
# File 'lib/scout/workflow/step/provenance.rb', line 104 def self.prov_report(step, offset = 0, task = nil, seen = [], = false, input = nil, input_dependencies = nil) info = step.info || {} info[:task_name] = task path = step.path status = info[:status] || :missing status = status.to_sym if String === status status = :noinfo if status == :missing && Open.exist?(path) status = :remote if Open.remote?(path) || Open.ssh?(path) name = info[:name] || File.basename(path) status = :unsync if status == :done and not Open.exist?(path) status = :notfound if status == :noinfo and not Open.exist?(path) this_step_msg = prov_report_msg(status, name, path, info, input) input_dependencies ||= {} step.dependencies.each do |dep| if dep.input_dependencies.any? dep.input_dependencies.each do |id| input_name, _dep = dep.recursive_inputs.select{|f,d| d == id || (String === d && d.start_with?(id.files_dir)) || (Array === d && d.include?(id)) }.keys.last if input_name input_dependencies[id] ||= [] input_dependencies[id] << [dep, input_name] end end end end if step.dependencies str = [] indent = prov_indent(step, offset, input_dependencies) str << indent + this_step_msg if ENV["SCOUT_ORIGINAL_STACK"] == 'true' step.dependencies.dup.tap{|l| l.reverse! if ENV["SCOUT_ORIGINAL_STACK"] == 'true' }.each do |dep| path = dep.path new = ! 
seen.include?(path) if new seen << path str.concat(prov_report(dep, offset + 1, task, seen, , input_dependencies[dep], input_dependencies.dup).split("\n")) else if str << Log.color(Step.status_color(dep.status), Log.uncolor(prov_report(dep, offset+1, task, input_dependencies[dep], input_dependencies.dup))) else info = dep.info || {} status = info[:status] || :missing status = "remote" if Open.remote?(path) || Open.ssh?(path) name = info[:name] || File.basename(path) status = :unsync if status == :done and not Open.exist?(path) status = :notfound if status == :noinfo and not Open.exist?(path) dep_indent = prov_indent(dep, offset+1, input_dependencies) str << dep_indent + Log.color(Step.status_color(status), Log.uncolor(prov_report_msg(status, name, path, info, input_dependencies[dep]))) end end end if step.dependencies str << indent + this_step_msg unless ENV["SCOUT_ORIGINAL_STACK"] == 'true' str * "\n" end |
.prov_report_msg(status, name, path, info, input = nil) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# Formats one provenance line: the coloured status, then workflow, task,
# path and (when +input+ is non-empty) a "-> task:input" list, joined by
# spaces.
# - Workflow and task are the two path segments preceding the file name
#   (anything from the first "{" onwards is stripped first). For paths that
#   are not job paths they are replaced by "-" and info[:task_name], and
#   the status by "file".
# - " (Mtime out of sync)" is appended for local existing paths when
#   ($main_mtime - path_mtime) < -2.
# - When the globals $inputs / $info_fields are non-empty, tab-separated
#   "name=fingerprint" pairs are appended for each non-nil job input /
#   info field. $inputs values come from Step.new(path).recursive_inputs.
# Errors from Open.mtime (any Exception) are swallowed and treated as
# "no mtime".
# File 'lib/scout/workflow/step/provenance.rb', line 32 def self.prov_report_msg(status, name, path, info, input = nil) parts = path.sub(/\{.*/,'').split "/" parts.pop task = Log.color(:yellow, parts.pop) workflow = Log.color(:magenta, parts.pop) if ! Step.job_path?(path) task, status, workflow = Log.color(:yellow, info[:task_name]), Log.color(:green, "file"), Log.color(:magenta, "-") end path_mtime = begin Open.mtime(path) rescue Exception nil end if input.nil? || input.empty? input_str = nil else input = input.reject{|dep,name| (input & dep.dependencies.collect{|d| [d,name]}).any? } input = input.reject{|dep,name| (input & dep.input_dependencies.collect{|d| [d,name]}).any? } input_str = Log.color(:magenta, "-> ") + input.collect{|dep,name| Log.color(:yellow, dep.task_name.to_s) + ":" + Log.color(:yellow, name) }.uniq * " " end str = if ! (Open.remote?(path) || Open.ssh?(path)) && (Open.exists?(path) && $main_mtime && path_mtime && ($main_mtime - path_mtime) < -2) prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " << " (#{Log.color(:red, "Mtime out of sync") })" else prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " end if $inputs and $inputs.any? job_inputs = Step.new(path).recursive_inputs.to_hash IndiferentHash.setup(job_inputs) $inputs.each do |input| value = job_inputs[input] next if value.nil? value_str = Log.fingerprint(value) str << "\t#{Log.color :magenta, input}=#{value_str}" end end if $info_fields and $info_fields.any? $info_fields.each do |field| IndiferentHash.setup(info) value = info[field] next if value.nil? value_str = Log.fingerprint(value) str << "\t#{Log.color :magenta, field}=#{value_str}" end end str end |
.prov_status_msg(status) ⇒ Object
27 28 29 30 |
# File 'lib/scout/workflow/step/provenance.rb', line 27
# Renders +status+ as text, coloured with the colour status_color picks
# for it.
def self.prov_status_msg(status)
  Log.color(status_color(status), status.to_s)
end
.relocate(path) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 |
# File 'lib/scout/workflow/step/load.rb', line 2
# Finds an existing location for a job path. Candidates are tried in order:
# the path itself, the result of Path#relocate, and finally the last three
# path segments placed under "var/jobs" (only when the path has at least
# two "/"). Falls back to the (Path-annotated) original when none exists.
def self.relocate(path)
  return path if Open.exists?(path)

  Path.setup(path) unless Path === path

  candidate = path.relocate
  return candidate if Open.exists?(candidate)

  if path.count("/") >= 2
    tail = path.split("/")[-3..-1].join("/")
    candidate = Path.setup("var/jobs")[tail]
    return candidate if Open.exists?(candidate)
  end

  path
end
.status_color(status) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/scout/workflow/step/provenance.rb', line 6
# Maps a job status (String or Symbol) to the colour name used when
# printing it. Unknown statuses are :cyan.
#
# The original fallback tested whether the status contained ">" and then
# returned :cyan from both arms; that redundant conditional is collapsed
# into a single fallback with identical results.
def self.status_color(status)
  case status.to_sym
  when :error, :aborted, :dead, :unsync
    :red
  when :streaming, :started
    :cyan
  when :done, :noinfo
    :green
  when :dependencies, :waiting, :setup
    :yellow
  when :notfound, :cleaned, :missing
    :blue
  else
    :cyan
  end
end
.wait_for_jobs(jobs, canfail = false) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/scout/workflow/step/dependencies.rb', line 106
# Joins every job in its own thread and then waits for all threads.
# +canfail+ decides which failures are tolerated: +true+ tolerates any
# exception, any other value tolerates exceptions it matches with ===
# (e.g. an exception class); everything else is re-raised and surfaces
# when the thread is joined.
def self.wait_for_jobs(jobs, canfail=false)
  workers = jobs.map do |job|
    Thread.new do
      # Failures are reported through Thread#join below, not on stderr.
      Thread.current.report_on_exception = false
      begin
        job.join
      rescue Exception => failure
        tolerated = TrueClass === canfail || canfail === failure
        raise failure unless tolerated
      end
    end
  end

  workers.each(&:join)
end
Instance Method Details
#_dump(level) ⇒ Object
Marshal Step
214 215 216 |
# Marshal hook: a step is dumped as just its @path; +level+ is not used.
# File 'lib/scout/workflow/step/info.rb', line 214 def _dump(level) @path end |
#abort(exception = nil) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/scout/workflow/step/status.rb', line 2
# Aborts the step. When info[:pid] names another live process, that process
# is aborted through Misc.abort_child (failures are only logged). Otherwise
# the step's own streams are aborted with +exception+.
def abort(exception = nil)
  pid = info[:pid]

  if pid && pid != Process.pid && Misc.pid_alive?(pid)
    Log.debug "Kill process #{pid} to abort step #{Log.fingerprint self}"
    begin
      outcome = Misc.abort_child pid, true
      Log.medium "Aborted pid #{path} #{outcome}"
    rescue
      Log.debug("Aborted job #{pid} was not killed: #{$!.message}")
    end
  else
    # Drain and abort every stream the step still hands out.
    while @result && streaming?
      pending = self.stream
      break unless pending
      pending.abort(exception)
    end
    @take_stream.abort(exception) if streaming?
  end
end
#abort_dependencies ⇒ Object
102 103 104 |
# File 'lib/scout/workflow/step/dependencies.rb', line 102
# Aborts every dependency (direct or input) that reports itself as running.
def abort_dependencies
  all_dependencies.each do |dep|
    next unless dep.running?
    dep.abort
  end
end
#aborted? ⇒ Boolean
179 180 181 |
# File 'lib/scout/workflow/step/info.rb', line 179
# True when the status is "aborted", given either as Symbol or String.
def aborted?
  return true if status == :aborted
  status == 'aborted'
end
#alias? ⇒ Boolean
445 446 447 |
# File 'lib/scout/workflow/step.rb', line 445
# Asks the task whether it is an alias; nil when the step has no task.
def alias?
  task ? task.alias? : nil
end
#all_dependencies ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/scout/workflow/step/dependencies.rb', line 67
# Direct dependencies followed by input dependencies, as a new array that
# is memoized in @all_dependencies. Either part may be missing.
def all_dependencies
  @all_dependencies ||= begin
                          direct = dependencies || []
                          from_inputs = input_dependencies || []
                          direct + from_inputs
                        end
end
#archive_deps(jobs = nil) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/scout/workflow/step/archive.rb', line 29
# Stores a snapshot of dependency info inside this step's own info.
# For each job (the step's dependencies by default) that has an info file,
# its info is recorded under its path and its own archived info is merged
# in. The result goes to :archived_info, and the current :dependencies
# entry is copied to :archived_dependencies.
def archive_deps(jobs = nil)
  jobs = dependencies if jobs.nil?

  collected = {}
  jobs.each do |dep|
    next unless Open.exists?(dep.info_file)
    collected[dep.path] = dep.info
    collected.merge!(dep.archived_info)
  end

  self.set_info :archived_info, collected
  self.set_info :archived_dependencies, info[:dependencies]
end
#archived_info ⇒ Object
2 3 4 5 |
# File 'lib/scout/workflow/step/archive.rb', line 2
# The :archived_info entry of the step info, or {} when there is no info
# file or no such entry.
def archived_info
  Open.exists?(info_file) ? (info[:archived_info] || {}) : {}
end
#archived_inputs ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# Collects the inputs of all archived dependencies into one NamedArray.
# Returns [] when info has no :archived_dependencies. Otherwise it walks a
# worklist of dependency paths (popped from the end): for each path whose
# archived info is a Hash, its :inputs (annotated with :input_names) are
# appended, and the last element of each of its :dependencies /
# :archived_dependencies entries is queued unless that path was already
# processed. `seen` only holds paths already popped, so a path can be
# queued more than once while still pending.
# File 'lib/scout/workflow/step/archive.rb', line 7 def archived_inputs return [] unless info[:archived_dependencies] archived_info = self.archived_info all_inputs = NamedArray.setup([],[]) deps = info[:archived_dependencies].dup seen = [] while path = deps.pop dep_info = archived_info[path] if Hash === dep_info dep_inputs = dep_info[:inputs] NamedArray.setup(dep_inputs, dep_info[:input_names]) all_inputs.concat(dep_inputs) deps.concat(dep_info[:dependencies].collect{|p| p.last } - seen) if dep_info[:dependencies] deps.concat(dep_info[:archived_dependencies].collect{|p| p.last } - seen) if dep_info[:archived_dependencies] end seen << path end all_inputs end |
#bundle_files ⇒ Object
33 34 35 |
# File 'lib/scout/workflow/step/file.rb', line 33
# Every existing file that belongs to the step: the result path, the info
# file and everything found under the files directory.
def bundle_files
  candidates = [path, info_file, files_dir.glob("**/*")].flatten
  candidates.select { |candidate| Open.exist?(candidate) }
end
#canfail? ⇒ Boolean
65 66 67 |
# File 'lib/scout/workflow/step/status.rb', line 65
# Whether the @compute entry for this step's path includes :canfail.
# Falsy (nil/false) when there is no @compute or no entry for the path.
def canfail?
  return @compute unless @compute
  flags = @compute[self.path]
  flags && @compute[self.path].include?(:canfail)
end
#child(&block) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 |
# File 'lib/scout/workflow/step/children.rb', line 2
# Forks a child process running +block+, records its pid in
# info[:children_pids] and returns the pid.
def child(&block)
  child_pid = Process.fork(&block)

  known = info[:children_pids]
  known = [] if known.nil?
  known << child_pid
  set_info :children_pids, known

  child_pid
end
#clean ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/scout/workflow/step/status.rb', line 39
# Forgets cached state and removes the step's files: the result, the
# temporary path, the info file and (recursively) the files directory.
# Each is removed only if it exists or is a link. Returns the step.
def clean
  Log.debug "Cleaning job files: #{path}"

  @take_stream = @result = @info = @info_load_time = @done = nil

  # Resolved one at a time, in the same order as before.
  %i[path tmp_path info_file].each do |accessor|
    target = send(accessor)
    Open.rm target if Open.exist_or_link?(target)
  end
  Open.rm_rf files_dir if Open.exist_or_link?(files_dir)

  self
end
#clean_name ⇒ Object
96 97 98 99 100 101 102 103 |
# File 'lib/scout/workflow/step.rb', line 96
# Human-friendly job name. Preference order: the explicit @id, the
# :clean_name entry of info, the file name without a trailing
# "_<32 lowercase alphanumerics>" digest and extension, and finally the
# file name up to its first ".".
def clean_name
  return @id if @id
  return info[:clean_name] if info.include? :clean_name

  match = name.match(/(.+?)(?:_[a-z0-9]{32})?(?:\..*)?$/)
  match ? match[1] : name.split(".").first
end
#cmd(*args) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# Runs a command through CMD.cmd on behalf of the step and returns nil.
# A trailing options Hash is added if missing; its :log and :pipe entries
# are forced to true before the call. The first pid of the command is
# appended to info[:children_pids]. Output is read character by character
# and echoed to STDERR while Log.severity <= level, with a
# "STDOUT [pid]: " prefix logged after each newline; finally the io is
# joined.
# `level` comes from the caller's :log option (default 0, true -> 0).
# NOTE(review): `all_args.last[:log] || 0` already turns false into 0, so
# the `level = 10 if FalseClass === level` line looks unreachable - confirm
# the intended handling of `log: false`.
# File 'lib/scout/workflow/step/children.rb', line 14 def cmd(*args) all_args = *args all_args << {} unless Hash === all_args.last level = all_args.last[:log] || 0 level = 0 if TrueClass === level level = 10 if FalseClass === level level = level.to_i all_args.last[:log] = true all_args.last[:pipe] = true io = CMD.cmd(*all_args) child_pid = io.pids.first children_pids = info[:children_pids] if children_pids.nil? children_pids = [child_pid] else children_pids << child_pid end set_info :children_pids, children_pids while c = io.getc STDERR << c if Log.severity <= level if c == "\n" Log.logn "STDOUT [#{child_pid}]: ", level end end io.join nil end |
#config(key, *tokens) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/scout/workflow/step/config.rb', line 4
# Looks up configuration +key+ through Scout::Config.get, adding tokens that
# identify this step after the caller's own +tokens+.
#
# A trailing Hash in +tokens+ is taken as the options passed on to
# Scout::Config.get. The added tokens are, in order:
# "workflow:<wf>", "task:<wf>#<task>" (both only with a workflow),
# "task:<task>", "<task>", "<wf>" (only with a workflow) and "task".
#
# Changes from the rendered original: the options local, whose name was
# missing from the listing, is restored; and the bare workflow token is no
# longer appended as nil when the step has no workflow.
def config(key, *tokens)
  options = tokens.pop if Hash === tokens.last
  options ||= {}

  new_tokens = []
  if workflow
    workflow_name = workflow.name
    new_tokens << ("workflow:" + workflow_name)
    new_tokens << ("task:" + workflow_name << "#" << task_name.to_s)
  end
  new_tokens << ("task:" + task_name.to_s)
  new_tokens << (task_name.to_s)
  new_tokens << (workflow_name) if workflow_name
  new_tokens << ("task")

  Scout::Config.get(key, tokens + new_tokens, options)
end
#consume_all_streams ⇒ Object
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# Consumes every stream the step still exposes. While there is a @result,
# the step is streaming and #stream returns something, each stream is
# handed to Open.consume_stream(stream, true) and the returned value is
# collected (nils are dropped). The collected threads are then joined in
# order; if joining one raises, Aborted is raised in every collected thread
# (each is joined again) and the original exception is re-raised.
# Note that `rescue Exception` also catches signals and exit requests.
# File 'lib/scout/workflow/step.rb', line 346 def consume_all_streams threads = [] while @result && streaming? && stream = self.stream threads << Open.consume_stream(stream, true) end threads.compact! threads.each do |t| begin t.join rescue Exception threads.compact.each{|t| t.raise(Aborted); t.join } raise $! end end end |
#copy_linked_files_dir ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/scout/workflow/step/file.rb', line 37
# When the files directory is a symlink, replaces the link with the result
# of Open.link_dir on its real path. Failures are logged as a warning and
# not raised. Does nothing (returns nil) when the directory is not a
# symlink.
#
# The rendered original ended the warning with a dangling `$!.`; the
# exception message is restored here so the warning says why it failed.
# NOTE(review): `workflow_short_path` is not among the methods listed for
# this class (only `short_path` is) - confirm it exists, otherwise the
# rescue branch itself raises NoMethodError.
def copy_linked_files_dir
  if File.symlink?(self.files_dir)
    begin
      realpath = Open.realpath(self.files_dir)
      Open.rm self.files_dir
      Open.link_dir realpath, self.files_dir
    rescue
      Log.warn "Copy files_dir for #{self.workflow_short_path} failed: " + $!.message
    end
  end
end
#digest_str ⇒ Object
432 433 434 |
# String used to represent the step in digests: the literal "Step: "
# followed by #identify_path.
# File 'lib/scout/workflow/step.rb', line 432 def digest_str "Step: " + identify_path end |
#dirty? ⇒ Boolean
81 82 83 |
# File 'lib/scout/workflow/step/status.rb', line 81
# A step is dirty when it is done but not up to date. When it is not done
# the (falsy) result of #done? is returned as is.
def dirty?
  finished = done?
  finished ? !updated? : finished
end
#done? ⇒ Boolean
311 312 313 |
# Whether the result path exists. A truthy answer is memoized in @done
# (reset by #clean); a falsy answer is re-checked on every call because
# `||=` only caches truthy values.
# File 'lib/scout/workflow/step.rb', line 311 def done? @done ||= Open.exist?(path) end |
#error? ⇒ Boolean
175 176 177 |
# File 'lib/scout/workflow/step/info.rb', line 175
# True when the status is "error", given either as Symbol or String.
def error?
  return true if status == :error
  status == 'error'
end
#exception ⇒ Object
203 204 205 206 207 208 209 210 211 |
# File 'lib/scout/workflow/step/info.rb', line 203
# The exception recorded in the step info, or nil when there is none.
#
# info[:exception] is expected to hold a Base64-encoded Marshal dump. When
# it cannot be decoded the failure is logged and a plain Exception carrying
# the last recorded message is returned instead.
#
# The rendered original read `Exception.new .last`; the receiver is
# restored as #messages, and wrapped in Array() so a step without recorded
# messages does not raise from inside the rescue.
#
# NOTE(review): Marshal.load runs on data read from the info file; it must
# never be pointed at info files from an untrusted source.
def exception
  return nil unless info[:exception]
  begin
    Marshal.load(Base64.decode64(info[:exception]))
  rescue
    Log.exception $!
    return Exception.new Array(messages).last
  end
end
#exec ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# Runs the task block inside @exec_context and stores its value in @result.
# @in_exec is true for the duration of the call and reset in an ensure.
# Inputs are prepared first:
# - with a Task, inputs are zipped with the task's input types; Step inputs
#   are joined and replaced by Task.format_input on their path, other
#   values pass through;
# - otherwise a Hash of inputs is reduced to its values and every Step
#   input is replaced by the result of its #load.
# `inputs` refers to the reader method until the `inputs = new_inputs`
# assignment; from there on (including in `*inputs`) it is a local, which
# is nil - and therefore splats to no arguments - when the step has no
# inputs.
# NOTE(review): `type, desc, default, = info` and
# `Task.format_input(input.join.path, type, )` appear to have lost a
# trailing identifier in this rendering (possibly `options`) - confirm
# against the real source.
# File 'lib/scout/workflow/step.rb', line 127 def exec if inputs if Task === task types = task.inputs.collect{|name,type| type } new_inputs = inputs.zip(types).collect{|input,info| type, desc, default, = info next input unless Step === input input.join if input.streaming? Task.format_input(input.join.path, type, ) } else if Hash === inputs new_inputs = inputs.values else new_inputs = inputs end new_inputs = new_inputs.collect{|input| Step === input ? input.load : input } end inputs = new_inputs end @result = begin @in_exec = true @exec_context.instance_exec(*inputs, &task) ensure @in_exec = false end end |
#file(file = nil) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/scout/workflow/step/file.rb', line 20
# Path of +file+ inside the step's files directory, or the directory itself
# when no file is given. The directory is annotated as a Path if needed.
def file(file = nil)
  base = files_dir
  Path.setup(base) unless Path === base
  file.nil? ? base : base[file]
end
#files ⇒ Object
27 28 29 30 31 |
# File 'lib/scout/workflow/step/file.rb', line 27
# All non-directory entries under the files directory, each expressed
# relative to that directory.
def files
  entries = files_dir.glob("**/*").reject { |entry| File.directory? entry }
  entries.collect { |entry| Misc.path_relative_to(files_dir, entry) }
end
#files_dir ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/scout/workflow/step/file.rb', line 2
# Directory holding the step's auxiliary files: the step path plus ".files",
# annotated as a Path, resolved with #find and tagged with this step as its
# pkgdir. Memoized in @files_dir.
def files_dir
  return @files_dir if @files_dir

  dir = @path + ".files"
  Path === @path ? @path.annotate(dir) : Path.setup(dir)
  dir = dir.find
  dir.pkgdir = self
  @files_dir = dir
end
#files_dir=(dir) ⇒ Object
16 17 18 |
# Sets @files_dir directly, overriding the value #files_dir would otherwise
# compute and memoize.
# File 'lib/scout/workflow/step/file.rb', line 16 def files_dir=(dir) @files_dir = dir end |
#fingerprint ⇒ Object
436 437 438 |
# Fingerprint of the step; identical to #digest_str.
# File 'lib/scout/workflow/step.rb', line 436 def fingerprint digest_str end |
#fork(noload = false, semaphore = nil) ⇒ Object
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# Runs the step in a forked process and returns the step (in the parent).
# Child: installs a TERM handler that raises Aborted, resets the info to
# status :queue with its own pid unless the step is already present, calls
# run(noload) - inside ScoutSemaphore.synchronize when a semaphore is
# given - then joins and leaves with `exit! 0`.
# Parent: detaches the child pid and waits in #grace until the step is
# present.
# File 'lib/scout/workflow/step.rb', line 290 def fork(noload = false, semaphore = nil) pid = Process.fork do Signal.trap(:TERM) do raise Aborted, "Recieved TERM Signal on forked process #{Process.pid}" end reset_info status: :queue, pid: Process.pid unless present? if semaphore ScoutSemaphore.synchronize(semaphore) do run(noload) end else run(noload) end join exit! 0 end Process.detach pid grace self end |
#full_task_name ⇒ Object
121 122 123 124 125 |
# File 'lib/scout/workflow/step.rb', line 121
# "<workflow>#<task>" for steps with a workflow, the bare task name for
# steps without one, and nil when there is no task name.
def full_task_name
  if task_name.nil?
    nil
  elsif workflow.nil?
    task_name.to_s
  else
    [workflow, task_name].join("#")
  end
end
#grace ⇒ Object
370 371 372 373 374 375 |
# File 'lib/scout/workflow/step.rb', line 370
# Blocks, polling every 0.1 seconds, until the step is present; returns
# the step.
def grace
  sleep 0.1 until present?
  self
end
#identify_path ⇒ Object
428 429 430 |
# The step path as rendered by Resource.identify.
# File 'lib/scout/workflow/step.rb', line 428 def identify_path Resource.identify path end |
#info ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/scout/workflow/step/info.rb', line 45
# The step's info hash, cached in @info. It is (re)loaded when nothing is
# cached yet, when the info file is newer than the last load, or when the
# file's mtime cannot be checked.
def info
  stale =
    begin
      @info_load_time && (mtime = Open.mtime(info_file)) && mtime > @info_load_time
    rescue
      # An unreadable mtime is treated as "changed".
      true
    end

  load_info if @info.nil? || stale
  @info
end
#info_file ⇒ Object
5 6 7 8 9 10 11 12 |
# File 'lib/scout/workflow/step/info.rb', line 5
# Location of the info file: the step path plus ".info" (annotated like the
# path when that is a Path). nil for steps without a path; memoized
# otherwise.
def info_file
  return nil if @path.nil?
  return @info_file if @info_file

  target = @path + ".info"
  @path.annotate target if Path === @path
  @info_file = target
end
#init_info(status = :waiting) ⇒ Object
41 42 43 |
# File 'lib/scout/workflow/step/info.rb', line 41
# Logs the initial +status+ for a step that has an info file location but
# no info file yet; does nothing otherwise.
def init_info(status=:waiting)
  return if info_file.nil? || Open.exists?(info_file)
  log status
end
#input_dependencies ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/scout/workflow/step/dependencies.rb', line 24
# Steps that reach this step through its inputs: inputs that are Steps
# themselves, plus the owning Step (pkgdir) of inputs that are Paths.
# Duplicates are removed; [] when there are no inputs.
def input_dependencies
  return [] unless inputs

  found = []
  inputs.each do |candidate|
    if Step === candidate
      found << candidate
    elsif (Path === candidate) && (Step === candidate.pkgdir)
      found << candidate.pkgdir
    end
  end
  found.uniq
end
#join ⇒ Object
381 382 383 384 385 386 387 388 389 390 391 392 393 394 |
# Waits for the step to finish and returns the step.
# Order of operations: consume all pending streams; poll every 0.1s while
# there is no @result and the step is present but neither terminated nor
# done; wait for the process in info[:pid] if one is recorded; then raise
# the recorded exception if any, or a generic "Error in job <path>" when the
# status is error or aborted.
# File 'lib/scout/workflow/step.rb', line 381 def join consume_all_streams while @result.nil? && (present? && ! (terminated? || done?)) sleep 0.1 end Misc.wait_child info[:pid] if info[:pid] raise self.exception if self.exception raise "Error in job #{self.path}" if self.error? or self.aborted? self end |
#load ⇒ Object
407 408 409 410 411 |
# File 'lib/scout/workflow/step.rb', line 407
# The step result. An in-memory @result is returned directly unless the
# step is streaming; otherwise the step is joined and the result is read
# from the path when done, or computed with #exec when not.
def load
  cached = !@result.nil? && !streaming?
  return @result if cached

  join
  if done?
    Persist.load(path, type)
  else
    exec
  end
end
#load_info ⇒ Object
27 28 29 30 |
# Reloads @info from the info file through Step.load_info and records the
# load time in @info_load_time (which is also the return value).
# File 'lib/scout/workflow/step/info.rb', line 27 def load_info @info = Step.load_info(info_file) @info_load_time = Time.now end |
#log(status, message = nil, &block) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/scout/workflow/step/info.rb', line 149
# Records +status+ (and optionally +message+) in the step info.
#
# With a block, the block is timed through Misc.exec_time and the formatted
# duration becomes the message, or is prepended to the given message as
# "<time> - <message>".
#
# The +message+ identifier was missing throughout the rendered original
# (`def log(status, = nil, &block)`, `if` with no condition, `:message =>`
# with no value); it is restored here from the signature shown in the
# method summary.
def log(status, message = nil, &block)
  if block_given?
    time = Misc.exec_time &block
    time_str = Misc.format_seconds_short time
    message = message.nil? ? Log.color(:time, time_str) : "#{Log.color :time, time_str} - #{ message }"
  end

  if message
    merge_info :status => status, :message => message
  else
    merge_info :status => status
  end
end
#marshal_load(path) ⇒ Object
222 223 224 |
# Builds and returns a new Step for +path+.
# NOTE(review): nothing is assigned on the receiver here, whereas Ruby's
# marshal_load hook is expected to restore state into self. Since the class
# defines _dump/_load rather than marshal_dump, this method may simply be
# unused by Marshal - confirm.
# File 'lib/scout/workflow/step/info.rb', line 222 def marshal_load(path) Step.new path end |
#merge_info(new_info) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# Merges +new_info+ into the step info and saves it with save_info.
# For each key/value (values are first passed through Annotation.purge):
# - :status triggers report_status; for :done, :error and :aborted the
#   elapsed times are computed from info[:start], info[:issued] and
#   new_info[:end] (Strings are parsed with Time.parse) and stored as
#   :time_elapsed and :total_time_elapsed;
# - :message values are accumulated under info[:messages];
# - exceptions that Marshal cannot dump are replaced by a fresh
#   ScoutException/Exception carrying the original backtrace;
# - other keys are merged by type: Arrays are concatenated, Hashes are
#   merged in place, anything else is overwritten.
# NOTE(review): this rendering has lost an identifier in many places
# (`= new_info[:message]`, `if .nil? &&`, `report_status value,`,
# `= info[:messages] || []`, `value.`) - most likely `message`/`messages`;
# confirm against the real source before relying on the details above.
# NOTE(review): `total_time = eend - issued` runs whenever start and eend
# are set, without checking that `issued` is present - confirm it always is.
# File 'lib/scout/workflow/step/info.rb', line 68 def merge_info(new_info) info = self.info new_info.each do |key,value| value = Annotation.purge(value) if key == :status = new_info[:message] if .nil? && (value == :done || value == :error || value == :aborted) issued = info[:issued] start = info[:start] eend = new_info[:end] start = Time.parse start if String === start eend = Time.parse eend if String === eend issued = Time.parse issued if String === issued if start && eend time = eend - start total_time = eend - issued if total_time - time > 1 time_str = "#{Misc.format_seconds_short(time)} (#{Misc.format_seconds_short(total_time)})" else time_str = Misc.format_seconds_short(time) end info[:time_elapsed] = time info[:total_time_elapsed] = total_time = Log.color(:time, time_str) end end report_status value, end if key == :message = info[:messages] || [] << value info[:messages] = next end if Exception === value begin Marshal.dump(value) rescue TypeError if ScoutException === value new = ScoutException.new value. else new = Exception.new value. end new.set_backtrace(value.backtrace) value = new end end if info.include?(key) case info[key] when Array info[key].concat(Array === value ? value : [value]) when Hash info[key].merge! value else info[key] = value end else info[key] = value end end save_info(info) end |
#message(message) ⇒ Object
167 168 169 |
# File 'lib/scout/workflow/step/info.rb', line 167
# Records +message+ in the step info by merging it under the :message key.
#
# The rendered original had lost both the method name and its parameter
# (`def () merge_info :message => end`); they are restored from the
# `#message(message)` signature shown in the method summary.
def message(message)
  merge_info :message => message
end
#messages ⇒ Object
163 164 165 |
# File 'lib/scout/workflow/step/info.rb', line 163
# The messages recorded in the step info (the :messages entry; nil when
# none have been recorded).
#
# The rendered original had lost the method name (`def info[:messages]
# end`); it is restored from the `#messages` entry this listing sits under.
def messages
  info[:messages]
end
#monitor_stream(stream, options = {}, &block) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# Wraps +stream+ with Open.line_monitor_stream and returns the wrapped
# stream, optionally compressed: Open.bgzip when bgzip is requested,
# Open.gzip when :compress or :gzip is set, the plain stream otherwise.
# How the monitor calls +block+ depends on its arity (0/none, 1 or 2).
# A ConcurrentStream is set up on the output with :abort_callback and
# :callback procs.
# NOTE(review): this rendering has lost several identifiers (the second
# parameter, the receiver of `[:bar]`, `.tick`, `.done`, the method called
# on `Log::ProgressBar.`, and the condition of the `if` before
# ConcurrentStream.setup). The method summary names the second parameter
# `options`; the rest presumably concerns a progress bar - confirm against
# the real source before relying on any detail here.
# File 'lib/scout/workflow/step/progress.rb', line 29 def monitor_stream(stream, = {}, &block) case [:bar] when TrueClass = when Hash = [:bar] when Numeric = :max => [:bar] else = [:bar] end out = if .nil? Open.line_monitor_stream stream, &block elsif (block.nil? || block.arity == 0) Open.line_monitor_stream stream do .tick end elsif block.arity == 1 Open.line_monitor_stream stream do |line| .tick block.call line end elsif block.arity == 2 Open.line_monitor_stream stream do |line| block.call line, end end if ConcurrentStream.setup(out, :abort_callback => Proc.new{ .done Log::ProgressBar.(, true) }, :callback => Proc.new{ .done Log::ProgressBar.() }) end bgzip = ([:compress] || [:gzip]).to_s == 'bgzip' bgzip = true if [:bgzip] gzip = true if [:compress] || [:gzip] if bgzip Open.bgzip(out) elsif gzip Open.gzip(out) else out end end |
#name ⇒ Object
88 89 90 |
# File name of the step path (File.basename of @path), memoized in @name.
# File 'lib/scout/workflow/step.rb', line 88 def name @name ||= File.basename(@path) end |
#newer_dependencies ⇒ Object
23 24 25 26 27 28 |
# File 'lib/scout/workflow/step/status.rb', line 23
# Dependencies for which Path.newer?(self.path, dep.path) holds, gathered
# from three sources in order: recursive dependencies, this step's input
# dependencies, and the input dependencies of the recursive dependencies.
def newer_dependencies
  flagged = proc { |dep| Path.newer?(self.path, dep.path) }

  newer = rec_dependencies.select(&flagged)
  newer += input_dependencies.select(&flagged)
  newer += rec_dependencies.collect { |dep| dep.input_dependencies }.flatten.select(&flagged)
  newer
end
#overriden? ⇒ Boolean
187 188 189 190 |
# File 'lib/scout/workflow/step/info.rb', line 187
# Whether the step is overriden: it has an overriden task or workflow, or
# any of its dependencies is overriden. Computed once and kept in
# @overriden.
def overriden?
  if @overriden.nil?
    @overriden = overriden_task || overriden_workflow || overriden_deps.any?
  end
  @overriden
end
#overriden_deps ⇒ Object
192 193 194 |
# File 'lib/scout/workflow/step/info.rb', line 192
# The direct dependencies that report themselves as overriden.
#
# Steps built from just a path (Step.load, Step._load) have nil
# dependencies; the original raised NoMethodError for them, also through
# #overriden?. Like #all_dependencies, #prepare_dependencies and
# #rec_dependencies, this now treats missing dependencies as none.
def overriden_deps
  (dependencies || []).select { |dep| dep.overriden? }
end
#pid ⇒ Object
59 60 61 |
# File 'lib/scout/workflow/step/info.rb', line 59
#
# Value stored under :pid in the step info.
def pid
  info[:pid]
end
#pid=(pid) ⇒ Object
63 64 65 |
# File 'lib/scout/workflow/step/info.rb', line 63
#
# Stores +pid+ under :pid in the step info through #set_info.
def pid=(pid)
  set_info(:pid, pid)
end
#prepare_dependencies ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/scout/workflow/step/dependencies.rb', line 37
#
# Cleans dependencies that are present but not updated, then counts, for each
# upstream step, how many pending dependencies consume it and stores that
# count in its tee_copies.
#
# A dependency is skipped from the counting when it is done or has a
# non-recoverable error. Every remaining dependency is registered as a
# consumer of its own dependencies and of this step's input_dependencies.
def prepare_dependencies
  consumers = {}
  register = ->(upstream, consumer) { (consumers[upstream] ||= []) << consumer }

  if dependencies
    dependencies.each do |dep|
      if dep.present? && !dep.updated?
        Log.medium "Clean outdated #{dep.path}"
        dep.clean
      end

      next if dep.done?
      next if dep.error? && !dep.recoverable_error?

      dep.dependencies.each { |upstream| register.call(upstream, dep) } if dep.dependencies
      input_dependencies.each { |upstream| register.call(upstream, dep) }
    end
  end

  consumers.each do |upstream, consumer_list|
    upstream.tee_copies = consumer_list.length
  end
end
#present? ⇒ Boolean
364 365 366 367 368 |
# File 'lib/scout/workflow/step.rb', line 364
#
# Whether the result path, the info file or the files directory exists
# (checked in that order, stopping at the first hit).
def present?
  # Resolve each location lazily so later ones are not computed after a hit.
  %i[path info_file files_dir].any? { |location| Open.exist?(send(location)) }
end
#produce(with_fork: false) ⇒ Object
396 397 398 399 400 401 402 403 404 405 |
# File 'lib/scout/workflow/step.rb', line 396
#
# Makes sure the step gets produced and returns the step itself.
#
# A step with a recoverable error is cleaned first. With +with_fork+ the step
# is forked and joined; otherwise it is run with :no_load.
def produce(with_fork: false)
  clean if error? && recoverable_error?

  unless with_fork
    run(:no_load)
    return self
  end

  self.fork
  self.join
  self
end
#progress_bar(msg = "Progress", options = nil, &block) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/scout/workflow/step/progress.rb', line 2
#
# Creates a progress bar for this step.
#
# +msg+ is used as the bar description; when a Hash is given as +msg+ and no
# +options+ are passed, the Hash is taken as the options and the description
# is nil. options[:max] is passed as the bar maximum. Unless @exec is set the
# bar reports to the step's :progress file.
#
# With a block the bar is initialised, yielded, removed, and the block result
# is returned; without a block the bar itself is returned.
#
# NOTE(review): the method name and the identifiers `options`, `bar` and
# `new_bar` were missing from the extracted listing and are reconstructed from
# the signature heading and context -- confirm against the real file.
def progress_bar(msg = "Progress", options = nil, &block)
  if Hash === msg && options.nil?
    options = msg
    msg = nil
  end
  options = {} if options.nil?

  max = options[:max]
  Open.mkdir files_dir
  bar = Log::ProgressBar.new_bar(max, {:desc => msg, :file => (@exec ? nil : file(:progress))}.merge(options))

  if block_given?
    bar.init
    res = yield bar
    bar.remove
    res
  else
    bar
  end
end
#rec_dependencies(connected = false, seen = Set.new) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/scout/workflow/step/dependencies.rb', line 2
#
# Set with the direct dependencies plus, recursively, their dependencies.
#
# Dependencies whose path is already in +seen+ are skipped. When +connected+
# is true, dependencies that are done and updated are skipped as well.
#
# NOTE(review): @rec_dependencies is reset to an empty Hash on every call, so
# the `||=` below never reuses a previous result -- confirm whether caching
# was meant to be disabled.
def rec_dependencies(connected = false, seen = Set.new)
  @rec_dependencies = {}
  @rec_dependencies[connected] ||= begin
    direct = (dependencies || []).reject do |dep|
      seen.include?(dep.path) || (connected && dep.done? && dep.updated?)
    end

    visited = seen + direct.collect { |dep| dep.path }

    direct.inject(Set.new(direct)) do |found, dep|
      found + dep.rec_dependencies(connected, visited)
    end
  end
end
#recoverable_error? ⇒ Boolean
19 20 21 |
# File 'lib/scout/workflow/step/status.rb', line 19
#
# Whether the step is in error and its exception is not a ScoutException.
def recoverable_error?
  error? && !(ScoutException === exception)
end
#recursive_clean ⇒ Object
58 59 60 61 62 63 |
# File 'lib/scout/workflow/step/status.rb', line 58
#
# Cleans every dependency recursively, then cleans this step.
def recursive_clean
  dependencies.each(&:recursive_clean)
  clean
end
#recursive_inputs ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/scout/workflow/step/dependencies.rb', line 16
#
# Hash of this step's inputs merged with the recursive inputs of its
# dependencies. Values already collected take precedence over values coming
# from later dependencies. Inputs that are not a NamedArray contribute nothing.
def recursive_inputs
  own_inputs = NamedArray === inputs ? inputs.to_hash : {}
  return own_inputs if dependencies.nil?

  dependencies.inject(own_inputs) do |collected, dep|
    dep.recursive_inputs.merge(collected)
  end
end
#recursive_overriden_deps ⇒ Object
196 197 198 199 200 201 |
# File 'lib/scout/workflow/step/info.rb', line 196
#
# Overriden direct dependencies followed by their own recursive overriden
# dependencies, flattened and without duplicates.
def recursive_overriden_deps
  direct = dependencies.select(&:overriden?)
  nested = direct.collect(&:recursive_overriden_deps)

  (direct + nested).flatten.uniq
end
#report_status(status, message = nil) ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/scout/workflow/step/info.rb', line 140
#
# Logs, at info level, a line with the coloured status, task name and path.
# When a +message+ is given, the fingerprint of its first line (with one
# leading and one trailing single quote removed) is inserted before the path.
#
# NOTE(review): the identifier `message` was missing from the extracted
# listing and is reconstructed from the signature heading -- confirm against
# the real file.
def report_status(status, message = nil)
  if message.nil?
    Log.info [Log.color(:status, status, true), Log.color(:task, task_name, true), Log.color(:path, path)] * " "
  else
    message = Log.fingerprint(message.split("\n").first).sub(/^'/,'').sub(/'$/,'')
    Log.info [Log.color(:status, status, true), Log.color(:task, task_name, true), message, Log.color(:path, path)] * " "
  end
end
#reset_info(info = {}) ⇒ Object
37 38 39 |
# File 'lib/scout/workflow/step/info.rb', line 37
#
# Replaces the in-memory info with +info+ and saves it through #save_info.
def reset_info(info = {})
  @info = info
  save_info(info)
end
#run(stream = false) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/scout/workflow/step.rb', line 170
#
# Runs the step, persisting its result at +path+, and returns the result.
#
# +stream+ selects what is returned:
# * +true+ or :stream -- the result is not loaded; a finished step returns
#   its stream
# * :no_load -- the result is not loaded; a finished step returns its path
# * anything else -- the loaded result
#
# While running, the step info is reset to :setup, dependencies are run, and
# the status ends as :done, :error or :aborted. Any exception is recorded in
# the info (Marshal-dumped and Base64-encoded), dependencies are aborted and
# the exception is re-raised.
#
# NOTE(review): `e.message` in the fallback rescue was missing from the
# extracted listing and is reconstructed -- confirm against the real file.
def run(stream = false)
  case stream
  when TrueClass, :stream
    no_load = :stream
  when :no_load
    no_load = true
  else
    no_load = false
  end

  if done?
    if no_load
      if no_load == :stream
        return self.stream
      else
        return self.path
      end
    else
      return @result || self.load
    end
  end

  # Re-checked on purpose: kept from the original in case the step finished
  # between the two checks.
  return @result || self.load if done?

  prepare_dependencies

  begin
    @result = Persist.persist(name, type, :path => path, :tee_copies => tee_copies, no_load: no_load) do
      input_names = (task.respond_to?(:inputs) && task.inputs) ? task.inputs.collect{|input_name,_| input_name} : []

      reset_info :status => :setup, :issued => Time.now,
        :pid => Process.pid, :pid_hostname => Misc.hostname,
        :task_name => task_name, :workflow => workflow.to_s,
        :provided_inputs => Annotation.purge(provided_inputs),
        :non_default_inputs => non_default_inputs,
        :inputs => Annotation.purge(inputs), :input_names => input_names, :type => type,
        :dependencies => (dependencies || []).collect{|d| d.path }

      run_dependencies

      set_info :start, Time.now
      log :start
      @exec_result = exec

      if @exec_result.nil? && Open.exist?(self.tmp_path) && ! Open.exist?(self.path)
        # The task wrote its output to the temporary path itself.
        Open.mv self.tmp_path, self.path
      else
        @exec_result = @exec_result.stream if @exec_result.respond_to?(:stream) && ! (TSV === @exec_result)
      end

      # Streams are written to disk here when streaming is disabled or was
      # not requested; otherwise the value is handed to Persist as is.
      if (IO === @exec_result || StringIO === @exec_result) && (ENV["SCOUT_NO_STREAM"] == "true" || ! stream)
        Open.sensible_write(self.path, @exec_result)
        @exec_result = nil
      else
        @exec_result
      end
    end

    if TrueClass === no_load
      consume_all_streams if streaming?
      @result = nil
    elsif no_load && ! (IO === @result)
      @result = nil
    end

    @result
  rescue Exception => e
    begin
      begin
        if ConcurrentStreamProcessFailed === e
          # Detach the stream while dumping the exception, then restore it.
          s = e.concurrent_stream
          e.concurrent_stream = nil
          exception_encoded = Base64.encode64(Marshal.dump(e))
          e.concurrent_stream = s
        else
          exception_encoded = Base64.encode64(Marshal.dump(e))
        end
        merge_info :status => :error, :exception => exception_encoded, :end => Time.now, :backtrace => e.backtrace, :message => "#{e.class}: #{e.message}"
      rescue Exception
        # Fall back to a plain Exception carrying only the message.
        exception_encoded = Base64.encode64(Marshal.dump(Exception.new(e.message)))
        merge_info :status => :error, :exception => exception_encoded, :end => Time.now, :backtrace => e.backtrace, :message => "#{e.class}: #{e.message}"
      end

      abort_dependencies
    ensure
      raise e
    end
  ensure
    if ! (error? || aborted?)
      if @result && streaming?
        # The status becomes :done only once the stream has been consumed.
        ConcurrentStream.setup(@result) do
          merge_info :status => :done, :end => Time.now
        end

        @result.abort_callback = proc do |exception|
          Open.rm self.path
          if exception.nil? || Aborted === exception || Interrupt === exception
            merge_info :status => :aborted, :end => Time.now
          else
            begin
              merge_info :status => :error, :exception => exception, :end => Time.now
            rescue Exception
              Log.exception $!
            end
          end
        end

        log :streaming
      else
        merge_info :status => :done, :end => Time.now
      end
    end
  end
end
#run_dependencies ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/scout/workflow/step/dependencies.rb', line 76
#
# Runs every dependency in all_dependencies that is not running, not done and
# not in a non-recoverable error.
#
# The per-dependency options are read from compute[dep.path] (empty when
# absent): +false+ skips the dependency, :produce runs it with :no_load,
# :canfail tolerates a ScoutException. Dependencies are streamed unless
# SCOUT_EXPLICIT_STREAMING is "true", in which case only those with :stream.
#
# NOTE(review): the identifier `compute_options` was missing from the
# extracted listing and is reconstructed from context -- confirm against the
# real file.
def run_dependencies
  all_dependencies.each do |dep|
    next if dep.running? || dep.done?
    next if dep.error? && ! dep.recoverable_error?

    compute_options = compute[dep.path] if compute
    compute_options = [] if compute_options.nil?

    next if compute_options.include?(false)

    stream = compute_options.include?(:stream)
    stream = true unless ENV["SCOUT_EXPLICIT_STREAMING"] == 'true'
    stream = :no_load if compute_options.include?(:produce)

    begin
      dep.run(stream)
    rescue ScoutException
      if compute_options.include?(:canfail)
        Log.medium "Allow failing of #{dep.path}"
      else
        raise $!
      end
    end
  end
end
#running? ⇒ Boolean
183 184 185 |
# File 'lib/scout/workflow/step/info.rb', line 183
#
# Whether the step is not finished with status :done and the pid recorded in
# its info is alive according to Misc.pid_alive?.
def running?
  return false if done? && status == :done

  info[:pid] && Misc.pid_alive?(info[:pid])
end
#save_info(info = nil) ⇒ Object
32 33 34 35 |
# File 'lib/scout/workflow/step/info.rb', line 32
#
# Sets @info to +info+, saves it to the info file with the configured
# SERIALIZER and records the current time in @info_load_time.
def save_info(info = nil)
  @info = info
  Persist.save(info, info_file, SERIALIZER)
  @info_load_time = Time.now
end
#save_input_bundle(input_bundle) ⇒ Object
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/scout/workflow/step/inputs.rb', line 14
#
# Saves the step inputs into a temporary directory, tars that directory into
# a temporary file and moves it to +input_bundle+, creating the destination
# directory first.
def save_input_bundle(input_bundle)
  TmpFile.with_dir do |inputs_dir|
    TmpFile.with_file do |tar_file|
      save_inputs(inputs_dir)
      Open.mkdir(File.dirname(input_bundle))
      Misc.tarize(inputs_dir, tar_file)
      Open.mv(tar_file, input_bundle)
    end
  end
end
#save_inputs(inputs_dir) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 |
# File 'lib/scout/workflow/step/inputs.rb', line 2
#
# Saves the provided inputs of the step under +inputs_dir+ through the task.
# When clean_name equals name nothing is saved and +inputs_dir+ is only
# touched.
def save_inputs(inputs_dir)
  if clean_name == name
    Log.medium "Saving no input job: #{Log.fingerprint inputs_dir}"
    Open.touch(inputs_dir)
  else
    #hash = name[clean_name.length..-1]
    #inputs_dir += hash
    Log.medium "Saving job inputs to: #{Log.fingerprint inputs_dir}"
    self.task.save_inputs(inputs_dir, provided_inputs)
  end
end
#set_info(key, value) ⇒ Object
136 137 138 |
# File 'lib/scout/workflow/step/info.rb', line 136
#
# Stores a single +key+ / +value+ pair in the step info through #merge_info.
def set_info(key, value)
  merge_info({ key => value })
end
#short_path ⇒ Object
92 93 94 |
# File 'lib/scout/workflow/step.rb', line 92
#
# "workflow/task_name/name" identifier for the step.
def short_path
  [workflow.to_s, task_name, name].join("/")
end
#started? ⇒ Boolean
69 70 71 72 73 74 75 |
# File 'lib/scout/workflow/step/status.rb', line 69
#
# True when the step is done. Otherwise false when there is no info file or
# no pid recorded in it, and else whatever Misc.pid_alive? says for that pid.
def started?
  return true if done?
  return false unless Open.exist?(info_file)

  recorded_pid = info[:pid]
  return false unless recorded_pid

  Misc.pid_alive?(recorded_pid)
end
#status ⇒ Object
171 172 173 |
# File 'lib/scout/workflow/step/info.rb', line 171
#
# Status recorded in the step info, as a Symbol (nil when none is recorded).
#
# The previous implementation used #tap, which returns its receiver and so
# discarded the to_sym conversion; a status stored as a String came back as a
# String and never compared equal to symbols such as :done.
def status
  recorded = info[:status]
  recorded.nil? ? recorded : recorded.to_sym
end
#step(task_name) ⇒ Object
413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
# File 'lib/scout/workflow/step.rb', line 413
#
# Finds the dependency that runs +task_name+ (given as String or Symbol).
#
# Direct dependencies are checked first, matching either their task_name or
# their overriden_task; only then are dependencies searched recursively, in
# order. Returns nil when nothing matches or there are no dependencies.
#
# The recursive pass no longer repeats the direct-match checks: by then the
# first pass has already shown that no direct dependency matches.
def step(task_name)
  task_name = task_name.to_sym
  deps = dependencies || []

  direct = deps.find do |dep|
    (dep.task_name && dep.task_name.to_sym == task_name) ||
      (dep.overriden_task && dep.overriden_task.to_sym == task_name)
  end
  return direct if direct

  deps.each do |dep|
    nested = dep.step(task_name)
    return nested if nested
  end

  nil
end
#stream ⇒ Object
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/scout/workflow/step.rb', line 319
#
# Returns a stream with the step result.
#
# When the step is streaming and still holds a result, that result is taken
# (under the step mutex) and @result advances to its +next+ stream. Otherwise
# the result file is opened, joining the step first if it is running or
# waiting; a step in none of those states is executed directly.
def stream
  synchronize do
    if streaming? && !@result.nil?
      following = @result.next
      if following
        Log.debug "Taking result #{Log.fingerprint @result} next #{Log.fingerprint @result.next}"
      else
        Log.debug "Taking result #{Log.fingerprint @result}"
      end

      @take_stream, @result = @result, @result.next

      return @take_stream
    end
  end

  if done?
    Open.open(self.path)
  elsif running? || waiting?
    join
    Open.open(self.path)
  else
    exec
  end
end
#streaming? ⇒ Boolean
315 316 317 |
# File 'lib/scout/workflow/step.rb', line 315
#
# True-ish when a stream has already been taken (@take_stream) or the current
# result is an IO or StringIO.
def streaming?
  return @take_stream if @take_stream

  case @result
  when IO, StringIO then true
  else false
  end
end
#synchronize(&block) ⇒ Object
33 34 35 |
# File 'lib/scout/workflow/step.rb', line 33
#
# Runs the given block while holding the step's @mutex and returns its value.
def synchronize(&block)
  @mutex.synchronize(&block)
end
#task_signature ⇒ Object
440 441 442 443 |
# File 'lib/scout/workflow/step.rb', line 440
#
# "workflow#task_name" signature. The workflow is used as is when it is a
# String, otherwise its +name+ is used.
def task_signature
  workflow_name = case workflow
                  when String then workflow
                  else workflow.name
                  end

  [workflow_name, task_name].join("#")
end
#terminated? ⇒ Boolean
377 378 379 |
# File 'lib/scout/workflow/step.rb', line 377
#
# Whether the step is not inside exec (@in_exec unset) and is done, in error
# or aborted.
def terminated?
  return false if @in_exec

  done? || error? || aborted?
end
#tmp_path ⇒ Object
159 160 161 162 163 164 165 166 167 |
# File 'lib/scout/workflow/step.rb', line 159
#
# Temporary counterpart of the step path: same directory, basename prefixed
# with a dot. Computed once and cached in @tmp_path. When @path is a Path the
# new string is passed to @path.setup.
def tmp_path
  @tmp_path ||= begin
    hidden_name = '.' + File.basename(@path)
    candidate = File.join(File.dirname(@path), hidden_name)
    @path.setup(candidate) if Path === @path
    candidate
  end
end
#to_json ⇒ Object
21 22 23 |
# File 'lib/scout/workflow/step/load.rb', line 21
#
# JSON representation of the step: the JSON of its path. Any arguments are
# accepted and ignored.
def to_json(...)
  path.to_json
end
#traverse(obj, desc: nil, **kwargs, &block) ⇒ Object
23 24 25 26 27 |
# File 'lib/scout/workflow/step/progress.rb', line 23
#
# Runs TSV.traverse over +obj+ with the given keyword arguments and block.
# Unless the caller passes :bar, a progress bar for this step is created,
# described by +desc+ (default "Processing <short_path>").
#
# NOTE(review): the `progress_bar` call was missing from the extracted
# listing and is reconstructed from context -- confirm against the real file.
def traverse(obj, desc: nil , **kwargs, &block)
  desc = "Processing #{self.short_path}" if desc.nil?
  kwargs[:bar] = self.progress_bar(desc) unless kwargs.include?(:bar)
  TSV.traverse obj, **kwargs, &block
end
#updated? ⇒ Boolean
30 31 32 33 34 35 36 37 |
# File 'lib/scout/workflow/step/status.rb', line 30
#
# Whether the step result can be considered up to date.
#
# A recoverable error is never updated. A step that is done or has a
# non-recoverable error is updated unless SCOUT_UPDATE is set in the
# environment. Otherwise the step is updated when it has no newer
# dependencies; newer ones are logged at low level.
def updated?
  return false if error? && recoverable_error?

  settled = done? || (error? && !recoverable_error?)
  return true if settled && !ENV["SCOUT_UPDATE"]

  newer = newer_dependencies
  if newer.any?
    Log.low "Newer deps found for #{Log.fingerprint self}: #{Log.fingerprint newer}"
  end

  newer.empty?
end
#waiting? ⇒ Boolean
77 78 79 |
# File 'lib/scout/workflow/step/status.rb', line 77
#
# Whether the step is present but has not started.
def waiting?
  present? && !started?
end