Module: Workflow
- Includes:
- AnnotatedModule
- Defined in:
- lib/rbbt/workflow.rb,
lib/rbbt/workflow/doc.rb,
lib/rbbt/workflow/usage.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/definition.rb
Defined Under Namespace
Classes: TaskNotFoundException
Constant Summary collapse
- DEFAULT_NAME =
{{{ JOB MANAGEMENT
"Default"
- TAG =
:hash
Class Attribute Summary collapse
-
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
-
.workflows ⇒ Object
Returns the value of attribute workflows.
Instance Attribute Summary collapse
-
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
-
#description ⇒ Object
Returns the value of attribute description.
-
#documentation ⇒ Object
Returns the value of attribute documentation.
-
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
-
#helpers ⇒ Object
Returns the value of attribute helpers.
-
#last_task ⇒ Object
Returns the value of attribute last_task.
-
#libdir ⇒ Object
Returns the value of attribute libdir.
-
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
-
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
-
#task_description ⇒ Object
Returns the value of attribute task_description.
-
#tasks ⇒ Object
Returns the value of attribute tasks.
-
#workdir ⇒ Object
Returns the value of attribute workdir.
Class Method Summary collapse
- .doc_parse_chunks(str, pattern) ⇒ Object
- .doc_parse_first_line(str) ⇒ Object
- .doc_parse_up_to(str, pattern, keep = false) ⇒ Object
- .extended(base) ⇒ Object
- .load_inputs(dir, input_names, input_types) ⇒ Object
- .load_workflow_file(filename) ⇒ Object
- .parse_workflow_doc(doc) ⇒ Object
- .require_local_workflow(wf_name) ⇒ Object
- .require_remote_workflow(wf_name, url) ⇒ Object
- .require_workflow(wf_name) ⇒ Object
- .resolve_locals(inputs) ⇒ Object
- .workflow_dir ⇒ Object
Instance Method Summary collapse
- #dep(*dependency_list, &block) ⇒ Object
- #desc(description) ⇒ Object
- #doc(task = nil) ⇒ Object
- #documentation_markdown ⇒ Object
- #export_asynchronous(*names) ⇒ Object
- #export_exec(*names) ⇒ Object
- #export_synchronous(*names) ⇒ Object
- #extension(extension) ⇒ Object
- #helper(name, &block) ⇒ Object
- #id_for(path) ⇒ Object
- #job(taskname, jobname = nil, inputs = {}) ⇒ Object
- #jobs(taskname, query = nil) ⇒ Object
- #load_documentation ⇒ Object
- #load_id(id) ⇒ Object
- #load_name(task, name) ⇒ Object
- #load_step(path) ⇒ Object
-
#local_persist_setup ⇒ Object
{{{ Make workflow resources local.
- #local_workdir_setup ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #make_local ⇒ Object
- #real_dependencies(task, jobname, inputs, dependencies) ⇒ Object
- #rec_dependencies(taskname) ⇒ Object
- #rec_input_defaults(taskname) ⇒ Object
- #rec_input_descriptions(taskname) ⇒ Object
- #rec_input_options(taskname) ⇒ Object
- #rec_input_types(taskname) ⇒ Object
- #rec_inputs(taskname) ⇒ Object
- #returns(description) ⇒ Object
- #step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
- #task(name, &block) ⇒ Object
- #task_for(path) ⇒ Object
- #task_info(name) ⇒ Object
- #with_workdir(workdir) ⇒ Object
Methods included from AnnotatedModule
add_consummable_annotation, #input
Class Attribute Details
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
21 22 23 |
# File 'lib/rbbt/workflow.rb', line 21 def autoinstall @autoinstall end |
.workflows ⇒ Object
Returns the value of attribute workflows.
21 22 23 |
# File 'lib/rbbt/workflow.rb', line 21 def workflows @workflows end |
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
152 153 154 |
# File 'lib/rbbt/workflow.rb', line 152 def asynchronous_exports @asynchronous_exports end |
#description ⇒ Object
Returns the value of attribute description.
148 149 150 |
# File 'lib/rbbt/workflow.rb', line 148 def description @description end |
#documentation ⇒ Object
Returns the value of attribute documentation.
54 55 56 |
# File 'lib/rbbt/workflow/doc.rb', line 54 def documentation @documentation end |
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
152 153 154 |
# File 'lib/rbbt/workflow.rb', line 152 def exec_exports @exec_exports end |
#helpers ⇒ Object
Returns the value of attribute helpers.
150 151 152 |
# File 'lib/rbbt/workflow.rb', line 150 def helpers @helpers end |
#last_task ⇒ Object
Returns the value of attribute last_task.
151 152 153 |
# File 'lib/rbbt/workflow.rb', line 151 def last_task @last_task end |
#libdir ⇒ Object
Returns the value of attribute libdir.
149 150 151 |
# File 'lib/rbbt/workflow.rb', line 149 def libdir @libdir end |
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
152 153 154 |
# File 'lib/rbbt/workflow.rb', line 152 def synchronous_exports @synchronous_exports end |
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
151 152 153 |
# File 'lib/rbbt/workflow.rb', line 151 def task_dependencies @task_dependencies end |
#task_description ⇒ Object
Returns the value of attribute task_description.
151 152 153 |
# File 'lib/rbbt/workflow.rb', line 151 def task_description @task_description end |
#tasks ⇒ Object
Returns the value of attribute tasks.
150 151 152 |
# File 'lib/rbbt/workflow.rb', line 150 def tasks @tasks end |
#workdir ⇒ Object
Returns the value of attribute workdir.
149 150 151 |
# File 'lib/rbbt/workflow.rb', line 149 def workdir @workdir end |
Class Method Details
.doc_parse_chunks(str, pattern) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/rbbt/workflow/doc.rb', line 21 def self.doc_parse_chunks(str, pattern) parts = str.split(pattern) return {} if parts.length < 2 tasks = Hash[*parts[1..-1].collect{|v| v.strip}] tasks.delete_if{|t,d| d.empty?} tasks end |
.doc_parse_first_line(str) ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/rbbt/workflow/doc.rb', line 3 def self.doc_parse_first_line(str) if str.match(/^([^\n]*)\n\n(.*)/sm) str.replace $2 $1 else "" end end |
.doc_parse_up_to(str, pattern, keep = false) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/doc.rb', line 12 def self.doc_parse_up_to(str, pattern, keep = false) pre, _pat, _post = str.partition pattern if _pat [pre, (keep ? _pat << _post : _post)] else _post end end |
.extended(base) ⇒ Object
29 30 31 32 |
# File 'lib/rbbt/workflow.rb', line 29 def self.extended(base) self.workflows << base base.libdir = Path.setup(Path.caller_lib_dir).tap{|p| p.resource = base} end |
.load_inputs(dir, input_names, input_types) ⇒ Object
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/rbbt/workflow/accessor.rb', line 243 def self.load_inputs(dir, input_names, input_types) inputs = {} dir = Path.setup(dir.dup) input_names.each do |input| file = dir[input].find Log.debug "Trying #{ input }: #{file}" next unless file.exists? case input_types[input] when :tsv, :array, :text Log.debug "Pointing #{ input } to #{file}" inputs[input.to_sym] = file when :boolean inputs[input.to_sym] = (file.read.strip == 'true') else Log.debug "Loading #{ input } from #{file}" inputs[input.to_sym] = file.read.strip end end IndiferentHash.setup(inputs) end |
.load_workflow_file(filename) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rbbt/workflow.rb', line 39 def self.load_workflow_file(filename) begin $LOAD_PATH.unshift(File.join(File.dirname(File.expand_path(filename)), 'lib')) filename = File.expand_path(filename) require filename Log.debug{"Workflow loaded from: #{ filename }"} return true rescue Exception Log.warn{"Error loading workflow: #{ filename }"} raise $! end end |
.parse_workflow_doc(doc) ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/rbbt/workflow/doc.rb', line 29 def self.parse_workflow_doc(doc) title = doc_parse_first_line doc description, task_info = doc_parse_up_to doc, /^# Tasks/i task_description, tasks = doc_parse_up_to task_info, /^##/, true tasks = doc_parse_chunks tasks, /## (.*)/ {:title => title.strip, :description => description.strip, :task_description => task_description.strip, :tasks => tasks} end |
.require_local_workflow(wf_name) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/rbbt/workflow.rb', line 67 def self.require_local_workflow(wf_name) filename = nil if Path === wf_name case # Points to workflow file when ((File.exists?(wf_name.find) and not File.directory?(wf_name.find)) or File.exists?(wf_name.find + '.rb')) filename = wf_name.find # Points to workflow dir when (File.exists?(wf_name.find) and File.directory?(wf_name.find) and File.exists?(File.join(wf_name.find, 'workflow.rb'))) filename = wf_name['workflow.rb'].find end else if ((File.exists?(wf_name) and not File.directory?(wf_name)) or File.exists?(wf_name + '.rb')) filename = (wf_name =~ /\.?\//) ? wf_name : "./" << wf_name else filename = workflow_dir[wf_name]['workflow.rb'].find end end if filename and File.exists? filename load_workflow_file filename else return false end end |
.require_remote_workflow(wf_name, url) ⇒ Object
34 35 36 37 |
# File 'lib/rbbt/workflow.rb', line 34 def self.require_remote_workflow(wf_name, url) require 'rbbt/rest/client' eval "Object::#{wf_name} = WorkflowRESTClient.new '#{ url }', '#{wf_name}'" end |
.require_workflow(wf_name) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/rbbt/workflow.rb', line 96 def self.require_workflow(wf_name) # Already loaded begin workflow = Misc.string2const wf_name Log.debug{"Workflow #{ wf_name } already loaded"} return workflow rescue Exception end # Load remotely if Rbbt.etc.remote_workflows.exists? remote_workflows = Rbbt.etc.remote_workflows.yaml if Hash === remote_workflows and remote_workflows.include?(wf_name) url = remote_workflows[wf_name] begin return require_remote_workflow(wf_name, url) ensure Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} end end end if Open.remote? wf_name url = wf_name wf_name = File.basename(url) begin return require_remote_workflow(wf_name, url) ensure Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} end end # Load locally if wf_name =~ /::\w+$/ clean_name = wf_name.sub(/::.*/,'') Log.info{"Looking for '#{wf_name}' in '#{clean_name}'"} wf_name = clean_name end Log.info{"Loading workflow #{wf_name}"} require_local_workflow(wf_name) or require_local_workflow(Misc.snake_case(wf_name)) or (Workflow.autoinstall and `rbbt workflow install #{Misc.snake_case(wf_name)}` and require_local_workflow(Misc.snake_case(wf_name))) or raise("Workflow not found or could not be loaded: #{ wf_name }") begin Misc.string2const Misc.camel_case(wf_name) rescue Workflow.workflows.last || true end end |
.resolve_locals(inputs) ⇒ Object
207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/rbbt/workflow.rb', line 207 def self.resolve_locals(inputs) inputs.each do |name, value| if value =~ /^local:(.*?):(.*)/ or (Array === value and value.length == 1 and value.first =~ /^local:(.*?):(.*)/) or (TSV === value and value.size == 1 and value.keys.first =~ /^local:(.*?):(.*)/) task_name = $1 jobname = $2 value = load_id(File.join(task_name, jobname)).load inputs[name] = value end end end |
.workflow_dir ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/rbbt/workflow.rb', line 54 def self.workflow_dir case when (defined?(Rbbt) and Rbbt.etc.workflow_dir.exists?) dir = Rbbt.etc.workflow_dir.read.strip Path.setup(dir) when defined?(Rbbt) Rbbt.workflows else dir = File.join(ENV['HOME'], '.workflows') Path.setup(dir) end end |
Instance Method Details
#dep(*dependency_list, &block) ⇒ Object
28 29 30 31 |
# File 'lib/rbbt/workflow/definition.rb', line 28 def dep(*dependency_list, &block) dependency_list << block if block_given? dependencies.concat dependency_list end |
#desc(description) ⇒ Object
16 17 18 |
# File 'lib/rbbt/workflow/definition.rb', line 16 def desc(description) @description = description end |
#doc(task = nil) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/rbbt/workflow/usage.rb', line 42 def doc(task = nil) if task.nil? puts Log.color :magenta, self.to_s puts Log.color :magenta, "=" * self.to_s.length if self.documentation[:description] and not self.documentation[:description].empty? puts puts Misc.format_paragraph self.documentation[:description] end puts puts Log.color :magenta, "## TASKS" if self.documentation[:task_description] and not self.documentation[:task_description].empty? puts puts Misc.format_paragraph self.documentation[:task_description] end puts tasks.each do |name,task| description = task.description || "" description = description.split("\n\n").first puts Misc.format_definition_list_item(name.to_s, description, 80, 30, :yellow) end else if Task === task task_name = task.name else task_name = task task = self.tasks[task_name] end dependencies = self.rec_dependencies(task_name).collect{|dep_name| self.tasks[dep_name.to_sym]} task.doc(dependencies) if self.libdir.examples[task_name].exists? self.libdir.examples[task_name].glob("*").each do |example_dir| example = File.basename(example_dir) puts Log.color(:magenta, "Example " << example) + " -- " + Log.color(:blue, example_dir) inputs = {} task.input_types.each do |input,type| if example_dir[input].exists? case type when :tsv, :array, :text head = example_dir[input].read.split("\n")[0..5].compact * "\n\n" head = head[0..500] puts Misc.format_definition_list_item(input, head).gsub("\n\n","\n") else puts Misc.format_definition_list_item(input, example_dir[input].read) end end end puts end end end end |
#documentation_markdown ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/rbbt/workflow/doc.rb', line 37 def documentation_markdown file = @libdir['workflow.md'].find if file.exists? file.read else "" end end |
#export_asynchronous(*names) ⇒ Object
69 70 71 |
# File 'lib/rbbt/workflow/definition.rb', line 69 def export_asynchronous(*names) asynchronous_exports.concat names end |
#export_exec(*names) ⇒ Object
65 66 67 |
# File 'lib/rbbt/workflow/definition.rb', line 65 def export_exec(*names) exec_exports.concat names end |
#export_synchronous(*names) ⇒ Object
73 74 75 |
# File 'lib/rbbt/workflow/definition.rb', line 73 def export_synchronous(*names) synchronous_exports.concat names end |
#extension(extension) ⇒ Object
20 21 22 |
# File 'lib/rbbt/workflow/definition.rb', line 20 def extension(extension) @extension = extension end |
#helper(name, &block) ⇒ Object
12 13 14 |
# File 'lib/rbbt/workflow/definition.rb', line 12 def helper(name, &block) helpers[name] = block end |
#id_for(path) ⇒ Object
380 381 382 383 384 385 386 387 |
# File 'lib/rbbt/workflow/accessor.rb', line 380 def id_for(path) if workdir.respond_to? :find workdir_find = workdir.find else workdir_find = workdir end Misc.path_relative_to workdir_find, path end |
#job(taskname, jobname = nil, inputs = {}) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/rbbt/workflow.rb', line 220 def job(taskname, jobname = nil, inputs = {}) taskname = taskname.to_sym jobname = DEFAULT_NAME if jobname.nil? or jobname.empty? task = tasks[taskname] raise "Task not found: #{ taskname }" if task.nil? IndiferentHash.setup(inputs) Workflow.resolve_locals(inputs) dependencies = real_dependencies(task, jobname, inputs, task_dependencies[taskname] || []) if inputs.empty? step_path = step_path taskname, jobname, [], [], task.extension input_values = task.take_input_values(inputs) else input_values = task.take_input_values(inputs) step_path = step_path taskname, jobname, input_values, dependencies, task.extension end step = Step.new step_path, task, input_values, dependencies helpers.each do |name, block| (class << step; self; end).instance_eval do define_method name, &block end end step end |
#jobs(taskname, query = nil) ⇒ Object
276 277 278 279 280 281 282 283 284 |
# File 'lib/rbbt/workflow.rb', line 276 def jobs(taskname, query = nil) task_dir = File.join(File.expand_path(workdir.find), taskname.to_s) pattern = File.join(File.expand_path(task_dir), '**/*') job_info_files = Dir.glob(Step.info_file(pattern)).collect{|f| Misc.path_relative_to task_dir, f } job_info_files = job_info_files.select{|f| f.index(query) == 0 } if query job_info_files.collect{|f| job_name = Step.job_name_for_info_file(f, tasks[taskname].extension) } end |
#load_documentation ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/rbbt/workflow/doc.rb', line 46 def load_documentation @documentation = Workflow.parse_workflow_doc documentation_markdown @documentation[:tasks].each do |task, description| raise "Documentation for #{ task }, but not a #{ self.to_s } task" unless tasks.include? task.to_sym tasks[task.to_sym].description = description end end |
#load_id(id) ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/rbbt/workflow.rb', line 257 def load_id(id) path = File.join(workdir, id) task = task_for path step = Step.new path, tasks[task.to_sym] step.info if step.info.include? :dependencies step.dependencies = step.info[:dependencies].collect do |task, job| load_id(File.join(task.to_s, job)) end end step end |
#load_name(task, name) ⇒ Object
270 271 272 273 274 |
# File 'lib/rbbt/workflow.rb', line 270 def load_name(task, name) task = tasks[task.to_sym] if String === task or Symbol === task path = step_path task.name, name, [], [], task.extension Step.new path, task end |
#load_step(path) ⇒ Object
252 253 254 255 |
# File 'lib/rbbt/workflow.rb', line 252 def load_step(path) task = task_for path Step.new path, tasks[task.to_sym] end |
#local_persist_setup ⇒ Object
Make workflow resources local.
287 288 289 290 291 292 |
# File 'lib/rbbt/workflow.rb', line 287 def local_persist_setup class << self include LocalPersist end self.local_persist_dir = Rbbt.var.cache.persistence.find :lib end |
#local_workdir_setup ⇒ Object
294 295 296 |
# File 'lib/rbbt/workflow.rb', line 294 def local_workdir_setup self.workdir = Rbbt.var.jobs.find :lib end |
#log(status, message = nil, &block) ⇒ Object
266 267 268 |
# File 'lib/rbbt/workflow/accessor.rb', line 266 def log(status, message = nil, &block) Step.log(status, message, nil, &block) end |
#make_local ⇒ Object
298 299 300 301 |
# File 'lib/rbbt/workflow.rb', line 298 def make_local local_persist_setup local_workdir_setup end |
#real_dependencies(task, jobname, inputs, dependencies) ⇒ Object
342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/rbbt/workflow/accessor.rb', line 342 def real_dependencies(task, jobname, inputs, dependencies) real_dependencies = [] dependencies.each do |dependency| real_dependencies << case dependency when Step dependency when Symbol job(dependency, jobname, inputs) when Proc dependency.call jobname, inputs end end real_dependencies.flatten.compact end |
#rec_dependencies(taskname) ⇒ Object
308 309 310 311 312 313 314 315 316 |
# File 'lib/rbbt/workflow/accessor.rb', line 308 def rec_dependencies(taskname) if task_dependencies.include? taskname deps = task_dependencies[taskname].select{|dep| String === dep or Symbol === dep} deps.concat deps.collect{|dep| rec_dependencies(dep)}.flatten deps.uniq else [] end end |
#rec_input_defaults(taskname) ⇒ Object
322 323 324 325 |
# File 'lib/rbbt/workflow/accessor.rb', line 322 def rec_input_defaults(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_defaults}. tap{|h| IndiferentHash.setup(h)} end |
#rec_input_descriptions(taskname) ⇒ Object
332 333 334 335 |
# File 'lib/rbbt/workflow/accessor.rb', line 332 def rec_input_descriptions(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_descriptions}. tap{|h| IndiferentHash.setup(h)} end |
#rec_input_options(taskname) ⇒ Object
337 338 339 340 |
# File 'lib/rbbt/workflow/accessor.rb', line 337 def rec_input_options(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_options}. tap{|h| IndiferentHash.setup(h)} end |
#rec_input_types(taskname) ⇒ Object
327 328 329 330 |
# File 'lib/rbbt/workflow/accessor.rb', line 327 def rec_input_types(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_types}. tap{|h| IndiferentHash.setup(h) } end |
#rec_inputs(taskname) ⇒ Object
318 319 320 |
# File 'lib/rbbt/workflow/accessor.rb', line 318 def rec_inputs(taskname) [taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat tasks[tn.to_sym].inputs} end |
#returns(description) ⇒ Object
24 25 26 |
# File 'lib/rbbt/workflow/definition.rb', line 24 def returns(description) @result_description = description end |
#step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/rbbt/workflow/accessor.rb', line 358 def step_path(taskname, jobname, inputs, dependencies, extension = nil) Proc.new{ raise "Jobname makes an invalid path: #{ jobname }" if jobname =~ /\.\./ if inputs.any? or dependencies.any? tagged_jobname = case TAG when :hash jobname + '_' + Misc.digest((inputs.collect{|i| Misc.fingerprint(i)} * "," + ";" + dependencies.collect{|dep| dep.name } * "\n")) else jobname end else tagged_jobname = jobname end if extension and not extension.empty? tagged_jobname = tagged_jobname + ('.' << extension.to_s) end workdir[taskname][tagged_jobname].find } end |
#task(name, &block) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/rbbt/workflow/definition.rb', line 33 def task(name, &block) if Hash === name type = name.first.last name = name.first.first else result_type = consume_result_type || :marshal end name = name.to_sym block = self.method(name) unless block_given? task_info = { :name => name, :inputs => consume_inputs, :description => consume_description, :input_types => consume_input_types, :result_type => (Array === type ? type.to_sym : type), :input_defaults => consume_input_defaults, :input_descriptions => consume_input_descriptions, :extension => consume_extension, :input_options => consume_input_options } task = Task.setup(task_info, &block) last_task = task tasks[name] = task task_dependencies[name] = consume_dependencies end |
#task_for(path) ⇒ Object
389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'lib/rbbt/workflow/accessor.rb', line 389 def task_for(path) if workdir.respond_to? :find workdir_find = workdir.find else workdir_find = workdir end workdir_find = File.expand_path(workdir_find) path = File.expand_path(path) dir = File.dirname(path) Misc.path_relative_to(workdir_find, dir).sub(/([^\/]+)\/.*/,'\1') end |
#task_info(name) ⇒ Object
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/rbbt/workflow/accessor.rb', line 270 def task_info(name) name = name.to_sym task = tasks[name] description = task.description result_description = task.result_description result_type = task.result_type inputs = rec_inputs(name).uniq input_types = rec_input_types(name) input_descriptions = rec_input_descriptions(name) input_defaults = rec_input_defaults(name) input_options = rec_input_options(name) export = case when (synchronous_exports.include?(name.to_sym) or synchronous_exports.include?(name.to_s)) :synchronous when (asynchronous_exports.include?(name.to_sym) or asynchronous_exports.include?(name.to_s)) :asynchronous when (exec_exports.include?(name.to_sym) or exec_exports.include?(name.to_s)) :exec else :none end dependencies = task_dependencies[name].select{|dep| String === dep or Symbol === dep} { :id => File.join(self.to_s, name.to_s), :description => description, :export => export, :inputs => inputs, :input_types => input_types, :input_descriptions => input_descriptions, :input_defaults => input_defaults, :input_options => input_options, :result_type => result_type, :result_description => result_description, :dependencies => dependencies } end |