Class: WorkflowRESTClient

Inherits:
Object
  • Object
show all
Includes:
Workflow
Defined in:
lib/rbbt/rest/client.rb,
lib/rbbt/rest/client/get.rb,
lib/rbbt/rest/client/step.rb,
lib/rbbt/rest/client/adaptor.rb

Defined Under Namespace

Classes: RemoteStep

Constant Summary

Constants included from Workflow

Workflow::DEFAULT_NAME, Workflow::STEP_CACHE, Workflow::TAG

Instance Attribute Summary collapse

Attributes included from Workflow

#description, #example_dir, #helpers, #last_task, #libdir, #remote_tasks, #step_cache, #task_description, #workdir

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Workflow

#add_remote_tasks, #all_exports, #assign_dep_inputs, #dep, #desc, #doc, doc_parse_chunks, doc_parse_first_line, doc_parse_up_to, #documentation_markdown, #example, #example_step, #examples, #export_asynchronous, #export_exec, #export_stream, #export_synchronous, extended, #extension, #get_job_step, #helper, #id_for, init_remote_tasks, installed_workflows, #jobs, #load_documentation, load_inputs, #load_name, load_remote_tasks, #load_step, load_workflow_file, load_workflow_libdir, #local_persist_setup, #local_workdir_setup, local_workflow_filename, #log, #make_local, parse_workflow_doc, process_remote_tasks, #real_dependencies, #rec_dependencies, #rec_input_defaults, #rec_input_descriptions, #rec_input_options, #rec_input_types, #rec_inputs, require_local_workflow, require_remote_workflow, require_workflow, resolve_locals, #returns, #set_step_dependencies, #step_module, #step_path, #task, #task_exports, #task_for, #task_from_dep, #unexport, #with_workdir, workdir, workdir=

Methods included from InputModule

#input

Constructor Details

#initialize(url, name) ⇒ WorkflowRESTClient

Returns a new instance of WorkflowRESTClient.



16
17
18
19
20
# File 'lib/rbbt/rest/client.rb', line 16

# Creates a client bound to a remote workflow.
#
# Records the endpoint and the workflow name, then calls
# #init_remote_tasks, which asks the server for its exported task lists.
#
# @param url  base URL of the remote workflow (later joined with task paths)
# @param name name of the workflow; also what #to_s returns
def initialize(url, name)
  Log.debug{ "Loading remote workflow #{ name }: #{ url }" }
  @url = url
  @name = name
  init_remote_tasks
end

Instance Attribute Details

#asynchronous_exports ⇒ Object

Returns the value of attribute asynchronous_exports.



14
15
16
# File 'lib/rbbt/rest/client.rb', line 14

# Reader for @asynchronous_exports. #init_remote_tasks fills it with the
# symbolized task names the server lists under the "asynchronous" key.
def asynchronous_exports
  @asynchronous_exports
end

#exec_exports ⇒ Object

Returns the value of attribute exec_exports.



14
15
16
# File 'lib/rbbt/rest/client.rb', line 14

# Reader for @exec_exports. #init_remote_tasks fills it with the
# symbolized task names the server lists under the "exec" key.
def exec_exports
  @exec_exports
end

#name ⇒ Object

Returns the value of attribute name.



14
15
16
# File 'lib/rbbt/rest/client.rb', line 14

# Reader for @name, the workflow name handed to the constructor.
def name
  @name
end

#stream_exports ⇒ Object

Returns the value of attribute stream_exports.



14
15
16
# File 'lib/rbbt/rest/client.rb', line 14

# Reader for @stream_exports. #init_remote_tasks fills it with the
# symbolized task names the server lists under the "stream" key.
def stream_exports
  @stream_exports
end

#synchronous_exports ⇒ Object

Returns the value of attribute synchronous_exports.



14
15
16
# File 'lib/rbbt/rest/client.rb', line 14

# Reader for @synchronous_exports. #init_remote_tasks fills it with the
# symbolized task names the server lists under the "synchronous" key.
def synchronous_exports
  @synchronous_exports
end

#url ⇒ Object

Returns the value of attribute url.



14
15
16
# File 'lib/rbbt/rest/client.rb', line 14

# Reader for @url, the remote workflow URL handed to the constructor.
def url
  @url
end

Class Method Details

.__prepare_inputs_for_restclient(inputs) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/rbbt/rest/client/step.rb', line 3

# Adapts an inputs hash, in place, before it is posted with RestClient.
#
# * A value that responds to +path+ but not to +original_filename+ gains a
#   singleton +original_filename+ method returning its expanded path.
# * An empty Array value is replaced by the placeholder string
#   "EMPTY_ARRAY".
#
# @param inputs [Hash] input name => value; mutated by this call
# @return [Hash] the same +inputs+ object
def self.__prepare_inputs_for_restclient(inputs)
  inputs.each do |key, value|
    lacks_filename = value.respond_to?(:path) && !value.respond_to?(:original_filename)
    value.define_singleton_method(:original_filename) { File.expand_path(path) } if lacks_filename

    # Only existing keys are reassigned, which is safe while iterating.
    inputs[key] = "EMPTY_ARRAY" if Array === value && value.empty?
  end
end

.capture_exception ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rbbt/rest/client/get.rb', line 37

# Runs the given block and translates HTTP errors that carry a serialized
# remote exception back into that exception.
#
# An exception that does not respond to +response+ is re-raised untouched.
# Otherwise the response body is expected to look like
# "ClassName => message". When ClassName resolves to an Exception subclass,
# an instance of it built with the message is raised. In every other case
# (unparsable body, unknown constant, non-exception class, constructor
# failure) the original exception is raised.
#
# The remote exception is built inside a guarded section but raised outside
# of it. Raising it inside the guard would let the guard swallow every
# StandardError subclass and fall back to the original error, defeating the
# translation.
#
# @yield the operation to protect
# @return whatever the block returns
# @raise [Exception] the translated remote exception, or the original one
def self.capture_exception
  yield
rescue Exception => e
  raise e unless e.respond_to? :response

  remote_exception =
    begin
      # Limit the split so messages that contain " => " stay intact.
      klass_name, message = e.response.to_s.split(" => ", 2)
      klass = Kernel.const_get(klass_name)
      # The class name comes from the server: only instantiate exceptions.
      (Class === klass && klass <= Exception) ? klass.new(message) : nil
    rescue StandardError
      nil
    end

  raise(remote_exception.is_a?(Exception) ? remote_exception : e)
end

.clean_url(url, params = {}) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rbbt/rest/client/get.rb', line 65

# Issues a GET on +url+ with <tt>_format=json</tt> and <tt>update=clean</tt>
# added to +params+ (empty arrays are first replaced through fix_params).
#
# The request runs inside Misc.insist(2, 0.5) -- presumably a bounded retry
# helper; confirm its exact semantics -- and inside capture_exception. A
# 202 response raises TryAgain from within the insist block.
#
# @param url    address to request; passed through encode
# @param params [Hash] extra query parameters (not mutated)
# @return the RestClient response
def self.clean_url(url, params = {})
  params = fix_params(params.merge(:_format => 'json', :update => 'clean'))

  capture_exception do
    Misc.insist(2, 0.5) do
      Log.debug{ "RestClient clean: #{ url } - #{Misc.fingerprint params}" }
      response = RestClient.get(encode(url), :params => params)
      raise TryAgain if response.code == 202
      response
    end
  end
end

.encode(url) ⇒ Object



2
3
4
5
6
7
8
9
# File 'lib/rbbt/rest/client/get.rb', line 2

# Percent-encodes the characters of +url+ that are not allowed in a URI.
#
# URI.encode was deprecated and then removed in Ruby 3.0. On such
# interpreters the old call raised NoMethodError, which the rescue below
# turned into a warning and an *unencoded* URL. URI::DEFAULT_PARSER.escape
# is the routine URI.encode delegated to, so the result is unchanged on
# older Rubies and encoding keeps working on newer ones.
#
# @param url [String] URL to encode
# @return [String] the encoded URL, or +url+ itself when encoding fails
#   (the failure message is logged as a warning)
def self.encode(url)
  URI::DEFAULT_PARSER.escape(url)
rescue StandardError => e
  Log.warn e.message
  url
end

.fix_hash(hash, fix_values = false) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/rbbt/rest/client/get.rb', line 11

# Builds a copy of +hash+ with symbol keys and normalized values.
#
# Value handling, in order of precedence:
# * +true+ / +false+ are kept as they are;
# * nested hashes are processed recursively (note: without +fix_values+);
# * strings become symbols, but only when +fix_values+ is truthy;
# * IO objects are replaced by what +read+ returns;
# * TSV::Dumper objects are replaced by their +stream+;
# * Step objects are replaced by get_stream(step), falling back to
#   step.load when that returns nil or false;
# * anything else is kept as it is.
#
# @param hash       [Hash] source hash; it is not modified
# @param fix_values [Boolean] whether String values are symbolized
# @return [Hash] the new hash
def self.fix_hash(hash, fix_values = false)
  # Evaluates to String when values must be symbolized and to the falsy
  # flag otherwise, in which case the arm can no longer match a String.
  string_matcher = fix_values && String

  hash.each_with_object({}) do |(key, value), fixed|
    fixed[key.to_sym] =
      case value
      when TrueClass, FalseClass then value
      when Hash then fix_hash(value)
      when string_matcher then value.to_sym
      when IO then value.read
      when TSV::Dumper then value.stream
      when Step then get_stream(value) || value.load
      else value
      end
  end
end

.fix_params(params) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rbbt/rest/client/get.rb', line 53

# Returns a copy of +params+ in which every empty Array value is replaced
# by the placeholder string "EMPTY_ARRAY"; all other entries are copied
# unchanged. The argument itself is left untouched.
#
# @param params [Hash] request parameters
# @return [Hash] a new hash
def self.fix_params(params)
  params.each_with_object({}) do |(key, value), fixed|
    fixed[key] = (Array === value && value.empty?) ? "EMPTY_ARRAY" : value
  end
end

.get_json(url, params = {}) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/rbbt/rest/client/get.rb', line 94

# GETs +url+ with <tt>_format=json</tt> added to +params+ and parses the
# answer as JSON.
#
# The request runs inside Misc.insist(2, 0.5) -- presumably a bounded retry
# helper; confirm its exact semantics -- and inside capture_exception.
#
# @param url    address to request; passed through encode
# @param params [Hash] extra query parameters (not mutated)
# @return the parsed JSON, or the raw response when it cannot be parsed
def self.get_json(url, params = {})
  Log.debug{ "RestClient get_json: #{ url } - #{Misc.fingerprint params }" }
  params = fix_params(params.merge(:_format => 'json'))

  response = capture_exception do
    Misc.insist(2, 0.5) { RestClient.get(encode(url), :params => params) }
  end

  begin
    JSON.parse(response)
  rescue StandardError
    # Not JSON: hand the response back as received.
    response
  end
end

.get_raw(url, params = {}) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/rbbt/rest/client/get.rb', line 79

# GETs +url+ with <tt>_format=raw</tt> added to +params+ and returns the
# response body as a String.
#
# The request runs inside Misc.insist(2, 0.5) -- presumably a bounded retry
# helper; confirm its exact semantics -- and inside capture_exception. A
# nil +url+ raises "No url" and a 202 response raises TryAgain, both from
# within the insist block.
#
# @param url    address to request; passed through encode
# @param params [Hash] extra query parameters (not mutated)
# @return [String] the response converted with +to_s+
def self.get_raw(url, params = {})
  params = fix_params(params.merge(:_format => 'raw'))

  capture_exception do
    Misc.insist(2, 0.5) do
      Log.debug{ "RestClient get_raw: #{ url } - #{Misc.fingerprint params}" }
      raise "No url" if url.nil?
      response = RestClient.get(encode(url), :params => params)
      raise TryAgain if response.code == 202
      response.to_s
    end
  end
end

.post_jobname(url, params = {}) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rbbt/rest/client/get.rb', line 112

# POSTs +params+ to +url+ with <tt>_format=jobname</tt> and returns what
# the server answers (presumably the job name -- it is logged as such).
#
# Before posting, empty arrays are replaced through fix_params and the
# inputs are adapted with __prepare_inputs_for_restclient. Unlike the GET
# helpers, the request is not wrapped in Misc.insist.
#
# @param url    address to post to; passed through encode
# @param params [Hash] job inputs (the caller's hash is not mutated)
# @return the RestClient response
def self.post_jobname(url, params = {})
  Log.debug{ "RestClient post_jobname: #{ url } - #{Misc.fingerprint params}" }
  params = fix_params(params.merge(:_format => 'jobname'))
  WorkflowRESTClient.__prepare_inputs_for_restclient(params)

  name = capture_exception { RestClient.post(encode(url), params) }

  Log.debug{ "RestClient jobname returned for #{ url } - #{Misc.fingerprint params}: #{name}" }
  name
end

.post_json(url, params = {}) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/rbbt/rest/client/get.rb', line 127

# Fetches a JSON result for +url+.
#
# When the URL contains "_cache_type=:exec" it is opened directly with
# Open.open (with :nocache) and parsed; +params+ are ignored on that path.
# Otherwise +params+ are posted with <tt>_format=json</tt> inside
# capture_exception.
#
# @param url    address to request
# @param params [Hash] inputs to post (not mutated)
# @return the parsed JSON; on the POST path, the raw response when it
#   cannot be parsed
def self.post_json(url, params = {})
  return JSON.parse(Open.open(url, :nocache => true)) if url =~ /_cache_type=:exec/

  params = fix_params(params.merge(:_format => 'json'))
  response = capture_exception { RestClient.post(encode(url), params) }

  begin
    JSON.parse(response)
  rescue StandardError
    # Not JSON: hand the response back as received.
    response
  end
end

Instance Method Details

#documentation ⇒ Object



7
8
9
# File 'lib/rbbt/rest/client/adaptor.rb', line 7

# Returns the remote workflow's documentation, fetched as JSON from
# "<url>/documentation" and passed through IndiferentHash.setup.
#
# The result is memoized in @documention (sic -- the misspelled variable
# name is kept on purpose so no other instance state is touched).
def documentation
  return @documention if @documention

  remote_doc = WorkflowRESTClient.get_json(File.join(url, "documentation"), {})
  @documention = IndiferentHash.setup(remote_doc)
end

#exported_tasks ⇒ Object



29
30
31
# File 'lib/rbbt/rest/client/adaptor.rb', line 29

# Lists the exported task names: the asynchronous, synchronous and exec
# exports concatenated in that order, with top-level nils removed and
# nested arrays flattened. @stream_exports is not part of this list.
#
# @return [Array]
def exported_tasks
  combined = @asynchronous_exports + @synchronous_exports + @exec_exports
  combined.compact.flatten
end

#init_remote_tasks ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/rbbt/rest/client/adaptor.rb', line 59

# Asks the server at +url+ for its export lists and caches them.
#
# The JSON answer is read under the keys "asynchronous", "synchronous",
# "exec" and "stream" (each symbolized into the matching @*_exports
# variable) and "can_stream" (stored as is in @can_stream).
def init_remote_tasks
  exports = WorkflowRESTClient.get_json(url)

  @asynchronous_exports = exports["asynchronous"].map(&:to_sym)
  @synchronous_exports = exports["synchronous"].map(&:to_sym)
  @exec_exports = exports["exec"].map(&:to_sym)
  @stream_exports = exports["stream"].map(&:to_sym)
  @can_stream = exports["can_stream"]
end

#job(task, name, inputs) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rbbt/rest/client.rb', line 26

# Builds a RemoteStep for running +task+ on the remote workflow.
#
# Inputs are filtered and adapted first:
# * TSV values are always kept, serialized with +to_s+;
# * other inputs are dropped unless the task info declares a type for them;
# * for :tsv, :array, :file and :text inputs, a String naming something
#   Open.exists? reports as existing is replaced by Open.open of it;
# * every other declared input is passed through unchanged.
#
# When @can_stream is set, the first input whose options have a truthy
# :stream entry is handed to the step as its stream input.
#
# @param task   task to run
# @param name   job name
# @param inputs [Hash] input name => value
# @return [RemoteStep]
def job(task, name, inputs)
  info = task_info(task)
  input_types = info[:input_types]

  fixed_inputs = {}
  inputs.each do |key, value|
    key = key.to_sym

    if TSV === value
      fixed_inputs[key] = value.to_s
      next
    end

    type = input_types[key]
    next if type.nil?

    fixed_inputs[key] =
      case type.to_sym
      when :tsv, :array, :file, :text
        (String === value && Open.exists?(value)) ? Open.open(value) : value
      else
        value
      end
  end

  stream_input = nil
  if @can_stream
    streamable = task_info(task)[:input_options].select { |_input, options| options[:stream] }
    stream_input = streamable.collect { |input, _options| input }.first
  end

  RemoteStep.new(url, task, name, fixed_inputs, info[:result_type], info[:result_description], @exec_exports.include?(task), @stream_exports.include?(task), stream_input)
end

#load_id(id) ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/rbbt/rest/client.rb', line 51

# Rebuilds a RemoteStep from an identifier of the form "task/name".
#
# Only the first two "/"-separated parts of +id+ are used. The step is
# created without a name, then given the name, result type and result
# description (the latter two taken from the task info).
#
# @param id [String] job identifier
# @return [RemoteStep]
def load_id(id)
  task, name = id.split("/")

  RemoteStep.new(url, task, nil).tap do |step|
    step.name = name
    step.result_type = task_info(task)[:result_type]
    step.result_description = task_info(task)[:result_description]
  end
end

#load_tasks ⇒ Object



44
45
46
47
# File 'lib/rbbt/rest/client/adaptor.rb', line 44

# Looks up every exported task in #tasks, which makes that hash build and
# cache the entries it does not hold yet.
#
# @return [nil]
def load_tasks
  exported_tasks.each do |task_name|
    tasks[task_name]
  end
  nil
end

#task_dependencies ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/rbbt/rest/client/adaptor.rb', line 49

# Returns a memoized, lazily filled hash of task => dependencies.
#
# On first access to a task the entry is computed and stored: exported
# tasks are resolved by fetching "<url>/<task>/dependencies" as JSON, any
# other task gets an empty array.
#
# @return [Hash]
def task_dependencies
  @task_dependencies ||= Hash.new do |cache, task|
    cache[task] =
      exported_tasks.include?(task) ? WorkflowRESTClient.get_json(File.join(url, task.to_s, 'dependencies')) : []
  end
end

#task_info(task) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/rbbt/rest/client/adaptor.rb', line 11

# Returns the description of +task+, fetching it on first use.
#
# The info is read as JSON from "<url>/<task>/info", its keys are
# symbolized with fix_hash, and these entries are normalized:
# :result_type and :export become symbols, :input_types gets symbol
# values, and :inputs becomes a list of symbols. Results are cached per
# +task+ exactly as given, so "name" and :name are separate entries.
#
# @param task task name
# @return [Hash] the cached task description
def task_info(task)
  @task_info ||= {}

  cached = @task_info[task]
  return cached unless cached.nil?

  info = WorkflowRESTClient.fix_hash(WorkflowRESTClient.get_json(File.join(url, task.to_s, 'info')))

  info[:result_type] = info[:result_type].to_sym
  info[:export] = info[:export].to_sym
  info[:input_types] = WorkflowRESTClient.fix_hash(info[:input_types], true)
  info[:inputs] = info[:inputs].collect { |input| input.to_sym }

  @task_info[task] = info
end

#tasks ⇒ Object



33
34
35
36
37
38
39
40
41
42
# File 'lib/rbbt/rest/client/adaptor.rb', line 33

# Returns a memoized, lazily filled hash of task name => task object.
#
# On first access to a name, its task info is fetched and handed to
# Task.setup together with a block that raises "This is a remote task".
# The resulting task gets the symbolized name and is cached under the
# name exactly as it was asked for.
#
# @return [Hash]
def tasks
  @tasks ||= Hash.new do |cache, task_name|
    remote_task = Task.setup(task_info(task_name)) do |*args|
      raise "This is a remote task"
    end
    remote_task.name = task_name.to_sym
    cache[task_name] = remote_task
  end
end

#to_s ⇒ Object



22
23
24
# File 'lib/rbbt/rest/client.rb', line 22

# String form of the client: whatever #name returns.
def to_s
  name
end

#workflow_description ⇒ Object



3
4
5
# File 'lib/rbbt/rest/client/adaptor.rb', line 3

# Fetches the remote workflow's description: the raw body served at
# "<url>/description". Nothing is cached; every call hits the server.
#
# @return [String]
def workflow_description
  description_url = File.join(url, 'description')
  WorkflowRESTClient.get_raw(description_url)
end