Module: WorkflowSSHClient

Defined in:
lib/rbbt/workflow/remote/ssh/get.rb,
lib/rbbt/workflow/remote/ssh/adaptor.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.__prepare_inputs_for_restclient(inputs) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 2

# Normalizes a collection of job inputs in place before they are sent out.
#
# * A value that responds to +path+ but not to +original_filename+ gets a
#   singleton +original_filename+ method returning its expanded path.
# * An empty Array value is replaced by the "EMPTY_ARRAY" marker string.
#
# inputs - key/value collection of inputs; it is modified in place.
#
# Returns whatever +inputs.each+ returns (the receiver itself for a Hash).
def self.__prepare_inputs_for_restclient(inputs)
  inputs.each do |name, value|
    needs_filename = value.respond_to?(:path) && !value.respond_to?(:original_filename)
    if needs_filename
      # The block runs with +value+ as self, so +path+ is the value's own method.
      value.define_singleton_method(:original_filename) do
        File.expand_path(path)
      end
    end

    # Replacing the value of an existing key is safe while iterating a Hash.
    inputs[name] = "EMPTY_ARRAY" if Array === value && value.empty?
  end
end

.capture_exception ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 185

# Runs the given block and returns its value. When the block raises an
# exception that responds to +response+, the response text is passed to
# +parse_exception+ and the error is translated:
#
# * parse result is a String    -> an exception of the original class with
#                                  that String as its message is raised;
# * parse result is an Exception -> that exception is raised;
# * anything else, or a failure while parsing/building the replacement
#                                -> the original exception is re-raised.
#
# Exceptions that do not respond to +response+ are re-raised untouched.
#
# The replacement is built inside a protected region and raised outside of
# it, so the rescue that guards the parsing cannot swallow the translated
# exception.
def self.capture_exception
  yield
rescue Exception => e # deliberately broad: every path below re-raises
  raise e unless e.respond_to? :response

  translated = begin
    parsed = parse_exception e.response.to_s
    case parsed
    when String
      # Same construction that `raise e.class, parsed` would perform.
      e.class.exception(parsed)
    when Exception
      parsed
    end
  rescue StandardError
    # Parsing or building the replacement failed: fall back to the original.
    nil
  end

  raise translated if Exception === translated
  raise e
end

.execute_job(base_url, task, task_params, cache_type) ⇒ Object



76
77
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 76

# No-op: the body is empty, so all four arguments are ignored and the
# method returns nil.
def self.execute_job(base_url, task, task_params, cache_type)
end

.fix_hash(hash, fix_values = false) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 149

# Builds a new Hash from +hash+ with every key converted via +to_sym+ and
# values normalized by type:
#
# * true / false  -> kept as is
# * Hash          -> converted recursively (note: +fix_values+ is NOT
#                    passed down, so nested String values stay Strings)
# * String        -> converted to a Symbol only when +fix_values+ is truthy
# * IO            -> replaced by the result of +read+
# * TSV::Dumper   -> replaced by its +stream+
# * Step          -> replaced by +get_stream(value)+, or +value.load+ when
#                    that returns nil/false (+get_stream+ is defined
#                    outside this block)
# * anything else -> kept as is
#
# hash       - collection yielding key/value pairs.
# fix_values - when truthy, top-level String values become Symbols.
#              A nil flag is treated like false.
#
# Returns the new Hash; the input is not modified.
def self.fix_hash(hash, fix_values = false)
  hash.each_with_object({}) do |(key, value), fixed|
    fixed[key.to_sym] =
      case value
      when true, false
        value
      when Hash
        fix_hash(value)
      when String
        # Explicit flag check: a nil/false flag must never match other values.
        fix_values ? value.to_sym : value
      when IO
        value.read
      when TSV::Dumper
        value.stream
      when Step
        get_stream(value) || value.load
      else
        value
      end
  end
end

.fix_params(params) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 207

# Returns a fresh Hash with the same keys as +params+ in which every empty
# Array value has been replaced by the "EMPTY_ARRAY" marker string. All
# other values are carried over unchanged; +params+ itself is not modified.
def self.fix_params(params)
  params.each_with_object({}) do |(name, value), fixed|
    blank_list = Array === value && value.empty?
    fixed[name] = blank_list ? "EMPTY_ARRAY" : value
  end
end

.get_json(url, params = {}) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 219

# Requests +url+ through SSHClient.get_json, asking for JSON output.
#
# url    - location to query.
# params - extra request parameters; :_format => 'json' is merged in and
#          empty arrays are replaced via +fix_params+ before sending.
#
# The request runs inside +capture_exception+ and Misc.insist(2, 0.5).
# Returns the response parsed with JSON.parse, or the raw response when
# parsing raises a StandardError.
def self.get_json(url, params = {})
  Log.debug{ "SSHClient get_json: #{ url } - #{Misc.fingerprint params }" }
  params = fix_params(params.merge({ :_format => 'json' }))

  response = capture_exception do
    Misc.insist(2, 0.5) { SSHClient.get_json(url, :params => params) }
  end

  begin
    JSON.parse(response)
  rescue StandardError
    # Not valid JSON (or not a String): hand back whatever was received.
    response
  end
end

.parse_exception(text) ⇒ Object



175
176
177
178
179
180
181
182
183
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 175

# Interprets +text+ of the form "ClassName => message".
#
# text - String to parse.
#
# Returns a new instance of the named class built with the message when
# the name resolves to an Exception subclass. Returns the message String
# (nil when +text+ has no " => " separator) when the name cannot be
# resolved, does not name an Exception subclass, or instantiation fails.
def self.parse_exception(text)
  # Limit of 2 keeps any later " => " inside the message.
  klass_name, message = text.split(" => ", 2)
  begin
    klass = Kernel.const_get klass_name
    # The text comes from the remote side: never instantiate arbitrary,
    # non-exception classes named in it.
    return message unless klass.is_a?(Class) && klass <= Exception
    klass.new message
  rescue StandardError
    message
  end
end

.post_jobname(url, inputs, input_types) ⇒ Object



238
239
240
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 238

# Thin wrapper: forwards +url+, +inputs+ and +input_types+ unchanged to
# SSHClient.post_job and returns its result.
def self.post_jobname(url, inputs, input_types)
  SSHClient.post_job(url, inputs, input_types)
end

Instance Method Details

#clean ⇒ Object



275
276
277
278
279
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 275

# Runs +init_job+, then asks SSHClient to clean the job at @url when
# +done?+ is true, and finally calls +_restart+.
# Returns the result of +_restart+.
def clean
  init_job
  SSHClient.clean(@url) if done?
  _restart
end

#documentation ⇒ Object



22
23
24
25
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 22

# Fetches the JSON found at "<url>/documentation", passes it through
# IndiferentHash.setup and memoizes the result, so later calls reuse it
# as long as the stored value is truthy.
#
# NOTE(review): the instance variable is spelled @documention; the
# spelling is kept because other code may read it under that name.
def documentation
  @documention ||= begin
    raw = WorkflowSSHClient.get_json(File.join(url, "documentation"))
    IndiferentHash.setup(raw)
  end
end

#init_job(cache_type = nil, other_params = {}) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 242

# Resets the job state, obtains the job name from the adaptor when @name
# is not already set, and derives @url from it.
#
# cache_type   - defaults to :asynchronous unless @is_exec is set, in
#                which case it defaults to :exec. In the active code it is
#                only used as part of the Persist.memory options.
# other_params - not referenced by the active code (only by the
#                commented-out call below).
#
# Returns self.
def init_job(cache_type = nil, other_params = {})
  cache_type = :asynchronous if cache_type.nil? and not @is_exec
  cache_type = :exec if cache_type.nil?
  # Reset per-job state before (re)submitting.
  @last_info_time = nil
  @done = false
  get_streams
  # Only post the job when @name is still unset. Persist.memory presumably
  # caches the block's result under the given options — confirm.
  @name ||= Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      #@adaptor.post_jobname(File.join(base_url, task.to_s), inputs.merge(other_params).merge(:jobname => @name||@base_name, :_cache_type => cache_type))
      # input_types is always sent empty here.
      input_types = {}
      @adaptor.post_jobname(File.join(base_url, task.to_s), inputs, input_types)
    end
  end
  # When the name is itself a remote location (per Open.remote?), use it
  # as the URL and keep only its basename as the name; otherwise build
  # the URL from base_url, the task and the name.
  if Open.remote? @name
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end
  self
end

#init_remote_tasks ⇒ Object



70
71
72
73
74
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 70

# Loads the task description found at +url+ into @task_info (wrapped by
# IndiferentHash.setup) and initializes the export lists: every key of
# @task_info is listed as an asynchronous export, and the exec, stream and
# synchronous lists start empty.
#
# Each empty list is its own Array so that appending to one of them does
# not silently change the others.
#
# Returns the list of asynchronous exports.
def init_remote_tasks
  @task_info = IndiferentHash.setup(WorkflowSSHClient.get_json(url))
  @exec_exports = []
  @stream_exports = []
  @synchronous_exports = []
  @asynchronous_exports = @task_info.keys
end

#load_tasks ⇒ Object



56
57
58
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 56

# Looks up +tasks[name]+ for every key of @task_info. Since +tasks+ is a
# Hash with a default block, each lookup for a name not yet stored runs
# that block. Returns the array of keys that was iterated.
# Requires @task_info to be set already (it is not initialized here).
def load_tasks
  @task_info.keys.each{|name| tasks[name]}
end

#path ⇒ Object



264
265
266
267
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 264

# Builds the job location as "ssh://<server>:<info[:path]>", where the
# server is the first element returned by SSHClient.parse_url(url). The
# second element returned by parse_url is not used.
def path
  server, _remote_path = SSHClient.parse_url(url)
  prefix = "ssh://" + server
  prefix + ":" + info[:path]
end

#run(*args) ⇒ Object



269
270
271
272
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 269

# Runs the job through SSHClient.run_job on "<base_url>/<task>", passing
# the job inputs and an empty input-types Hash. Any arguments given to
# this method are accepted but ignored.
# Returns whatever SSHClient.run_job returns.
def run(*args)
  job_url = File.join(base_url, task.to_s)
  SSHClient.run_job(job_url, inputs, {})
end

#task_dependencies ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 60

# Lazily filled, memoized Hash of task => dependencies.
#
# On the first lookup of a task the entry is computed and stored: for a
# task listed in +exported_tasks+ it is the JSON fetched from
# "<url>/<task>/dependencies"; for any other task it is an empty Array.
def task_dependencies
  @task_dependencies ||= Hash.new do |cache, task_name|
    cache[task_name] =
      if exported_tasks.include?(task_name)
        dependencies_url = File.join(url, task_name.to_s, 'dependencies')
        WorkflowSSHClient.get_json(dependencies_url)
      else
        []
      end
  end
end

#task_info(task) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 27

# Returns the description of +task+, fetching it on first use.
#
# When @task_info has no entry for +task+, the JSON at "<url>/<task>/info"
# is fetched, passed through WorkflowSSHClient.fix_hash, and then
# :result_type, :export and every element of :inputs are converted with
# +to_sym+, while :input_types goes through fix_hash again with value
# fixing enabled. The result is stored in @task_info and returned.
# An entry that is already present is returned as is.
def task_info(task)
  @task_info ||= IndiferentHash.setup({})

  cached = @task_info[task]
  return cached unless cached.nil?

  remote_info = WorkflowSSHClient.fix_hash(
    WorkflowSSHClient.get_json(File.join(url, task.to_s, 'info'))
  )

  remote_info[:result_type] = remote_info[:result_type].to_sym
  remote_info[:export]      = remote_info[:export].to_sym
  remote_info[:input_types] = WorkflowSSHClient.fix_hash(remote_info[:input_types], true)
  remote_info[:inputs]      = remote_info[:inputs].map { |input| input.to_sym }

  @task_info[task] = remote_info
  @task_info[task]
end

#tasks ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 45

# Lazily filled, memoized Hash of task name => task object.
#
# On the first lookup of a name, the matching entry of @task_info is
# passed to Task.setup together with a block that always raises
# "This is a remote task"; the resulting task gets the name (as a Symbol)
# assigned and is stored under the looked-up key.
def tasks
  @tasks ||= Hash.new do |registry, task_name|
    remote_task = Task.setup(@task_info[task_name]) do |*args|
      raise "This is a remote task"
    end
    remote_task.name = task_name.to_sym
    registry[task_name] = remote_task
  end
end

#workflow_description ⇒ Object



18
19
20
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 18

# Returns the result of WorkflowSSHClient.get_raw for "<url>/description".
# NOTE(review): get_raw is not defined among the methods listed on this
# page — confirm it exists on WorkflowSSHClient.
def workflow_description
  WorkflowSSHClient.get_raw(File.join(url, 'description'))
end