Class: Task::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/task/job.rb

Constant Summary collapse

IDSEP =
"_"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task, id, name, options = nil, previous_jobs = nil, required_files = nil, input = nil) ⇒ Job

Returns a new instance of Job.



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/rbbt/util/task/job.rb', line 20

def initialize(task, id, name, options = nil, previous_jobs = nil, required_files = nil, input = nil)
  @task = task  
  @id =id
  @name = name
  @options = options || {}
  @previous_jobs = previous_jobs || []
  @required_files = required_files || []
  @input = input

  basedir = task.workflow.jobdir unless task.workflow.nil?
  @path = File.join(basedir || Task.basedir, task.name, id)
end

Instance Attribute Details

#idObject

Returns the value of attribute id.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def id
  @id
end

#input(name = nil) ⇒ Object

Returns the value of attribute input.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def input
  @input
end

#nameObject

Returns the value of attribute name.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def name
  @name
end

#optionsObject

Returns the value of attribute options.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def options
  @options
end

#pathObject

Returns the value of attribute path.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def path
  @path
end

#pidObject

Returns the value of attribute pid.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def pid
  @pid
end

#previous_jobsObject

Returns the value of attribute previous_jobs.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def previous_jobs
  @previous_jobs
end

#required_filesObject

Returns the value of attribute required_files.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def required_files
  @required_files
end

#taskObject

Returns the value of attribute task.



5
6
7
# File 'lib/rbbt/util/task/job.rb', line 5

def task
  @task
end

Class Method Details

.id2name(job_id) ⇒ Object



9
10
11
# File 'lib/rbbt/util/task/job.rb', line 9

def self.id2name(job_id)
  job_id.split(IDSEP)
end

.load(task, id) ⇒ Object



13
14
15
16
17
18
# File 'lib/rbbt/util/task/job.rb', line 13

def self.load(task, id)
  name, hash = id2name(id)
  job = self.new task, id, name, nil, nil
  job.load_dependencies
  job
end

Instance Method Details

#abortObject



122
123
124
125
126
# File 'lib/rbbt/util/task/job.rb', line 122

def abort
  if @pid
    Process.kill("INT", @pid)
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/rbbt/util/task/job.rb', line 136

def aborted?
  step == :aborted 
end

#all_inputsObject



44
45
46
47
48
49
50
51
52
53
# File 'lib/rbbt/util/task/job.rb', line 44

def all_inputs
  if true or not defined? @all_inputs
    @all_inputs = {}
    previous_jobs_rec.each do |job| @all_inputs[job.task.name] = job end
    @all_inputs.extend IndiferentHash
    @all_inputs
  else
    @all_inputs
  end
end

#argumentsObject



140
141
142
# File 'lib/rbbt/util/task/job.rb', line 140

def arguments
  options.values_at *task.options
end

#blockObject



144
145
146
# File 'lib/rbbt/util/task/job.rb', line 144

def block
  task.block
end

#cleanObject



295
296
297
298
299
300
# File 'lib/rbbt/util/task/job.rb', line 295

def clean
  FileUtils.rm path if File.exist? path
  FileUtils.rm info_file if File.exist? info_file
  FileUtils.rm_rf path + '.files' if File.exist? path + '.files'
  self
end

#done?Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/rbbt/util/task/job.rb', line 128

def done?
  [:done, :error, :aborted].include? info[:step]
end

#error?Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/rbbt/util/task/job.rb', line 132

def error?
  step == :error or step == :aborted
end

#files(file = nil, data = nil) ⇒ Object



111
112
113
114
115
116
117
118
119
120
# File 'lib/rbbt/util/task/job.rb', line 111

def files(file = nil, data = nil)
  return Dir.glob(File.join(path + '.files/*')).collect{|f| File.basename(f)} if file.nil?

  filename = Resource::Path.path(File.join(path + '.files', file.to_s))
  if data.nil?
    filename
  else
    Open.write(filename, data)
  end
end

#forkObject



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/rbbt/util/task/job.rb', line 237

def fork
  return self if recursive_done?
  @pid = Process.fork do
    begin
      step(:started)
      start
      step(:done)
    rescue Exception
      Log.debug $!.message
      Log.debug $!.backtrace * "\n"
      step(:error, "#{$!.class}: #{$!.message}")
    end
    exit
  end

  self
end

#infoObject



79
80
81
82
83
# File 'lib/rbbt/util/task/job.rb', line 79

def info
  return {} if not File.exist?(info_file)
  info = YAML.load(File.open(info_file)) || {}
  info.extend IndiferentHash
end

#info_fileObject



75
76
77
# File 'lib/rbbt/util/task/job.rb', line 75

def info_file
  path + '.info'
end

#joinObject



255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/rbbt/util/task/job.rb', line 255

def join
  if @pid.nil?
    while not done? do
      Log.debug "Waiting: #{info[:step]}"
      sleep 5
    end
  else
    Process.waitpid @pid
  end

  self
end

#load(*args) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/rbbt/util/task/job.rb', line 276

def load(*args)
  case task.persistence
  when :float
    Open.read(path).to_f
  when :integer
    Open.read(path).to_i
  when :string
    Open.read(path)
  when :tsv
    TSV.new(path, *args)
  when :marshal
    Marshal.load(Open.read(path))
  when :yaml
    YAML.load(Open.read(path))
  when nil
    nil
  end
end

#load_dependenciesObject



164
165
166
167
168
169
170
# File 'lib/rbbt/util/task/job.rb', line 164

def load_dependencies
  @previous_jobs = info[:previous_jobs].collect do |job_string| 
    job_string =~ /JOB:(.*)\/(.*)/
    task.workflow.load_job($1, $2)
  end if info[:previous_jobs]
  @required_files = info[:required_files] if info[:required_files]
end

#messagesObject



107
108
109
# File 'lib/rbbt/util/task/job.rb', line 107

def messages
  info[:messages] || []
end

#openObject



268
269
270
# File 'lib/rbbt/util/task/job.rb', line 268

def open
  File.open(path)
end

#previous_jobs_recObject



33
34
35
36
37
# File 'lib/rbbt/util/task/job.rb', line 33

def previous_jobs_rec
  return [] if previous_jobs.nil?
  prev = previous_jobs + previous_jobs.collect{|job| job.previous_jobs_rec}.flatten
  NamedArray.name prev, prev.collect{|job| job.task.name}
end

#readObject



272
273
274
# File 'lib/rbbt/util/task/job.rb', line 272

def read
  File.open(path) do |f| f.read end
end

#recursive_cleanObject



302
303
304
305
# File 'lib/rbbt/util/task/job.rb', line 302

def recursive_clean
  previous_jobs.each do |job| job.recursive_clean end unless previous_jobs.nil?
  clean
end

#recursive_done?Boolean

Returns:

  • (Boolean)


218
219
220
# File 'lib/rbbt/util/task/job.rb', line 218

def recursive_done?
  (previous_jobs || []).inject(true){|acc,j| acc and j.recursive_done?} and done? and not error? 
end

#runObject



222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/rbbt/util/task/job.rb', line 222

def run
  return self if recursive_done?
  begin
    FileUtils.rm info_file if File.exist? info_file
    step(:started)
    start
    step(:done)
  rescue Exception
    Log.debug $!.message
    Log.debug $!.backtrace * "\n"
    step(:error, "#{$!.class}: #{$!.message}")
  end
  self
end

#run_dependenciesObject



148
149
150
151
152
153
154
155
156
157
# File 'lib/rbbt/util/task/job.rb', line 148

def run_dependencies
  required_files.each do |file| file.produce unless File.exist? file end unless required_files.nil?
  previous_jobs.each do |job| 
    if not job.recursive_done? 
      job.clean if job.error?
      job.start
      job.step :done unless job.step == :error or job.step == :aborted
    end
  end unless previous_jobs.nil?
end

#save_dependenciesObject



159
160
161
162
# File 'lib/rbbt/util/task/job.rb', line 159

def save_dependencies
  set_info :previous_jobs, @previous_jobs.collect{|job| "JOB:#{job.task.name}/#{job.id}"}  unless @previous_jobs.nil?
  set_info :required_files, @required_files.collect{|file| file.responds_to? :find ? file.find : file} if @required_files.nil?
end

#save_options(options) ⇒ Object



205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/rbbt/util/task/job.rb', line 205

def save_options(options)
  new_options = {}
  options.each do |key, value|
    case 
    when TSV === value
      new_options[key] = value.to_s
    else
      new_options[key] = value
    end
  end
  set_info(:options, new_options)
end

#set_info(key, value) ⇒ Object



85
86
87
88
89
90
# File 'lib/rbbt/util/task/job.rb', line 85

def set_info(key, value)
  Misc.lock(info_file, key, value) do |info_file, key, value| i = self.info
    new_info = i.merge(key => value)
    Open.write(info_file, new_info.to_yaml)
  end
end

#startObject



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/rbbt/util/task/job.rb', line 172

def start
  begin
    run_dependencies

    Log.medium("[#{task.name}] Starting Job '#{ name }'. Path: '#{ path }'")
    set_info(:start_time, Time.now)
    save_options(options)
    save_dependencies

    extend task.scope unless task.scope.nil? or Object == task.scope.class

    result = instance_exec *arguments, &block

    if not result.nil?
      case task.persistence
      when nil, :string, :tsv, :integer
        Open.write(path, result.to_s)
      when :marshal
        Open.write(path, Marshal.dump(result))
      when :yaml
        Open.write(path, YAML.dump(result))
      end
    end

    set_info(:end_time, Time.now)
    Log.medium("[#{task.name}] Finished Job '#{ name }'. Path: '#{ path }'")
  rescue Exception
    set_info(:exception_backtrace, $!.backtrace)
    step(:error, "#{$!.class}: #{$!.message}")
    raise $!
  end
end

#step(name = nil, message = nil) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/rbbt/util/task/job.rb', line 92

def step(name = nil, message = nil)
  @previous_jobs
  if name.nil?
    info[:step]
  else
    set_info(:step, name)
    if message.nil?
      Log.info "[#{task.name}] Step '#{name}'"
    else
      Log.info "[#{task.name}] Step '#{name}': #{message.chomp}"
      set_info(:messages, info[:messages] || [] << message) if not message.nil?
    end
  end
end