Class: WorkflowRESTClient::RemoteStep
- Inherits:
-
Step
- Object
- Step
- WorkflowRESTClient::RemoteStep
show all
- Defined in:
- lib/rbbt/rest/client/step.rb
Constant Summary
Constants inherited
from Step
Step::INFO_SERIALIAZER, Step::STREAM_CACHE, Step::STREAM_CACHE_MUTEX
Instance Attribute Summary collapse
Attributes inherited from Step
#bindings, #clean_name, #dependencies, #dupped, #mutex, #pid, #result, #saved_stream, #seen, #stream
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Step
#_abort, #_exec, #abort, #abort_pid, #abort_stream, #aborted?, #checks, #child, clean, #dirty?, #dup_inputs, dup_stream, #error?, #exception, #files_dir, files_dir, #get_stream, #grace, #info_file, info_file, #info_lock, job_name_for_info_file, #join_stream, #kill_children, #load_file, log, #log, log_block, log_progress, #log_progress, log_string, #merge_info, #message, #messages, #prepare_result, #produce, #progress_bar, #provenance, #provenance_paths, purge_stream_cache, #rec_dependencies, #relay_log, #run_dependencies, #save_file, #set_info, #soft_grace, #started?, #status=, status_color, #step, step_info, #stop_dependencies, #streaming?, wait_for_jobs
Constructor Details
#initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false) ⇒ RemoteStep
Returns a new instance of RemoteStep.
15
16
17
18
19
|
# File 'lib/rbbt/rest/client/step.rb', line 15
# Builds a handle for a job of +task+ served under +base_url+.
#
# Stores every argument in the instance variable of the same name, creates
# the mutex used by #run, and passes the inputs to RemoteStep.get_streams.
#
# base_url           - base URL under which the task is reachable.
# task               - task identifier (converted with #to_s when URLs are built).
# base_name          - job name sent to the server when no name is known yet.
# inputs             - Hash-like collection of job inputs; nil is treated as {}.
# result_type        - how results are decoded (see #load_res).
# result_description - stored as given.
# is_exec            - when true the job is run through #exec_job.
def initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false)
  @base_url = base_url
  @task = task
  @base_name = base_name
  # The inputs are iterated and merged later on, so the nil default would
  # raise NoMethodError; fall back to an empty Hash instead.
  @inputs = inputs || {}
  @result_type = result_type
  @result_description = result_description
  @is_exec = is_exec
  @mutex = Mutex.new
  RemoteStep.get_streams @inputs
end
|
Instance Attribute Details
#base_name ⇒ Object
Returns the value of attribute base_name.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @base_name, which the constructor sets from its base_name argument.
def base_name; @base_name; end
|
#base_url ⇒ Object
Returns the value of attribute base_url.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @base_url, which the constructor sets from its base_url argument.
def base_url; @base_url; end
|
#inputs ⇒ Object
Returns the value of attribute inputs.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @inputs, the job inputs held by this step.
def inputs; @inputs; end
|
#is_exec ⇒ Object
Returns the value of attribute is_exec.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @is_exec, the flag the constructor sets from its is_exec argument.
def is_exec; @is_exec; end
|
#result_description ⇒ Object
Returns the value of attribute result_description.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @result_description, which the constructor sets from its result_description argument.
def result_description; @result_description; end
|
#result_type ⇒ Object
Returns the value of attribute result_type.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @result_type, which the constructor sets from its result_type argument.
def result_type; @result_type; end
|
#task ⇒ Object
Returns the value of attribute task.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @task, which the constructor sets from its task argument.
def task; @task; end
|
#url ⇒ Object
Returns the value of attribute url.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @url; it is nil until something (e.g. init_job) assigns it.
def url; @url; end
|
Class Method Details
.get_streams(inputs) ⇒ Object
6
7
8
9
10
11
12
13
|
# File 'lib/rbbt/rest/client/step.rb', line 6
# Replaces, in place, every Step found among the input values.
#
# Each Step value is swapped for its stream (Step#get_stream) or, when it has
# none, for the value returned by Step#run. Other values are left untouched.
#
# inputs - Hash-like collection of inputs, or nil.
#
# Returns the same +inputs+ object (nil when nil was given).
def self.get_streams(inputs)
  # The constructor's default for inputs is nil; there is nothing to resolve then.
  return inputs if inputs.nil?

  inputs.each do |key, value|
    next unless Step === value
    # Step#run is only invoked when the step exposes no stream.
    inputs[key] = value.get_stream || value.run
  end
end
|
Instance Method Details
#_restart ⇒ Object
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/rbbt/rest/client/step.rb', line 157
# Clears the cached completion flag and job name, and rebuilds @inputs.
#
# File inputs are replaced by new handles opened on the same path; every
# other input is carried over unchanged. Returns the rebuilt inputs.
def _restart
  @done = @name = nil
  @inputs = inputs.each_with_object({}) do |(key, value), fresh|
    fresh[key] = File === value ? File.open(value.path) : value
  end
end
|
#clean ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/rbbt/rest/client/step.rb', line 183
# Sends a request carrying :_update => :clean for this job and then resets
# the local state through #_restart.
#
# Failures are logged with Log.exception and not re-raised, so the call is
# best-effort. Only StandardError is rescued: Interrupt, SystemExit and other
# non-standard exceptions are allowed to propagate.
#
# Returns self.
def clean
  begin
    params = {:_update => :clean}
    init_job(nil, params)
    WorkflowRESTClient.get_raw(url, params)
    _restart
  rescue StandardError => e
    Log.exception e
  end
  self
end
|
#done? ⇒ Boolean
50
51
52
|
# File 'lib/rbbt/rest/client/step.rb', line 50
# Tells whether the job has finished.
#
# A truthy cached @done is returned as-is; otherwise the current status is
# fetched and compared with 'done'.
def done?
  return @done if @done
  status.to_s == 'done'
end
|
#exec(*args) ⇒ Object
102
103
104
|
# File 'lib/rbbt/rest/client/step.rb', line 102
# Runs the job through #exec_job and returns its result.
# Any arguments are accepted and ignored.
def exec(*args)
  exec_job
end
|
#exec_job ⇒ Object
150
151
152
153
154
155
|
# File 'lib/rbbt/rest/client/step.rb', line 150
# Posts the inputs to the task URL with :_cache_type => :exec and decodes
# the response with #load_res.
#
# The :_format parameter is :raw for the :string, :boolean, :tsv and
# :annotations result types and :json for everything else. An :array result
# is therefore requested as JSON and decoded as JSON.
def exec_job
  raw_types = [:string, :boolean, :tsv, :annotations]
  res = WorkflowRESTClient.capture_exception do
    # URI.encode was removed in Ruby 3.0; the default parser's #escape is the
    # method it delegated to.
    task_url = URI::DEFAULT_PARSER.escape(File.join(base_url, task.to_s))
    format = raw_types.include?(result_type) ? :raw : :json
    RestClient.post(task_url, inputs.merge(:_cache_type => :exec, :_format => format))
  end
  load_res res, result_type == :array ? :json : result_type
end
|
#file(file) ⇒ Object
58
59
60
|
# File 'lib/rbbt/rest/client/step.rb', line 58
# Fetches, through WorkflowRESTClient.get_raw, the resource found at
# "<url>/file/<file>" and returns whatever that call returns.
def file(file)
  file_url = File.join(url, 'file', file)
  WorkflowRESTClient.get_raw(file_url)
end
|
#fork ⇒ Object
75
76
77
|
# File 'lib/rbbt/rest/client/step.rb', line 75
# Initializes the job using the :asynchronous cache type.
# Returns whatever #init_job returns.
def fork
  cache_type = :asynchronous
  init_job(cache_type)
end
|
#get ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/rbbt/rest/client/step.rb', line 113
# Downloads the job result from +url+ and returns it undecoded.
#
# The :_format parameter is :raw for the :string, :boolean, :tsv,
# :annotations and :array result types and :json for everything else. The
# request runs inside Misc.insist; each failure is logged with
# Log.exception and re-raised to it.
def get
  raw_types = [:string, :boolean, :tsv, :annotations, :array]
  # result_type is nil by default in the constructor; nil has no #to_sym, so
  # only convert values that support it and let nil fall through to :json.
  type = result_type.respond_to?(:to_sym) ? result_type.to_sym : result_type
  params = {:_format => (raw_types.include?(type) ? :raw : :json)}
  Misc.insist 3, rand(2) + 1 do
    begin
      WorkflowRESTClient.get_raw(url, params)
    rescue => e
      Log.exception e
      raise e
    end
  end
end
|
#info(check_lock = false) ⇒ Object
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/rbbt/rest/client/step.rb', line 31
# Retrieves the job info Hash from "<url>/info".
#
# The lookup goes through Persist.memory, keyed on the job URL; persistence
# is only requested once the previously fetched info reported a :done status.
# A String status in the response is converted to a Symbol.
#
# check_lock - accepted but not used by this implementation.
#
# Returns the info Hash and stores it in @info.
def info(check_lock=false)
  # `&&` is required here: with `and` the assignment binds first, so @done
  # would receive the whole @info Hash and be truthy for unfinished jobs.
  @done = @info && @info[:status] && @info[:status].to_sym == :done
  @info = Persist.memory("RemoteSteps Info", :url => @url, :persist => !!@done) do
    init_job unless url
    fetched = WorkflowRESTClient.get_json(File.join(url, 'info'))
    fetched = WorkflowRESTClient.fix_hash(fetched)
    fetched[:status] = fetched[:status].to_sym if String === fetched[:status]
    fetched
  end
end
|
#init_job(cache_type = nil, other_params = {}) ⇒ Object
64
65
66
67
68
69
70
71
72
|
# File 'lib/rbbt/rest/client/step.rb', line 64
# Makes sure the job has a name and a URL.
#
# When no name is known yet, the inputs (merged with +other_params+, the job
# name and the cache type) are posted to the task URL and the returned job
# name is stored in @name; the lookup is wrapped in Persist.memory. @url is
# then set to "<base_url>/<task>/<name>".
#
# cache_type   - defaults to :exec when @is_exec is set, :asynchronous otherwise.
# other_params - extra parameters merged into the posted inputs.
#
# Returns nil.
def init_job(cache_type = nil, other_params = {})
  cache_type = (@is_exec ? :exec : :asynchronous) if cache_type.nil?

  unless @name
    @name = Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
      task_url = File.join(base_url, task.to_s)
      post_params = inputs.merge(other_params).merge(:jobname => @name||@base_name, :_cache_type => cache_type)
      WorkflowRESTClient.post_jobname(task_url, post_params)
    end
  end

  @url = File.join(base_url, task.to_s, @name)
  nil
end
|
#join ⇒ Object
106
107
108
109
110
111
|
# File 'lib/rbbt/rest/client/step.rb', line 106
# Blocks until the job reports being done, polling #done? with growing
# pauses: 0.2 s, then 1 s, then 3 s between every further check.
#
# Returns self.
def join
  [0.2, 1].each { |pause| sleep pause unless done? }
  sleep 3 until done?
  self
end
|
#load ⇒ Object
145
146
147
148
|
# File 'lib/rbbt/rest/client/step.rb', line 145
# Downloads the job result with #get and decodes it with #load_res.
# Returns the decoded result.
def load
  load_res get
end
|
#load_res(res, result_type = nil) ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# File 'lib/rbbt/rest/client/step.rb', line 126
# Decodes a raw response according to the result type, after waiting for
# the job with #join.
#
# res         - response body as a String.
# result_type - type to decode as; defaults to the step's own result_type.
#
# :string returns +res+ untouched, :boolean compares it with "true", :tsv
# opens it as a TSV, :annotations loads annotations from that TSV, :array
# splits it on newlines, and any other type is parsed as JSON.
def load_res(res, result_type = nil)
  join
  kind = result_type || self.result_type
  open_tsv = lambda { TSV.open(StringIO.new(res)) }

  case kind
  when :string      then res
  when :boolean     then res == "true"
  when :tsv         then open_tsv.call
  when :annotations then Annotated.load_tsv(open_tsv.call)
  when :array       then res.split("\n")
  else JSON.parse res
  end
end
|
#name ⇒ Object
21
22
23
24
|
# File 'lib/rbbt/rest/client/step.rb', line 21
# Job name taken from the last "/"-separated segment of @url (of its first
# element when @url is an Array). Always nil for exec jobs.
def name
  unless @is_exec
    location = Array === @url ? @url.first : @url
    location.split("/").last
  end
end
|
#path ⇒ Object
83
84
85
86
|
# File 'lib/rbbt/rest/client/step.rb', line 83
# Initializes the job and returns its URL with the '?_format=raw' query
# string appended.
def path
  init_job
  raw_query = '?_format=raw'
  @url + raw_query
end
|
#recursive_clean ⇒ Object
171
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/rbbt/rest/client/step.rb', line 171
# Sends a request carrying :_update => :recursive_clean for this job and
# then resets the local state through #_restart.
#
# Failures are logged with Log.exception and not re-raised, so the call is
# best-effort. Only StandardError is rescued: Interrupt, SystemExit and other
# non-standard exceptions are allowed to propagate.
#
# Returns self.
def recursive_clean
  begin
    params = {:_update => :recursive_clean}
    init_job(nil, params)
    WorkflowRESTClient.get_raw(url, params)
    _restart
  rescue StandardError => e
    Log.exception e
  end
  self
end
|
#run(noload = false) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/rbbt/rest/client/step.rb', line 88
# Runs the job (once) and returns its result.
#
# The result is computed under @mutex and memoized in @result: exec jobs go
# through #exec_job, other jobs are initialized and loaded with #load.
#
# noload - when truthy, return the raw-format URL of the result (#path)
#          instead of the loaded result.
def run(noload = false)
  @mutex.synchronize do
    @result ||= begin
                  if @is_exec
                    exec_job
                  else
                    init_job
                    self.load
                  end
                end
  end
  # #path already ends in '?_format=raw'; appending it again would produce a
  # URL with a duplicated, malformed query string.
  noload ? path : @result
end
|
#running? ⇒ Boolean
79
80
81
|
# File 'lib/rbbt/rest/client/step.rb', line 79
# True unless the current status is one of 'done', 'error' or 'aborted'.
def running?
  current = status.to_s
  finished = %w(done error aborted)
  !finished.include?(current)
end
|
#status ⇒ Object
42
43
44
45
46
47
48
|
# File 'lib/rbbt/rest/client/step.rb', line 42
# Returns the :status entry of a freshly requested #info.
#
# @info is reset to nil afterwards, even when the lookup raises, so that it
# is not kept between calls.
def status
  info[:status]
ensure
  @info = nil
end
|
#task_name ⇒ Object
26
27
28
29
|
# File 'lib/rbbt/rest/client/step.rb', line 26
# Initializes the job and returns the second-to-last "/"-separated segment
# of @url (of its first element when @url is an Array).
def task_name
  init_job
  location = Array === @url ? @url.first : @url
  location.split("/")[-2]
end
|