Module: RemoteStep::SSH

Defined in:
lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#override_dependenciesObject

Returns the value of attribute override_dependencies.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

def override_dependencies
  @override_dependencies
end

#run_typeObject

Returns the value of attribute run_type.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

def run_type
  @run_type
end

#slurm_optionsObject

Returns the value of attribute slurm_options.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

def slurm_options
  @slurm_options
end

Instance Method Details

#_orchestrate_slurmObject



63
64
65
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 63

def _orchestrate_slurm
  RemoteWorkflow::SSH.orchestrate_slurm_job(File.join(base_url, task.to_s), @input_id, @base_name, @slurm_options || {})
end

#_runObject



54
55
56
57
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 54

def _run
  RemoteWorkflow::SSH.upload_dependencies(self, @server)
  RemoteWorkflow::SSH.run_job(File.join(base_url, task.to_s), @input_id, @base_name)
end

#_run_slurmObject



59
60
61
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 59

def _run_slurm
  RemoteWorkflow::SSH.run_slurm_job(File.join(base_url, task.to_s), @input_id, @base_name, @slurm_options || {})
end

#abortObject



101
102
103
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 101

def abort
  Log.warn "not implemented RemoteWorkflow::SSH.abort(@url, @input_id, @base_name)"
end

#cleanObject



95
96
97
98
99
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 95

def clean
  init_job
  RemoteWorkflow::SSH.clean(@url, @input_id, @base_name)
  _restart
end

#init_job(cache_type = nil, other_params = {}) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 5

def init_job(cache_type = nil, other_params = {})
  return self if @url
  cache_type = :asynchronous if cache_type.nil? and not @is_exec
  cache_type = :exec if cache_type.nil?
  @last_info_time = nil
  @done = false
  @server, @server_path = RemoteWorkflow::SSH.parse_url base_url
  @input_id ||= "inputs-" << rand(100000).to_s

  if override_dependencies && override_dependencies.any?
    override_dependencies.each do |od|
      name, _sep, value = od.partition("=")
      inputs[name] = value
    end
  end

  inputs.select{|i| Step === i }.each{|i| i.produce }

  RemoteWorkflow::SSH.upload_inputs(@server, inputs, @input_types, @input_id)

  @remote_path ||= Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      input_types = {}
      RemoteWorkflow::SSH.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end
  @name = @remote_path.split("/").last

  if Open.remote?(@name)
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end

  self
end

#input_dependenciesObject



105
106
107
108
109
110
111
112
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 105

def input_dependencies
  @input_dependencies ||= inputs.values.flatten.
    select{|i| Step === i || (defined?(RemoteStep) && RemoteStep === i) } + 
    inputs.values.flatten.
    select{|dep| Path === dep && Step === dep.resource }.
    select{|dep| ! dep.resource.started? }. # Ignore input_deps already started
    collect{|dep| dep.resource }
end

#loadObject



86
87
88
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 86

def load
  load_res Open.open(path)
end

#pathObject



43
44
45
46
47
48
49
50
51
52
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 43

def path
  @server, @server_path = RemoteWorkflow::SSH.parse_url @base_url
  if info[:path]
    "ssh://" + @server + ":" + info[:path]
  elsif @remote_path
    "ssh://" + @server + ":" + @remote_path
  else
    "ssh://" + @server + ":" + ["var/jobs", self.workflow.to_s, task_name.to_s, @name] * "/"
  end
end

#produce(*args) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 67

def produce(*args)
  input_types = {}
  init_job
  @remote_path = case @run_type
                 when 'run', :run, nil
                   _run
                 when 'slurm', :slurm
                   _run_slurm
                 when 'orchestrate', :orchestrate
                   _orchestrate_slurm
                 end
  @started = true
  while ! (done? || error? || aborted?)
    sleep 1
  end
  raise self.get_exception if error?
  self
end

#run(*args) ⇒ Object



90
91
92
93
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 90

def run(*args)
  produce(*args)
  self.load unless args.first
end