Module: WorkflowSSHClient

Defined in:
lib/rbbt/workflow/remote/ssh/get.rb,
lib/rbbt/workflow/remote/ssh/adaptor.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.capture_exceptionObject



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 38

def self.capture_exception
  begin
    yield
  rescue Exception => e
    raise e unless e.respond_to? :response
    begin
      ne = parse_exception e.response.to_s
      case ne
      when String
        raise e.class, ne
      when Exception
        raise ne
      else
        raise
      end
    rescue
      raise e
    end
    raise $!
  end
end

.fix_hash(hash, fix_values = false) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 2

def self.fix_hash(hash, fix_values = false)
  fixed = {}
  hash.each do |key, value|
    fixed[key.to_sym] = case value
                        when TrueClass
                          value
                        when FalseClass
                          value
                        when Hash 
                          fix_hash(value)  
                        when (fix_values and String )
                          value.to_sym
                        when IO
                          value.read
                        when TSV::Dumper
                          value.stream
                        when Step
                          stream = get_stream(value)
                          stream || value.load
                        else
                          value
                        end
  end
  fixed
end

.fix_params(params) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 60

def self.fix_params(params)
  new_params = {}
  params.each do |k,v|
    if Array === v and v.empty?
      new_params[k] = "EMPTY_ARRAY"
    else
      new_params[k] = v
    end
  end
  new_params
end

.get_json(url, params = {}) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 72

def self.get_json(url, params = {})
  Log.debug{ "SSHClient get_json: #{ url } - #{Misc.fingerprint params }" }
  params = params.merge({ :_format => 'json' })
  params = fix_params params

  res = capture_exception do
    Misc.insist(2, 0.5) do
      SSHDriver.get_json(url, :params => params)
    end
  end

  begin
    JSON.parse(res)
  rescue
    res
  end
end

.parse_exception(text) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 28

def self.parse_exception(text)
  klass, message = text.split " => "
  begin
    klass = Kernel.const_get klass
    return klass.new message
  rescue
    message
  end
end

.upload_inputs(server, inputs, input_types, input_id) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 90

def self.upload_inputs(server, inputs, input_types, input_id)
  TmpFile.with_file do |dir|
    if Step.save_inputs(inputs, input_types, dir)
      CMD.cmd("ssh '#{server}' mkdir -p .rbbt/tmp/tmp-ssh_job_inputs/; scp -r '#{dir}' #{server}:.rbbt/tmp/tmp-ssh_job_inputs/#{input_id}")
    end
  end
end

Instance Method Details

#cleanObject



145
146
147
148
149
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 145

def clean
  init_job
  SSHDriver.clean(@url, @input_id, @base_name) if done?
  _restart
end

#documentationObject



8
9
10
11
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 8

def documentation
  @documention ||= IndiferentHash.setup(WorkflowSSHClient.get_json(File.join(url, "documentation")))
  @documention
end

#init_job(cache_type = nil, other_params = {}) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 101

def init_job(cache_type = nil, other_params = {})
  cache_type = :asynchronous if cache_type.nil? and not @is_exec
  cache_type = :exec if cache_type.nil?
  @last_info_time = nil
  @done = false
  @server, @server_path = SSHDriver.parse_url base_url
  @input_id ||= "inputs-" << rand(100000).to_s
  @input_types = task_info(task)[:input_types]

  WorkflowSSHClient.upload_inputs(@server, inputs, @input_types, @input_id)

  @name ||= Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      input_types = {}
      SSHDriver.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end
  if Open.remote? @name
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end
  self
end

#init_remote_tasksObject



56
57
58
59
60
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 56

def init_remote_tasks
  @task_info = IndiferentHash.setup(WorkflowSSHClient.get_json(url))
  @exec_exports = @stream_exports = @synchronous_exports = []
  @asynchronous_exports = @task_info.keys
end

#load_tasksObject



42
43
44
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 42

def load_tasks
  @task_info.keys.each{|name| tasks[name]}
end

#pathObject



127
128
129
130
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 127

def path
  @server, @server_path = SSHDriver.parse_url @base_url
  "ssh://" + @server + ":" + @remote_path
end

#produce(*args) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 132

def produce(*args)
  input_types = {}
  init_job
  @remote_path = SSHDriver.run_job(File.join(base_url, task.to_s), @input_id, @base_name)
  while ! done?
    sleep 1
  end
end

#run(*args) ⇒ Object



141
142
143
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 141

def run(*args)
  produce(*args)
end

#task_dependenciesObject



46
47
48
49
50
51
52
53
54
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 46

def task_dependencies
  @task_dependencies ||= Hash.new do |hash,task| 
    hash[task] = if exported_tasks.include? task
      WorkflowSSHClient.get_json(File.join(url, task.to_s, 'dependencies'))
    else
      []
    end
  end
end

#task_info(task) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 13

def task_info(task)
  @task_info ||= IndiferentHash.setup({})
  
  if @task_info[task].nil?
    task_info = WorkflowSSHClient.get_json(File.join(@base_url, task.to_s))
    task_info = WorkflowSSHClient.fix_hash(task_info)

    task_info[:result_type] = task_info[:result_type].to_sym if task_info[:result_type]
    task_info[:export] = task_info[:export].to_sym if task_info[:export]
    task_info[:input_types] = WorkflowSSHClient.fix_hash(task_info[:input_types], true)
    task_info[:inputs] = task_info[:inputs].collect{|input| input.to_sym }

    @task_info[task] = IndiferentHash.setup(task_info)
  end

  IndiferentHash.setup(@task_info[task])
end

#tasksObject



31
32
33
34
35
36
37
38
39
40
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 31

def tasks
  @tasks ||= Hash.new do |hash,task_name| 
    info = @task_info[task_name]
    task = Task.setup info do |*args|
      raise "This is a remote task" 
    end
    task.name = task_name.to_sym
    hash[task_name] = task
  end
end

#workflow_descriptionObject



4
5
6
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 4

def workflow_description
  WorkflowSSHClient.get_raw(File.join(url, 'description'))
end