Class: Ripe::WorkerController::Preparer
- Inherits: Object
- Defined in: lib/ripe/worker_controller/preparer.rb
Overview
This class controls worker preparation from a given workflow, list of samples and parameters. It applies the workflow to each of the specified samples.
Instance Attribute Summary
- #workers ⇒ Array<Worker>
  Workers prepared in the current batch.
Instance Method Summary
- #initialize(workflow, samples, params = {}) ⇒ Preparer (constructor)
  Prepare workers by applying the workflow callback and its parameters to each sample.
- #load_workflow(workflow, params) ⇒ Proc, Hash<Symbol, String>
  Load a workflow and return its callback and params components.
- #prepare_sample_blocks(samples, callback, params) ⇒ Hash
  Apply the workflow (callback) to each sample, producing a single root block per sample.
- #prepare_worker(worker_sample_blocks, params) ⇒ DB::Worker
  Prepare a worker from a group of sample blocks.
- #prepare_worker_blocks(worker_sample_blocks, worker) ⇒ Array<Blocks::Block>
  Organize worker blocks into tasks and prepare them.
Constructor Details
#initialize(workflow, samples, params = {}) ⇒ Preparer
Prepare workers by applying the workflow callback and its parameters to each sample.
    # File 'lib/ripe/worker_controller/preparer.rb', line 28

    def initialize(workflow, samples, params = {})
      # Extract callback and params from input
      callback, params = load_workflow(workflow, params)

      if ![:patch, :force, :depend].include?(params[:mode].to_sym)
        abort "Invalid mode #{params[:mode]}."
      end

      # Apply the workflow to each sample
      sample_blocks = prepare_sample_blocks(samples, callback, params)

      # Split samples into groups of +:group_num+ samples and produce a
      # worker from each of these groups.
      @workers = sample_blocks.each_slice(params[:group_num].to_i).map do |worker_blocks|
        prepare_worker(worker_blocks, params)
      end
    end
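The grouping step in the constructor can be illustrated in isolation. The following is a minimal sketch, not part of ripe: `validate_mode` and `group_samples` are hypothetical helper names, the symbols stand in for real `Blocks::Block` objects, and the mode check raises `ArgumentError` instead of calling `abort` so that it can be exercised safely.

```ruby
# Hypothetical stand-ins; in ripe the hash values are Blocks::Block objects.
VALID_MODES = [:patch, :force, :depend].freeze

# Mirror the constructor's mode check, but raise instead of aborting.
def validate_mode(mode)
  mode = mode.to_sym
  raise ArgumentError, "Invalid mode #{mode}." unless VALID_MODES.include?(mode)
  mode
end

# Split a {sample => block} hash into groups of at most +group_num+ entries.
# Each group is an array of [sample, block] pairs, which is the shape that
# Hash#each_slice yields.
def group_samples(sample_blocks, group_num)
  sample_blocks.each_slice(group_num.to_i).to_a
end

sample_blocks = { 'S1' => :b1, 'S2' => :b2, 'S3' => :b3 }
group_samples(sample_blocks, '2').each_with_index do |group, i|
  puts "worker #{i + 1}: #{group.map(&:first).join(', ')}"
end
```

With three samples and a group size of 2, the sketch yields two groups, so the constructor would prepare two workers. The `.to_i` call is what allows `:group_num` to arrive as a string.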
Instance Attribute Details
#workers ⇒ Array<Worker>
workers prepared in current batch
    # File 'lib/ripe/worker_controller/preparer.rb', line 15

    def workers
      @workers
    end
Instance Method Details
#load_workflow(workflow, params) ⇒ Proc, Hash<Symbol, String>
Load a workflow and return its callback and params components.
    # File 'lib/ripe/worker_controller/preparer.rb', line 54

    def load_workflow(workflow, params)
      filename = Library.find(:workflow, workflow)
      abort "Could not find workflow #{workflow}." if filename == nil
      require_relative filename

      # Imports +$workflow+ from the workflow component. This is a dirty
      # hack to help make the +DSL::WorkflowDSL+ more convenient for the
      # end user.

      params = {
        wd: Dir.pwd,
        mode: :patch,
        group_num: 1,
      }.merge($workflow.params.merge(params))

      [$workflow.callback, params]
    end
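The nested `merge` calls determine which source of parameters wins. A minimal sketch of that precedence follows, assuming plain hashes in place of `$workflow.params`; `DEFAULTS` and `resolve_params` are hypothetical names, and the `wd` value is a fixed placeholder rather than `Dir.pwd`.

```ruby
# Built-in defaults, as in load_workflow (wd is a placeholder here).
DEFAULTS = { wd: '/tmp/project', mode: :patch, group_num: 1 }.freeze

# Later merges override earlier ones, so precedence is:
# defaults < workflow params < caller params.
def resolve_params(workflow_params, caller_params)
  DEFAULTS.merge(workflow_params.merge(caller_params))
end

resolved = resolve_params({ mode: :force, queue: 'batch' }, { mode: :depend })
puts "mode:      #{resolved[:mode]}"
puts "queue:     #{resolved[:queue]}"
puts "group_num: #{resolved[:group_num]}"
```

Here the caller's `:mode` overrides the workflow's, the workflow's `:queue` is kept because the caller does not set it, and `:group_num` falls back to the default.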
#prepare_sample_blocks(samples, callback, params) ⇒ Hash
Apply the workflow (callback) to each sample, producing a single root block per sample.
    # File 'lib/ripe/worker_controller/preparer.rb', line 81

    def prepare_sample_blocks(samples, callback, params)
      sample_blocks = samples.map do |sample|
        block = callback.call(sample, params).prune(params[:mode].to_sym == :force,
                                                    params[:mode].to_sym == :depend)
        if block != nil
          puts "Preparing sample #{sample}"
          {sample => block}
        else
          puts "Nothing to do for sample #{sample}"
          nil
        end
      end

      # Produce a {sample => block} hash
      sample_blocks.compact.inject(&:merge)
    end
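The final `compact.inject(&:merge)` step can be tried on its own. This sketch uses a hypothetical `merge_sample_hashes` helper and symbols in place of real blocks; it only reproduces the reduction, not the pruning.

```ruby
# Drop the nil entries (samples with nothing to do), then fold the
# remaining one-entry hashes into a single {sample => block} hash.
def merge_sample_hashes(per_sample)
  per_sample.compact.inject(&:merge)
end

merged = merge_sample_hashes([{ 'S1' => :b1 }, nil, { 'S3' => :b3 }])
puts merged.keys.join(', ')

# With no initial value, inject returns nil for an empty collection.
puts merge_sample_hashes([nil, nil]).nil?
```

One consequence worth noting: when every sample is pruned away, the reduction returns `nil` rather than an empty hash, so the constructor shown above would appear to call `each_slice` on `nil` in that case. Passing an initial value, as in `inject({}, &:merge)`, is one way a caller could avoid this; that is a suggestion, not what the listed source does.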
#prepare_worker(worker_sample_blocks, params) ⇒ DB::Worker
Prepare a worker from a group of sample blocks.
    # File 'lib/ripe/worker_controller/preparer.rb', line 107

    def prepare_worker(worker_sample_blocks, params)
      worker = DB::Worker.create(handle: params[:handle])
      worker_blocks = prepare_worker_blocks(worker_sample_blocks, worker)

      # Combine all grouped sample blocks into a single worker block

      params = params.merge({
        name:    worker.id,
        stdout:  worker.stdout,
        stderr:  worker.stderr,
        command: Blocks::SerialBlock.new(*worker_blocks).command,
      })

      worker_block = Blocks::LiquidBlock.new("#{PATH}/share/moab.sh", params)
      File.open(worker.sh, 'w') { |f| f.write(worker_block.command) }

      worker.update({
        status:   :prepared,
        ppn:      params[:ppn],
        queue:    params[:queue],
        walltime: params[:walltime],
      })

      worker
    end
#prepare_worker_blocks(worker_sample_blocks, worker) ⇒ Array<Blocks::Block>
Organize worker blocks into tasks and prepare them.
Returns an array containing as many elements as there are samples in the group.
    # File 'lib/ripe/worker_controller/preparer.rb', line 142

    def prepare_worker_blocks(worker_sample_blocks, worker)
      worker_sample_blocks.map do |sample, block|
        # Preorder traversal of blocks -- assign incremental numbers starting from
        # 1 to each node as it is being traversed, as well as producing the job
        # file for each task.
        post_var_assign = lambda do |subblock|
          if subblock.blocks.length == 0
            # This section is only called when the subblock is actually a working
            # block (a leaf in the block arborescence).
            task = worker.tasks.create({
              sample: sample,
              block:  subblock.id,
            })

            File.open(task.sh, 'w') { |f| f.write(subblock.command) }
            subblock.vars.merge!(log: task.log)
          else
            subblock.blocks.each(&post_var_assign)
          end
        end

        post_var_assign.call(block)
        block
      end
    end
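The recursive lambda above only acts on leaf blocks and recurses through everything else. A minimal sketch of that traversal, assuming a hypothetical `Node` struct in place of `Blocks::Block` and collecting leaf ids instead of creating tasks and job files:

```ruby
# Hypothetical stand-in for Blocks::Block: an id, child blocks, and vars.
Node = Struct.new(:id, :blocks, :vars)

# Depth-first walk that calls the given block on leaves only,
# in left-to-right order.
def visit_leaves(block, &on_leaf)
  if block.blocks.empty?
    on_leaf.call(block)
  else
    block.blocks.each { |child| visit_leaves(child, &on_leaf) }
  end
end

# Collect the ids of all leaves, in the order tasks would be created.
def leaf_ids(root)
  ids = []
  visit_leaves(root) { |leaf| ids << leaf.id }
  ids
end

tree = Node.new('root', [
  Node.new('align', [], {}),
  Node.new('group', [Node.new('sort', [], {}), Node.new('index', [], {})], {}),
], {})

puts leaf_ids(tree).join(', ')
```

In this sketch the leaves are visited as `align`, `sort`, `index`; the inner `group` and `root` nodes produce nothing themselves, which matches the listed source creating one task per working block.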