Class: OpenTox::Task
Overview
Class for handling asynchronous tasks
Instance Attribute Summary collapse
Attributes included from OpenTox
#metadata, #uri
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from OpenTox
#add_metadata, #delete, login, text_to_html
Constructor Details
#initialize(uri = nil) ⇒ Task
Returns a new instance of Task.
9
10
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/task.rb', line 9
# Sets up a task handle with placeholder metadata.
#
# The URI is handed to the superclass initializer. Every metadata entry
# starts out blank, except the status ("Running") and the completed
# percentage (0.0).
#
# @param uri [String, nil] URI of the task resource
def initialize(uri=nil)
  super(uri)
  defaults = {}
  defaults[DC.title] = ""
  defaults[DC.date] = ""
  defaults[OT.hasStatus] = "Running"
  defaults[OT.percentageCompleted] = 0.0
  defaults[OT.resultURI] = ""
  defaults[DC.creator] = ""
  defaults[DC.description] = ""
  @metadata = defaults
end
|
Instance Attribute Details
#due_to_time ⇒ Object
Returns the value of attribute due_to_time.
7
8
9
|
# File 'lib/task.rb', line 7
# Reader for the due_to_time attribute.
#
# NOTE(review): wait_for_completion assigns a *local* variable named
# due_to_time, not this attribute - confirm where @due_to_time is set.
#
# @return [Object, nil] the value of @due_to_time (nil if never assigned)
def due_to_time
@due_to_time
end
|
#http_code ⇒ Object
Returns the value of attribute http_code.
7
8
9
|
# File 'lib/task.rb', line 7
# Reader for the http_code attribute.
#
# load_metadata stores the response code of its GET request here.
#
# @return [Object, nil] the value of @http_code (nil if never assigned)
def http_code
@http_code
end
|
Class Method Details
.all(uri = CONFIG[:services]["opentox-task"]) ⇒ text/uri-list
100
101
102
|
# File 'lib/task.rb', line 100
# Lists the tasks known to a task service by delegating to OpenTox.all.
#
# @param uri [String] task service URI; defaults to the configured
#   "opentox-task" service
# @return whatever OpenTox.all returns for that URI (documented as a
#   text/uri-list)
def self.all(uri=CONFIG[:services]["opentox-task"])
  service_uri = uri
  OpenTox.all(service_uri)
end
|
.create(title = nil, creator = nil, max_duration = DEFAULT_TASK_MAX_DURATION, description = nil) ⇒ OpenTox::Task
Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/task.rb', line 33
# Creates a new task for the code in the given block.
#
# The task is registered by POSTing its parameters to the configured
# "opentox-task" service. The block is then handed to Spork.spork
# (presumably run in a separate process - confirm against Spork). Inside it,
# the block's return value is reported via Task#completed; any StandardError
# is logged and reported via Task#error, so the task ends in an error state
# instead of propagating the exception.
#
# If too little memory is free, the task is cancelled right away and the
# block is never run.
#
# @param title [String, nil] task title sent to the task service
# @param creator [String, nil] task creator; also passed to the error report
# @param max_duration [Numeric] sent to the task service as :max_duration
# @param description [String, nil] task description sent to the task service
# @yieldparam task [Task] the newly created task
# @yieldreturn the URI of the created resource
# @return [Task] the created task (already cancelled when memory is low)
def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
  params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
  task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s
  task = Task.new(task_uri.chomp)

  # Second line of `free -m` is the "Mem:" row. Field 3 is the "free" column;
  # field 6 is added on top (the cache column on older procps layouts -
  # TODO confirm for the deployed version of free).
  memory = `free -m|sed -n '2p'`.split
  free_memory = memory[3].to_i + memory[6].to_i
  if free_memory < 20
    LOGGER.warn "Cannot start task - not enough memory left (#{free_memory} M free)"
    task.cancel
    return task
  end

  task_pid = Spork.spork(:logger => LOGGER) do
    LOGGER.debug "Task #{task.uri} started #{Time.now}"
    begin
      result = yield task
      LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
      task.completed(result)
    rescue => error
      LOGGER.error "task failed: "+error.class.to_s+": "+error.message
      LOGGER.error ":\n"+error.backtrace.join("\n")
      task.error(OpenTox::ErrorReport.create(error, creator))
    end
  end
  task.pid = task_pid
  LOGGER.debug "Started task: "+task.uri.to_s
  task
end
|
Check whether a task exists: returns the task found at the given URI, or nil if the lookup fails
90
91
92
93
94
95
|
# File 'lib/task.rb', line 90
# Looks a task up without raising.
#
# Delegates to find and swallows any StandardError it raises.
#
# @param uri [String, nil] URI of the task to look up
# @return [Task, nil] the task returned by find, or nil when find returned
#   nil or raised
def self.exist?(uri)
  find(uri)
rescue
  nil
end
|
Find a task for querying, status changes
79
80
81
82
83
84
85
|
# File 'lib/task.rb', line 79
# Finds a task for querying or status changes.
#
# Builds a Task for the URI and loads its metadata.
#
# @param uri [String, nil] URI of the task
# @return [Task, nil] the loaded task, or nil when no URI was given
# @raise [RuntimeError] if the loaded metadata is nil or empty
def self.find(uri)
  return nil unless uri
  found = Task.new(uri)
  found.load_metadata
  details = found.metadata
  raise "could not load task metadata" if details.nil? || details.size == 0
  found
end
|
.from_rdfxml(rdfxml) ⇒ Object
108
109
110
111
112
113
|
# File 'lib/task.rb', line 108
# Builds a Task from an RDF/XML document.
#
# The document is parsed with OpenTox::Parser::Owl.from_rdf (type OT.Task);
# the parsed URI becomes the task URI and the parsed metadata is added to it.
#
# @param rdfxml RDF/XML representation of a task
# @return [Task] task carrying the parsed URI and metadata
def self.from_rdfxml(rdfxml)
  parsed = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task)
  Task.new(parsed.uri).tap { |task| task.add_metadata(parsed.metadata) }
end
|
.from_yaml(yaml) ⇒ Object
104
105
106
|
# File 'lib/task.rb', line 104
# Parses a YAML document and keeps the result in @metadata.
#
# NOTE(review): this is a class-level method, so @metadata is set on the
# receiver of from_yaml, not on a Task instance, and the parsed data itself
# is returned - unlike from_rdfxml, which returns a Task. Confirm this is
# intended.
# NOTE(review): YAML.load may instantiate arbitrary objects depending on the
# YAML library version; consider a safe loader if the input is untrusted.
#
# @param yaml [String] YAML document
# @return [Object] the parsed YAML data
def self.from_yaml(yaml)
  parsed = YAML.load(yaml)
  @metadata = parsed
end
|
Instance Method Details
#add_error_report(error_report) ⇒ Object
Attaches an error report to this object only (it is not sent to the task service); it is used just by #to_rdfxml
156
157
158
|
# File 'lib/task.rb', line 156
# Remembers an error report on this object.
#
# Only @error_report is set here; nothing is sent to the task service.
# to_rdfxml includes the report in its output when one is present.
#
# @param error_report the report to attach (to_rdfxml calls rdf_content on it)
# @return the given error_report
def add_error_report( error_report )
@error_report = error_report
end
|
#cancel ⇒ Object
139
140
141
142
|
# File 'lib/task.rb', line 139
# Marks the task as cancelled on the task service, then reloads the metadata.
#
# @return the result of load_metadata
def cancel
  endpoint = File.join(@uri, 'Cancelled')
  # The payload carries no information; its key suggests the request body
  # just must not be empty.
  payload = {:cannot_be => "empty"}
  RestClientWrapper.put(endpoint, payload)
  load_metadata
end
|
#completed(uri) ⇒ Object
144
145
146
147
|
# File 'lib/task.rb', line 144
# Marks the task as completed on the task service, then reloads the metadata.
#
# @param uri URI of the resource the task produced (sent as :resultURI)
# @return the result of load_metadata
def completed(uri)
  endpoint = File.join(@uri, 'Completed')
  payload = {:resultURI => uri}
  RestClientWrapper.put(endpoint, payload)
  load_metadata
end
|
#completed? ⇒ Boolean
168
169
170
|
# File 'lib/task.rb', line 168
# Tells whether the locally held metadata says the task is completed.
# Reads @metadata only; it does not contact the task service.
#
# @return [Boolean] true when the status entry equals 'Completed'
def completed?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Completed'
end
|
#description ⇒ Object
131
132
133
|
# File 'lib/task.rb', line 131
# Reads the description from the locally held metadata.
#
# @return the value stored under DC.description (nil when absent)
def description
  description_key = DC.description
  @metadata[description_key]
end
|
#error(error_report) ⇒ Object
149
150
151
152
153
|
# File 'lib/task.rb', line 149
# Marks the task as failed on the task service, then reloads the metadata.
#
# The report is sent YAML-serialized under :errorReport.
#
# @param error_report [OpenTox::ErrorReport] report describing the failure
# @return the result of load_metadata
# @raise [RuntimeError] if error_report is not an OpenTox::ErrorReport
def error(error_report)
  acceptable = error_report.is_a?(OpenTox::ErrorReport)
  raise "no error report" unless acceptable
  endpoint = File.join(@uri, 'Error')
  RestClientWrapper.put(endpoint, {:errorReport => error_report.to_yaml})
  load_metadata
end
|
#error? ⇒ Boolean
172
173
174
|
# File 'lib/task.rb', line 172
# Tells whether the locally held metadata says the task failed.
# Reads @metadata only; it does not contact the task service.
#
# @return [Boolean] true when the status entry equals 'Error'
def error?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Error'
end
|
#errorReport ⇒ Object
135
136
137
|
# File 'lib/task.rb', line 135
# Reads the error report entry from the locally held metadata.
#
# @return the value stored under OT.errorReport (nil when absent)
def errorReport
  report_key = OT.errorReport
  @metadata[report_key]
end
|
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/task.rb', line 176
# Refreshes @metadata and @http_code from the task service.
#
# Hosts listed in CONFIG[:yaml_hosts] are queried for YAML, which is parsed
# directly. For all other hosts the metadata comes from the generic OWL
# parser, and a separate RDF/XML GET is made just to obtain the response code.
#
# @return [nil]
# @raise [RuntimeError] if the resulting metadata is nil or empty
def load_metadata
  yaml_hosts = CONFIG[:yaml_hosts]
  task_host = URI.parse(@uri).host
  if yaml_hosts.include?(task_host)
    response = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false)
    @metadata = YAML.load(response.to_s)
    @http_code = response.code
  else
    @metadata = Parser::Owl::Generic.new(@uri).load_metadata
    rdf_response = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false)
    @http_code = rdf_response.code
  end
  nothing_loaded = @metadata.nil? || @metadata.size == 0
  raise "could not load task metadata for task "+@uri.to_s if nothing_loaded
end
|
#pid=(pid) ⇒ Object
160
161
162
|
# File 'lib/task.rb', line 160
# Reports a process id for this task to the task service.
# Nothing is stored on this object.
#
# @param pid process id to report (sent as :pid)
def pid=(pid)
  endpoint = File.join(@uri, 'pid')
  RestClientWrapper.put(endpoint, {:pid => pid})
end
|
#progress(pct) ⇒ Object
updates percentageCompleted value (can only be increased) task has to be running
262
263
264
265
266
267
268
269
|
# File 'lib/task.rb', line 262
# Updates the percentageCompleted value of a running task.
#
# The update is only sent when pct exceeds the locally known percentage by
# more than 0.0001, so the value can only be increased. After an update the
# metadata is reloaded.
#
# @param pct [Numeric] percentage completed, between 0 and 100
# @return the result of load_metadata when an update was sent, otherwise nil
# @raise [RuntimeError] if pct is not a Numeric between 0 and 100
def progress(pct)
  raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) && pct >= 0 && pct <= 100
  # Coerce the stored value before doing arithmetic on it: reloaded metadata
  # does not necessarily hold a Float here (wait_for_completion applies the
  # same to_f coercion to this entry).
  known_pct = @metadata[OT.percentageCompleted].to_f
  return unless pct > known_pct + 0.0001
  RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct})
  load_metadata
end
|
#result_uri ⇒ Object
127
128
129
|
# File 'lib/task.rb', line 127
# Reads the result URI from the locally held metadata.
#
# @return the value stored under OT.resultURI (nil when absent)
def result_uri
  result_key = OT.resultURI
  @metadata[result_key]
end
|
#running? ⇒ Boolean
164
165
166
|
# File 'lib/task.rb', line 164
# Tells whether the locally held metadata says the task is still running.
# Reads @metadata only; it does not contact the task service.
#
# @return [Boolean] true when the status entry equals 'Running'
def running?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Running'
end
|
#status ⇒ Object
123
124
125
|
# File 'lib/task.rb', line 123
# Reads the task status from the locally held metadata.
#
# @return the value stored under OT.hasStatus (nil when absent)
def status
  status_key = OT.hasStatus
  @metadata[status_key]
end
|
#to_rdfxml ⇒ Object
115
116
117
118
119
120
121
|
# File 'lib/task.rb', line 115
# Serializes the task as RDF/XML.
#
# When an error report was attached (see add_error_report), the metadata
# gets an OT.errorReport entry pointing at a temporary ErrorReport URI
# below the task URI, and the report's rdf_content is added as a resource
# under that same URI. Note that this writes the entry into @metadata.
#
# @return the RDF/XML produced by Serializer::Owl#to_rdfxml
def to_rdfxml
  serializer = Serializer::Owl.new
  if @error_report
    @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId"
  end
  serializer.add_task(@uri, @metadata)
  if @error_report
    serializer.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content)
  end
  serializer.to_rdfxml
end
|
#wait_for_completion(waiting_task = nil, dur = 0.3) ⇒ Object
waits for a task, unless time exceeds or state is no longer running
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
# File 'lib/task.rb', line 237
# Waits until this task is no longer running.
#
# The metadata is reloaded every +dur+ seconds. Waiting is aborted with an
# exception once DEFAULT_TASK_MAX_DURATION seconds have passed. check_state
# (defined elsewhere) is called after every reload - presumably it validates
# the freshly loaded state; confirm against its definition.
#
# @param waiting_task [Task, nil] a task that waits for this one; it is told
#   which task it is waiting for and receives this task's progress
# @param dur [Numeric] seconds to sleep between two polls
# @raise [RuntimeError] if the maximum wait time is exceeded
def wait_for_completion( waiting_task=nil, dur=0.3)
  waiting_task.waiting_for(self.uri) if waiting_task
  # Local deadline; this does not assign the due_to_time attribute.
  due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
  LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s

  # Reload once before polling, so a task that has already finished is
  # detected without sleeping first.
  load_metadata
  check_state
  while self.running?
    sleep dur
    load_metadata
    # Pass this task's progress on to the task that is waiting for it.
    waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task
    check_state
    if (Time.new > due_to_time)
      raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
    end
  end
  waiting_task.waiting_for(nil) if waiting_task
  LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
end
|
#waiting_for(task_uri) ⇒ Object
271
272
273
|
# File 'lib/task.rb', line 271
# Tells the task service which task this task is currently waiting for.
#
# @param task_uri [String, nil] URI of the awaited task; wait_for_completion
#   passes nil once waiting is over
# @return the result of the PUT request
def waiting_for(task_uri)
  endpoint = File.join(@uri, 'Running')
  RestClientWrapper.put(endpoint, {:waiting_for => task_uri})
end
|