Class: WorkflowRESTClient::RemoteStep

Inherits:
Step
  • Object
show all
Defined in:
lib/rbbt/rest/client/step.rb

Constant Summary

Constants inherited from Step

Step::INFO_SERIALIAZER, Step::STREAM_CACHE, Step::STREAM_CACHE_MUTEX

Instance Attribute Summary collapse

Attributes inherited from Step

#bindings, #clean_name, #dependencies, #dupped, #mutex, #pid, #result, #saved_stream, #seen, #stream

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Step

#_abort, #_exec, #abort, #abort_pid, #abort_stream, #aborted?, #checks, #child, clean, #dirty?, #dup_inputs, dup_stream, #error?, #exception, #files_dir, files_dir, #get_stream, #grace, #info_file, info_file, #info_lock, job_name_for_info_file, #join_stream, #kill_children, #load_file, log, #log, log_block, log_progress, #log_progress, log_string, #merge_info, #message, #messages, #prepare_result, #produce, #progress_bar, #provenance, #provenance_paths, purge_stream_cache, #rec_dependencies, #relay_log, #run_dependencies, #save_file, #set_info, #soft_grace, #started?, #status=, status_color, #step, step_info, #stop_dependencies, #streaming?, wait_for_jobs

Constructor Details

#initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false) ⇒ RemoteStep

Returns a new instance of RemoteStep.



15
16
17
18
19
# File 'lib/rbbt/rest/client/step.rb', line 15

# Builds a handle for a task served by a remote workflow.
#
# The arguments are stored as instance variables and any Step found among
# the inputs is resolved through RemoteStep.get_streams.
#
# @param base_url [String] URL that task and job names are joined onto
# @param task [#to_s, nil] task name
# @param base_name [String, nil] job name sent to the server when no name is known yet
# @param inputs [Hash, nil] task inputs; nil is treated as "no inputs"
# @param result_type [Symbol, nil] selects how responses are requested and parsed
# @param result_description [Object, nil] stored as given
# @param is_exec [Boolean] stored as given; read by init_job, name and run
def initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false)
  @base_url = base_url
  @task = task
  @base_name = base_name
  # nil inputs are normalised to an empty Hash so that the Hash calls made on
  # them (#each, #merge) do not raise NoMethodError.
  @inputs = inputs || {}
  @result_type = result_type
  @result_description = result_description
  @is_exec = is_exec
  @mutex = Mutex.new
  RemoteStep.get_streams @inputs
end

Instance Attribute Details

#base_name ⇒ Object

Returns the value of attribute base_name.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @base_name, the job name given to the constructor.
#
# @return [Object, nil] nil when it was never set
def base_name
  instance_variable_get(:@base_name)
end

#base_url ⇒ Object

Returns the value of attribute base_url.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @base_url, the URL given to the constructor.
#
# @return [Object, nil] nil when it was never set
def base_url
  instance_variable_get(:@base_url)
end

#inputs ⇒ Object

Returns the value of attribute inputs.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @inputs, the task inputs.
#
# @return [Object, nil] nil when it was never set
def inputs
  instance_variable_get(:@inputs)
end

#is_exec ⇒ Object

Returns the value of attribute is_exec.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @is_exec, the flag given to the constructor.
#
# @return [Object, nil] nil when it was never set
def is_exec
  instance_variable_get(:@is_exec)
end

#result_description ⇒ Object

Returns the value of attribute result_description.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @result_description, stored by the constructor.
#
# @return [Object, nil] nil when it was never set
def result_description
  instance_variable_get(:@result_description)
end

#result_type ⇒ Object

Returns the value of attribute result_type.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @result_type, stored by the constructor.
#
# @return [Object, nil] nil when it was never set
def result_type
  instance_variable_get(:@result_type)
end

#task ⇒ Object

Returns the value of attribute task.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @task, the task name given to the constructor.
#
# @return [Object, nil] nil when it was never set
def task
  instance_variable_get(:@task)
end

#url ⇒ Object

Returns the value of attribute url.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

# Reader for @url, the job URL assigned by init_job.
#
# @return [Object, nil] nil until it has been assigned
def url
  instance_variable_get(:@url)
end

Class Method Details

.get_streams(inputs) ⇒ Object



6
7
8
9
10
11
12
13
# File 'lib/rbbt/rest/client/step.rb', line 6

# Replaces, in place, every Step found among the input values.
#
# Each Step is swapped for what its get_stream returns or, when that is
# nil/false, for the value returned by running the step.
#
# @param inputs [Hash, nil] inputs keyed by name; nil is accepted and ignored
# @return [Hash, nil] the same object that was passed in
def self.get_streams(inputs)
  # The constructor defaults its inputs to nil, so nil can reach this method.
  return inputs if inputs.nil?

  inputs.each do |key, value|
    next unless Step === value
    # Only existing keys are reassigned, which is allowed while iterating.
    inputs[key] = value.get_stream || value.run
  end
end

Instance Method Details

#_restart ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rbbt/rest/client/step.rb', line 157

# Forgets the cached completion flag and job name, and rebuilds the inputs
# so that every File value is replaced by a newly opened handle on the same
# path. All other values are carried over unchanged.
#
# @return [Hash] the rebuilt inputs, which are also stored in @inputs
def _restart
  @done = @name = nil
  @inputs = inputs.each_with_object({}) do |(key, value), fresh|
    fresh[key] = File === value ? File.open(value.path) : value
  end
end

#clean ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
# File 'lib/rbbt/rest/client/step.rb', line 183

# Asks the server to clean this job and then resets local state with
# _restart.
#
# This is best effort: errors are reported through Log.exception instead of
# being raised. Only StandardError is rescued, so Interrupt, SystemExit and
# other non-standard exceptions still reach the caller.
#
# @return [self]
def clean
  begin
    params = {:_update => :clean}
    init_job(nil, params)
    WorkflowRESTClient.get_raw(url, params)
    _restart
  rescue StandardError => e
    Log.exception e
  end
  self
end

#done? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/rbbt/rest/client/step.rb', line 50

# Tells whether the job has finished.
#
# A truthy cached @done is returned directly; otherwise the current status
# is fetched and compared with 'done'.
#
# @return [Object] the cached @done when truthy, else true or false
def done?
  return @done if @done
  status.to_s == 'done'
end

#exec(*args) ⇒ Object



102
103
104
# File 'lib/rbbt/rest/client/step.rb', line 102

# Runs the job through exec_job. Any arguments are accepted and ignored.
#
# @return [Object] whatever exec_job returns
def exec(*args)
  outcome = exec_job
  outcome
end

#exec_job ⇒ Object



150
151
152
153
154
155
# File 'lib/rbbt/rest/client/step.rb', line 150

# Posts the inputs to the task URL with the :exec cache type and parses the
# response with load_res.
#
# The response is requested as :raw for :string, :boolean, :tsv and
# :annotations results, and as :json for everything else.
#
# @return [Object] whatever load_res builds from the response
def exec_job
  raw_types = [:string, :boolean, :tsv, :annotations]
  res = WorkflowRESTClient.capture_exception do
    # URI.encode was removed in Ruby 3.0; the default parser's #escape is the
    # routine it delegated to and exists on old and new Rubies alike.
    task_url = URI::DEFAULT_PARSER.escape(File.join(base_url, task.to_s))
    response_format = raw_types.include?(result_type) ? :raw : :json
    RestClient.post(task_url, inputs.merge(:_cache_type => :exec, :_format => response_format))
  end
  # :array is not a raw type here, so it was requested as JSON and is parsed as JSON.
  load_res res, result_type == :array ? :json : result_type
end

#file(file) ⇒ Object



58
59
60
# File 'lib/rbbt/rest/client/step.rb', line 58

# Fetches the raw content of one of the job's files.
#
# The request goes to the job URL joined with 'file' and the given name.
#
# @param file [String] name appended to "<url>/file"
# @return [Object] whatever WorkflowRESTClient.get_raw returns
def file(file)
  file_url = File.join(url, 'file', file)
  WorkflowRESTClient.get_raw(file_url)
end

#files ⇒ Object



54
55
56
# File 'lib/rbbt/rest/client/step.rb', line 54

# Requests "<url>/files" and returns the decoded JSON.
#
# @return [Object] whatever WorkflowRESTClient.get_json returns
def files
  listing_url = File.join(url, 'files')
  WorkflowRESTClient.get_json(listing_url)
end

#fork ⇒ Object



75
76
77
# File 'lib/rbbt/rest/client/step.rb', line 75

# Initializes the job with the :asynchronous cache type.
#
# @return [Object] whatever init_job returns
def fork
  init_job :asynchronous
end

#get ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/rbbt/rest/client/step.rb', line 113

# Downloads the job result from the job URL, retrying through Misc.insist.
#
# The result is requested as :raw for :string, :boolean, :tsv, :annotations
# and :array result types, and as :json for anything else, including a nil
# result type.
#
# @return [Object] whatever WorkflowRESTClient.get_raw returns
# @raise [StandardError] the download error is logged and then re-raised
def get
  raw_types = [:string, :boolean, :tsv, :annotations, :array]
  # The constructor defaults result_type to nil, and nil has no #to_sym.
  type = result_type.respond_to?(:to_sym) ? result_type.to_sym : result_type
  params = {:_format => (raw_types.include?(type) ? :raw : :json)}
  Misc.insist 3, rand(2) + 1 do
    begin
      WorkflowRESTClient.get_raw(url, params)
    rescue
      Log.exception $!
      raise $!
    end
  end
end

#info(check_lock = false) ⇒ Object



31
32
33
34
35
36
37
38
39
40
# File 'lib/rbbt/rest/client/step.rb', line 31

# Fetches the job's info hash from "<url>/info".
#
# The job is initialized first when no URL is known yet. The response is
# passed through WorkflowRESTClient.fix_hash and a String status is
# converted to a Symbol. The lookup is wrapped in Persist.memory, which is
# asked to persist only when the previously fetched info reported :done.
#
# @param check_lock [Boolean] accepted but not used by this method
# @return [Hash] the info hash, also stored in @info
def info(check_lock=false)
  # `&&` is required here: with `and`, the assignment binds first and @done
  # receives the whole @info hash, which is truthy for unfinished jobs too.
  @done = (@info && @info[:status] && @info[:status].to_sym == :done)
  @info = Persist.memory("RemoteSteps Info", :url => @url, :persist => !!@done) do
    init_job unless url
    info = WorkflowRESTClient.get_json(File.join(url, 'info'))
    info = WorkflowRESTClient.fix_hash(info)
    info[:status] = info[:status].to_sym if String === info[:status]
    info
  end
end

#init_job(cache_type = nil, other_params = {}) ⇒ Object

{{{ MANAGEMENT



64
65
66
67
68
69
70
71
72
# File 'lib/rbbt/rest/client/step.rb', line 64

# Registers the job with the server when needed and records its URL.
#
# A nil cache type becomes :exec when @is_exec is set and :asynchronous
# otherwise. The job name is obtained once, through Persist.memory and
# WorkflowRESTClient.post_jobname, and memoized in @name. @url is rebuilt
# from the base URL, task and name on every call.
#
# @param cache_type [Symbol, nil] cache type sent as :_cache_type
# @param other_params [Hash] extra parameters merged over the inputs; they
#   are only sent when the job name has not been obtained yet
# @return [nil]
def init_job(cache_type = nil, other_params = {})
  if cache_type.nil?
    cache_type = @is_exec ? :exec : :asynchronous
  end

  @name ||= Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    task_url = File.join(base_url, task.to_s)
    job_params = inputs.merge(other_params).merge(:jobname => @name||@base_name, :_cache_type => cache_type)
    WorkflowRESTClient.post_jobname(task_url, job_params)
  end

  @url = File.join(base_url, task.to_s, @name)
  nil
end

#joinObject



106
107
108
109
110
111
# File 'lib/rbbt/rest/client/step.rb', line 106

# Blocks until done? reports completion.
#
# Polls with a pause of 0.2s, then 1s, and from then on 3s between checks.
#
# @return [self]
def join
  [0.2, 1].each { |pause| sleep pause unless done? }
  sleep 3 until done?
  self
end

#load ⇒ Object



145
146
147
148
# File 'lib/rbbt/rest/client/step.rb', line 145

# Downloads the job result with get and parses it with load_res.
#
# @return [Object] whatever load_res builds from the downloaded result
def load
  load_res get
end

#load_res(res, result_type = nil) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/rbbt/rest/client/step.rb', line 126

# Waits for the job with join, then converts a response body according to
# the result type.
#
# :string is returned as is, :boolean compares against "true", :tsv and
# :annotations are opened as TSV, :array splits on newlines, and any other
# type is parsed as JSON.
#
# @param res [String] response body
# @param result_type [Symbol, nil] overrides the step's own result type
# @return [Object] the converted result
def load_res(res, result_type = nil)
  join
  type = result_type || self.result_type
  case type
  when :string      then res
  when :boolean     then res == "true"
  when :tsv         then TSV.open(StringIO.new(res))
  when :annotations then Annotated.load_tsv(TSV.open(StringIO.new(res)))
  when :array       then res.split("\n")
  else JSON.parse res
  end
end

#name ⇒ Object



21
22
23
24
# File 'lib/rbbt/rest/client/step.rb', line 21

# Job name taken from the last segment of the job URL.
#
# @return [String, nil] nil for exec jobs and while no job URL is known
def name
  # @url is only assigned by init_job; without it there is nothing to split.
  return nil if @is_exec || @url.nil?
  (Array === @url ? @url.first : @url).split("/").last
end

#path ⇒ Object



83
84
85
86
# File 'lib/rbbt/rest/client/step.rb', line 83

# Initializes the job and returns its URL with the raw-format query appended.
#
# @return [String] "<url>?_format=raw"
def path
  init_job
  raw_query = '?_format=raw'
  @url + raw_query
end

#recursive_clean ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
# File 'lib/rbbt/rest/client/step.rb', line 171

# Asks the server to recursively clean this job and then resets local state
# with _restart.
#
# This is best effort: errors are reported through Log.exception instead of
# being raised. Only StandardError is rescued, so Interrupt, SystemExit and
# other non-standard exceptions still reach the caller.
#
# @return [self]
def recursive_clean
  begin
    params = {:_update => :recursive_clean}
    init_job(nil, params)
    WorkflowRESTClient.get_raw(url, params)
    _restart
  rescue StandardError => e
    Log.exception e
  end
  self
end

#run(noload = false) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rbbt/rest/client/step.rb', line 88

# Runs the job once and memoizes the outcome in @result.
#
# Exec jobs go through exec_job; other jobs are initialized and then loaded.
# The work happens while holding @mutex.
#
# @param noload [Boolean] when truthy, return the raw-format URL from path
#   instead of the loaded result
# @return [Object, String] the loaded result, or the raw URL when noload is set
def run(noload = false)
  @mutex.synchronize do
    @result ||= begin
                  if @is_exec
                    exec_job
                  else
                    init_job
                    self.load
                  end
                end
  end
  # path already ends in '?_format=raw'; appending it again produced a URL
  # carrying the query marker twice.
  noload ? path : @result
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/rbbt/rest/client/step.rb', line 79

# Tells whether the job is still in progress.
#
# @return [Boolean] false once the status is done, error or aborted
def running?
  current = status.to_s
  finished = %w(done error aborted)
  !finished.include?(current)
end

#status ⇒ Object



42
43
44
45
46
47
48
# File 'lib/rbbt/rest/client/step.rb', line 42

# Reads the :status entry from a fresh info lookup.
#
# @info is cleared afterwards in every case, including when the lookup
# raises.
#
# @return [Object] the value stored under :status
def status
  info[:status]
ensure
  @info = nil
end

#task_name ⇒ Object



26
27
28
29
# File 'lib/rbbt/rest/client/step.rb', line 26

# Initializes the job and returns the next-to-last segment of its URL.
#
# @return [String, nil] the URL segment just before the job name
def task_name
  init_job
  job_url = Array === @url ? @url.first : @url
  segments = job_url.split("/")
  segments[-2]
end