Class: OpenTox::Task

Inherits:
Object
  • Object
show all
Includes:
OpenTox
Defined in:
lib/task.rb

Overview

Class for handling asynchronous tasks

Instance Attribute Summary collapse

Attributes included from OpenTox

#metadata, #uri

Class Method Summary collapse

Instance Method Summary collapse

Methods included from OpenTox

#add_metadata, #delete, login, text_to_html

Constructor Details

#initialize(uri = nil) ⇒ Task

Returns a new instance of Task.



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/task.rb', line 9

def initialize(uri=nil)
  super uri
  @metadata = {
    DC.title => "",
    DC.date => "",
    OT.hasStatus => "Running",
    OT.percentageCompleted => 0.0,
    OT.resultURI => "",
    DC.creator => "", # not mandatory according to API
    DC.description => "", # not mandatory according to API
  }
end

Instance Attribute Details

#due_to_timeObject

Returns the value of attribute due_to_time.



7
8
9
# File 'lib/task.rb', line 7

def due_to_time
  @due_to_time
end

#http_codeObject

Returns the value of attribute http_code.



7
8
9
# File 'lib/task.rb', line 7

def http_code
  @http_code
end

Class Method Details

.all(uri = CONFIG[:services]["opentox-task"]) ⇒ text/uri-list

Get a list of all tasks

Parameters:

  • uri (optional, String) (defaults to: CONFIG[:services]["opentox-task"])

    URI of task service

Returns:

  • (text/uri-list)

    Task URIs



100
101
102
# File 'lib/task.rb', line 100

def self.all(uri=CONFIG[:services]["opentox-task"])
  OpenTox.all uri
end

.create(title = nil, creator = nil, max_duration = DEFAULT_TASK_MAX_DURATION, description = nil) ⇒ OPenTox::Task

Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.

Examples:

task = OpenTox::Task.create do
  # this code will be executed as a task
  model = OpenTox::Algorithm.run(params) # this can be time consuming
  model.uri # Important: return URI of the created resource
end
task.status # returns "Running", because tasks are forked

Parameters:

  • title (String) (defaults to: nil)

    Task title

  • creator (String) (defaults to: nil)

    Task creator

Returns:

  • (OPenTox::Task)

    Task



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/task.rb', line 33

def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
  
  params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
  task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s
  task = Task.new(task_uri.chomp)

  # measure current memory consumption
  memory = `free -m|sed -n '2p'`.split
  free_memory = memory[3].to_i + memory[6].to_i # include cache
  if free_memory < 20 # require at least 200 M free memory
    LOGGER.warn "Cannot start task  - not enough memory left (#{free_memory} M free)"
    task.cancel
    return task
    #raise "Insufficient memory to start a new task"
  end

  cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f}
  nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n}
  nr_cpu_cores = 1 if !nr_cpu_cores
  #if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing
  #  LOGGER.warn "Cannot start task  - CPU load too high (#{cpu_load.join(", ")})"
  #  task.cancel
  #  return task
  #  #raise "Server too busy to start a new task"
  #end

  task_pid = Spork.spork(:logger => LOGGER) do
    LOGGER.debug "Task #{task.uri} started #{Time.now}"
    begin
      result = yield task
      LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
      task.completed(result)
    rescue => error
      LOGGER.error "task failed: "+error.class.to_s+": "+error.message
      LOGGER.error ":\n"+error.backtrace.join("\n")
      task.error(OpenTox::ErrorReport.create(error, creator))
    end
  end  
  task.pid = task_pid
  LOGGER.debug "Started task: "+task.uri.to_s
  task
end

.exist?(uri) ⇒ OpenTox::Task

Find a task for querying, status changes

Parameters:

Returns:



90
91
92
93
94
95
# File 'lib/task.rb', line 90

def self.exist?(uri)
  begin
    return find(uri)
  rescue
  end 
end

.find(uri) ⇒ OpenTox::Task

Find a task for querying, status changes

Parameters:

Returns:



79
80
81
82
83
84
85
# File 'lib/task.rb', line 79

def self.find(uri)
  return nil unless uri
  task = Task.new(uri)
  task.
  raise "could not load task metadata" if task.==nil or task..size==0
  task
end

.from_rdfxml(rdfxml) ⇒ Object



108
109
110
111
112
113
# File 'lib/task.rb', line 108

def self.from_rdfxml(rdfxml)
  owl = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task)
  task = Task.new(owl.uri)
  task.(owl.)
  task
end

.from_yaml(yaml) ⇒ Object



104
105
106
# File 'lib/task.rb', line 104

def self.from_yaml(yaml)
  @metadata = YAML.load(yaml)
end

Instance Method Details

#add_error_report(error_report) ⇒ Object

not stored just for to_rdf



156
157
158
# File 'lib/task.rb', line 156

def add_error_report( error_report )
  @error_report = error_report
end

#cancelObject



139
140
141
142
# File 'lib/task.rb', line 139

def cancel
  RestClientWrapper.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"})
  
end

#completed(uri) ⇒ Object



144
145
146
147
# File 'lib/task.rb', line 144

def completed(uri)
  RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
  
end

#completed?Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/task.rb', line 168

def completed?
  @metadata[OT.hasStatus] == 'Completed'
end

#descriptionObject



131
132
133
# File 'lib/task.rb', line 131

def description
  @metadata[DC.description]
end

#error(error_report) ⇒ Object



149
150
151
152
153
# File 'lib/task.rb', line 149

def error(error_report)
  raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport)
  RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml})
  
end

#error?Boolean

Returns:

  • (Boolean)


172
173
174
# File 'lib/task.rb', line 172

def error?
  @metadata[OT.hasStatus] == 'Error'
end

#errorReportObject



135
136
137
# File 'lib/task.rb', line 135

def errorReport
  @metadata[OT.errorReport]
end

#load_metadataObject



176
177
178
179
180
181
182
183
184
185
186
# File 'lib/task.rb', line 176

def 
  if (CONFIG[:yaml_hosts].include?(URI.parse(@uri).host))
    result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false)
    @metadata = YAML.load result.to_s
    @http_code = result.code
  else
    @metadata = Parser::Owl::Generic.new(@uri).
    @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code
  end
  raise "could not load task metadata for task "+@uri.to_s if @metadata==nil || @metadata.size==0
end

#pid=(pid) ⇒ Object



160
161
162
# File 'lib/task.rb', line 160

def pid=(pid)
  RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
end

#progress(pct) ⇒ Object

updates percentageCompleted value (can only be increased) task has to be running

Parameters:

  • pct (Numeric)

    value between 0 and 100



262
263
264
265
266
267
268
269
# File 'lib/task.rb', line 262

def progress(pct)
  #puts "task := "+pct.to_s
  raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
  if (pct > @metadata[OT.percentageCompleted] + 0.0001)
    RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct})
    
  end
end

#result_uriObject



127
128
129
# File 'lib/task.rb', line 127

def result_uri
  @metadata[OT.resultURI]
end

#running?Boolean

Returns:

  • (Boolean)


164
165
166
# File 'lib/task.rb', line 164

def running?
  @metadata[OT.hasStatus] == 'Running'
end

#statusObject



123
124
125
# File 'lib/task.rb', line 123

def status
  @metadata[OT.hasStatus]
end

#to_rdfxmlObject



115
116
117
118
119
120
121
# File 'lib/task.rb', line 115

def to_rdfxml
  s = Serializer::Owl.new
  @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report
  s.add_task(@uri,@metadata)
  s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report
  s.to_rdfxml
end

#wait_for_completion(waiting_task = nil, dur = 0.3) ⇒ Object

waits for a task, unless time exceeds or state is no longer running

Parameters:

  • waiting_task (optional, OpenTox::Task) (defaults to: nil)

    (can be a OpenTox::Subtask as well), progress is updated accordingly

  • dur (optional, Numeric) (defaults to: 0.3)

    seconds pausing before cheking again for completion



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/task.rb', line 237

def wait_for_completion( waiting_task=nil, dur=0.3)
  
  waiting_task.waiting_for(self.uri) if waiting_task
  due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
  LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
  
   # for extremely fast tasks
  check_state
  while self.running?
    sleep dur
     
    # if another (sub)task is waiting for self, set progress accordingly 
    waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task
    check_state
    if (Time.new > due_to_time)
      raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
    end
  end
  waiting_task.waiting_for(nil) if waiting_task
  LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
end

#waiting_for(task_uri) ⇒ Object



271
272
273
# File 'lib/task.rb', line 271

def waiting_for(task_uri)
  RestClientWrapper.put(File.join(@uri,'Running'),{:waiting_for => task_uri})
end