Class: Step

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/workflow/step.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/step/run.rb

Constant Summary collapse

INFO_SERIALIAZER =
Marshal
STREAM_CACHE =
{}
STREAM_CACHE_MUTEX =
Mutex.new

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil, clean_name = nil) ⇒ Step

Returns a new instance of Step.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/step.rb', line 31

# Builds a job step.
#
# @param path [String, Path, Proc] result location; a String is sanitized and
#   wrapped with Path.setup, a Proc is called to obtain the path.
# @param task [Object, nil] task executed by this step.
# @param inputs [Array, nil] task input values; when the task declares
#   inputs they are named after them (NamedArray).
# @param dependencies [Step, Array<Step>, nil] upstream steps, normalized to an Array.
# @param bindings [Object, nil] execution context handed to the task (see _exec).
# @param clean_name [String, nil] optional human-readable job name.
def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil, clean_name = nil)
  path = Path.setup(Misc.sanitize_filename(path)) if String === path
  path = path.call if Proc === path

  @path = path
  @task = task
  @bindings = bindings
  # Keep clean_name so #clean_name and the :clean_name info field recorded
  # by #run reflect the value given here (it used to be dropped).
  @clean_name = clean_name
  @dependencies = case
                  when dependencies.nil?
                    []
                  when Array === dependencies
                    dependencies
                  else
                    [dependencies]
                  end
  @mutex = Mutex.new
  @info_mutex = Mutex.new
  @inputs = inputs || []
  NamedArray.setup @inputs, task.inputs.collect{|s| s.to_s} if task and task.respond_to? :inputs and task.inputs
end

Class Attribute Details

.lock_dirObject

Returns the value of attribute lock_dir.



14
15
16
# File 'lib/rbbt/workflow/step.rb', line 14

# Class-level reader: directory passed as :dir when building info lock paths.
def lock_dir; @lock_dir; end

.log_relay_stepObject

Returns the value of attribute log_relay_step.



74
75
76
# File 'lib/rbbt/workflow/step.rb', line 74

# Class-level reader: when it holds a Step, #run relays other steps' logs to it.
def log_relay_step; @log_relay_step; end

Instance Attribute Details

#bindingsObject

Returns the value of attribute bindings.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader: context object the task executes in (falls back to self in _exec).
def bindings; @bindings; end

#clean_nameObject

Returns the value of attribute clean_name.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader: optional human-readable job name (recorded by #run as :clean_name).
def clean_name; @clean_name; end

#dependenciesObject

Returns the value of attribute dependencies.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader: direct upstream steps (normalized to an Array by initialize).
def dependencies; @dependencies; end

#duppedObject (readonly)

Returns the value of attribute dupped.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Reader: true once dup_inputs has replaced the inputs with duplicated streams.
def dupped; @dupped; end

#exec(no_load = false) ⇒ Object

Returns the value of attribute exec.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the @exec flag; set_info and merge_info do nothing while it is truthy.
def exec; @exec; end

#inputsObject

Returns the value of attribute inputs.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader: input values passed to the task on execution.
def inputs; @inputs; end

#mutexObject

Returns the value of attribute mutex.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader: per-step Mutex that run and get_stream synchronize on.
def mutex; @mutex; end

#pathObject

Returns the value of attribute path.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader: location of the job's result file.
def path; @path; end

#pidObject

Returns the value of attribute pid.



9
10
11
# File 'lib/rbbt/workflow/step.rb', line 9

# Reader: pid of the process running this job (assigned by fork).
def pid; @pid; end

#resultObject

Returns the value of attribute result.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader: in-memory result kept by run (raw stream or prepared value).
def result; @result; end

#saved_streamObject (readonly)

Returns the value of attribute saved_stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Reader: stream handed out by get_stream; abort_stream falls back to it.
def saved_stream; @saved_stream; end

#seenObject

Returns the value of attribute seen.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader: steps collected by run_dependencies.
def seen; @seen; end

#streamObject (readonly)

Returns the value of attribute stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Reader for @stream (not assigned anywhere in the code shown here).
def stream; @stream; end

#taskObject

Returns the value of attribute task.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader: the task this step executes.
def task; @task; end

Class Method Details

.clean(path) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/rbbt/workflow/step.rb', line 158

# Deletes the on-disk artifacts of the job whose result lives at +path+:
# the info file, the result file, the '.lock' file of each, and the
# '.files' directory. Nothing happens unless the result or the info file
# exists.
#
# @param path [String] result path of the job.
def self.clean(path)
  info_file = Step.info_file path
  files_dir = Step.files_dir path
  if Open.exists?(path) or Open.exists?(info_file)
    # NOTE(review): in this class method `self` is the Step class, not a
    # job instance. No class-level `running?` appears in this listing; if
    # none exists, the NoMethodError is swallowed below and no running job
    # is aborted here. If it were reached, `self.abort` could resolve to
    # Kernel#abort. TODO confirm intent.
    begin
      self.abort if self.running?
    rescue Exception
    end

    # NOTE(review): these assign class-level instance variables of Step,
    # not the state of any particular job.
    @result = nil
    @pid = nil

    # Removal runs inside Misc.insist (presumably retried on failure).
    Misc.insist do
      Open.rm info_file if Open.exists? info_file
      Open.rm info_file + '.lock' if Open.exists? info_file + '.lock'
      Open.rm path if Open.exists? path
      Open.rm path + '.lock' if Open.exists? path + '.lock'
      Open.rm_rf files_dir if Open.exists? files_dir
    end
  end
end

.dup_stream(stream) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rbbt/workflow/step/run.rb', line 7

# Returns a stream that an additional consumer of +stream+ can read.
#
# IO/File inputs, and TSV::Dumper inputs (through their underlying
# #stream), are tracked in STREAM_CACHE while holding STREAM_CACHE_MUTEX:
# - the first time a stream is seen it is registered and returned as is;
# - on later calls a fresh stream is returned instead: a cached File whose
#   path still exists is reopened with Open.open, anything else is copied
#   with Misc.dup_stream.
# Closed streams are returned unchanged. For a TSV::Dumper the returned
# object is the dumper's stream, not the dumper. Any other object is
# returned unchanged.
def self.dup_stream(stream)
  case stream
  when IO, File
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      case current = STREAM_CACHE[stream]
      when nil
        # First consumer: remember the stream and hand out the original.
        Log.medium "Not duplicating stream #{ Misc.fingerprint(stream) }"
        STREAM_CACHE[stream] = stream
      when File
        if Open.exists? current.path 
          Log.medium "Reopening file #{ Misc.fingerprint(current) }"
          Open.open(current.path)
        else
          Log.medium "Duplicating file #{ Misc.fingerprint(current) } #{current.inspect}"
          Misc.dup_stream(current)
        end

      else
        Log.medium "Duplicating stream #{ Misc.fingerprint(stream) }"
        Misc.dup_stream(current)
      end
    end
  when TSV::Dumper#, TSV::Parser
    # Work with the dumper's output stream from here on.
    stream = stream.stream
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      if STREAM_CACHE[stream].nil?
        Log.high "Not duplicating dumper #{ stream.inspect }"
        STREAM_CACHE[stream] = stream
      else
        new = Misc.dup_stream(STREAM_CACHE[stream])
        Log.high "Duplicating dumper #{ stream.inspect } into #{new.inspect}"
        new
      end
    end
  else
    stream
  end
end

.files_dir(path) ⇒ Object



26
27
28
# File 'lib/rbbt/workflow/accessor.rb', line 26

# Directory holding auxiliary files for the job at +path+ (path + '.files').
# Returns nil when +path+ is nil.
def self.files_dir(path)
  return nil if path.nil?
  path + '.files'
end

.info_file(path) ⇒ Object



30
31
32
# File 'lib/rbbt/workflow/accessor.rb', line 30

# Info file of the job at +path+ (path + '.info'); nil when +path+ is nil.
def self.info_file(path)
  return nil if path.nil?
  path + '.info'
end

.job_name_for_info_file(info_file, extension = nil) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/rbbt/workflow/accessor.rb', line 45

# Recovers the job path from its info file name by stripping the trailing
# ".info" suffix, or ".<extension>.info" when +extension+ is given.
#
# @param info_file [String] path of an info file.
# @param extension [String, nil] result extension to strip as well.
# @return [String] the job path (unchanged when the suffix does not match).
def self.job_name_for_info_file(info_file, extension = nil)
  if extension and not extension.empty?
    # Escape the extension so characters such as '.' match literally
    # instead of acting as regex metacharacters.
    info_file.sub(/\.#{Regexp.escape(extension.to_s)}\.info$/, '')
  else
    info_file.sub(/\.info$/, '')
  end
end

.log(status, message, path, &block) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
# File 'lib/rbbt/workflow/accessor.rb', line 225

# Logs a status change for the job at +path+. Without a block a single
# line is logged (log_string). With a block and a Hash +message+, the block
# runs under a progress bar (log_progress). With a block and any other
# message, the block is timed (log_block).
def self.log(status, message, path, &block)
  return log_string(status, message, path) unless block
  return log_progress(status, message, path, &block) if Hash === message
  log_block(status, message, path, &block)
end

.log_block(staus, message, path, &block) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/rbbt/workflow/accessor.rb', line 170

# Logs a start line for +status+, runs the block, then logs the elapsed
# time. Returns the block's value.
#
# The parameter used to be misspelled (`staus`), so `status = status.to_s`
# read the fresh, still-nil local and every line was logged with an empty
# status.
#
# @param status [Symbol, String] status label.
# @param message [String, nil] optional message for the start line.
# @param path [String, nil] job path shown in the log lines.
def self.log_block(status, message, path, &block)
  start = Time.now
  status = status.to_s
  status_color = self.status_color status

  Log.info do
    str = Log.color :reset
    str << "#{ Log.color status_color, status}"
    str << ": #{ message }" if message
    str << " -- #{Log.color :blue, path.to_s}" if path
    str << " #{Log.color :yellow, Process.pid}"
    str
  end
  res = yield
  eend = Time.now
  Log.info do
    # NOTE(review): "%.1g" keeps one significant digit (12.3 s prints as
    # "1e+01"); TODO confirm whether "%.1f" was intended.
    str = "#{ Log.color :cyan, status.to_s } +#{Log.color :green, "%.1g" % (eend - start)}"
    str << " -- #{Log.color :blue, path.to_s}" if path
    str << " #{Log.color :yellow, Process.pid}"
    str
  end
  res
end

.log_progress(status, options, path, &block) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/rbbt/workflow/accessor.rb', line 211

# Runs the block under a Log::ProgressBar. The block receives the bar.
# +options+ may carry :max (bar total, extracted before the bar is built)
# and :severity (defaults to Log::INFO). When the block returns an IO,
# KeepBar is raised with that IO (presumably so with_bar keeps the bar
# alive while the stream is consumed). StandardErrors are logged and
# swallowed, in which case the result is nil.
# NOTE(review): if KeepBar is itself a StandardError the bare rescue below
# would catch it; TODO confirm its superclass.
def self.log_progress(status, options, path, &block)
  bar_options = Misc.add_defaults options, :severity => Log::INFO
  max = Misc.process_options bar_options, :max
  Log::ProgressBar.with_bar(max, bar_options) do |bar|
    begin
      outcome = yield(bar)
      raise KeepBar.new(outcome) if IO === outcome
      outcome
    rescue StandardError => e
      Log.exception e
    end
  end
end

.log_string(status, message, path) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/rbbt/workflow/accessor.rb', line 196

# Logs one colored status line (status, optional message, optional path,
# and pid). The line is built lazily inside the Log.info block.
def self.log_string(status, message, path)
  Log.info do
    label = status.to_s
    color = self.status_color label

    line = Log.color :reset
    line << "#{ Log.color color, label}"
    line << ": #{ message }" if message
    line << " -- #{Log.color :blue, path.to_s}" if path
    line << " #{Log.color :yellow, Process.pid}"
    line
  end
end

.purge_stream_cacheObject



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rbbt/workflow/step/run.rb', line 50

# Currently a no-op that always returns nil.
#
# NOTE(review): the unconditional `return` on the first line disables this
# method. The unreachable code after it would drain every cached stream in
# STREAM_CACHE (each in its own thread via Misc.consume_stream) and then
# clear the cache. TODO decide whether to re-enable it or delete the dead code.
def self.purge_stream_cache
  return
  STREAM_CACHE_MUTEX.synchronize do
    STREAM_CACHE.collect{|k,s| 
      Thread.new do
        Misc.consume_stream s
      end
    }
    STREAM_CACHE.clear
  end
end

.started?Boolean

Returns:

  • (Boolean)


9
10
11
# File 'lib/rbbt/workflow/accessor.rb', line 9

# Whether the job at +path+ has started, i.e. its info file or its result
# file exists (mirrors the instance method started?).
#
# The previous body called `info_file` with no argument, although
# Step.info_file requires a path, so every call raised ArgumentError. The
# path is now an optional argument; without it the answer is false.
#
# @param path [String, nil] result path of the job.
# @return [Boolean]
def self.started?(path = nil)
  return false if path.nil?
  Open.exists?(info_file(path)) || Open.exists?(path)
end

.status_color(status) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/rbbt/workflow/accessor.rb', line 156

# Color used for a status label. Only the part after the last ">" is
# considered (relayed labels look like "task>status").
def self.status_color(status)
  last_stage = status.split(">").last
  if last_stage == "starting"
    :yellow
  elsif %w(error aborted).include?(last_stage)
    :red
  elsif last_stage == "done"
    :green
  else
    :cyan
  end
end

.step_info(path) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/rbbt/workflow/accessor.rb', line 34

# Reads the info Hash of the job at +path+. Any failure is logged and an
# empty Hash is returned instead.
def self.step_info(path)
  Open.open(info_file(path)) { |io| INFO_SERIALIAZER.load(io) }
rescue Exception
  Log.exception $!
  {}
end

.wait_for_jobs(jobs) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/rbbt/workflow/accessor.rb', line 13

# Joins every job in +jobs+ (a Step or a collection of Steps), one thread
# per job. If waiting fails, the waiting threads are stopped, every job is
# aborted and the error is re-raised.
def self.wait_for_jobs(jobs)
  jobs = [jobs] if Step === jobs
  threads = []
  begin
    jobs.each { |job| threads << Thread.new { job.join } }
    threads.each(&:join)
  rescue Exception
    threads.each(&:exit)
    jobs.each(&:abort)
    raise
  end
end

Instance Method Details

#_abortObject



375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/rbbt/workflow/step/run.rb', line 375

# Stops this job: aborts its dependencies, its result stream and its
# process. It acts at most once per instance (guarded by @aborted) and
# does nothing further when the job is already done.
#
# Failed attempts are retried, but only up to max_attempts times. The
# previous version retried without limit, so a stop that kept failing hung
# the caller forever. Afterwards, a result file found on disk is removed.
def _abort
  return if @aborted
  @aborted = true
  return if done?
  Log.medium{"#{Log.color :red, "Aborting"} #{Log.color :blue, path}"}
  max_attempts = 5
  attempts = 0
  begin
    attempts += 1
    stop_dependencies
    abort_stream
    abort_pid
  rescue Aborted
    Log.medium{"#{Log.color :red, "Aborting ABORTED RETRY"} #{Log.color :blue, path}"}
    retry if attempts < max_attempts
    Log.warn "Giving up aborting after #{attempts} attempts -- #{ path }"
  rescue Exception
    retry if attempts < max_attempts
    Log.warn "Giving up aborting after #{attempts} attempts: #{$!.message} -- #{ path }"
  ensure
    if Open.exists? path
      Log.warn "Aborted job had finished. Removing result -- #{ path }"
      begin
        Open.rm path
      rescue Exception
        Log.warn "Exception removing result of aborted job: #{$!.message}"
      end
    end
  end
  Log.medium{"#{Log.color :red, "Aborted"} #{Log.color :blue, path}"}
end

#_execObject



83
84
85
86
# File 'lib/rbbt/workflow/step/run.rb', line 83

# Executes the task with this step's inputs. The task runs in the bindings
# object when one is set, otherwise in the step itself. Marks @exec as true
# unless it was already set.
def _exec
  @exec = true if @exec.nil?
  context = bindings || self
  @task.exec_in(context, *@inputs)
end

#abortObject



402
403
404
405
# File 'lib/rbbt/workflow/step/run.rb', line 402

# Aborts the job (_abort) and then logs the :aborted status unless the job
# is already aborted or failed.
# NOTE(review): _abort always leaves @aborted set, so as the code stands
# aborted? is already true here and the log call is never reached. TODO
# confirm whether the status was meant to be checked before _abort.
def abort
  _abort
  return if aborted? or error?
  log(:aborted, "Job aborted")
end

#abort_pidObject



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/rbbt/workflow/step/run.rb', line 336

# Kills (SIGKILL) and reaps the process running this job. The pid comes
# from @pid, falling back to info[:pid]. Returns false when there is no pid
# or the pid is the current process, true otherwise (kill failures are only
# logged).
def abort_pid
  @pid ||= info[:pid]

  if @pid.nil?
    Log.medium "Could not abort #{path}: no pid"
    return false
  end

  if @pid == Process.pid
    Log.medium "Could not abort #{path}: same process"
    return false
  end

  Log.medium "Aborting #{path}: #{ @pid }"
  begin
    Process.kill("KILL", @pid)
    Process.waitpid @pid
  rescue Exception
    Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
  end
  true
end

#abort_streamObject



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/rbbt/workflow/step/run.rb', line 358

# Aborts this job's result stream, if it has one that supports #abort.
# The stream comes from get_stream, falling back to @saved_stream, which
# is cleared either way. Streams that report aborted? are left alone.
#
# An Aborted raised while aborting is retried a bounded number of times;
# the previous version retried without limit. Streams without #aborted?
# no longer trigger NoMethodError.
def abort_stream
  stream = get_stream if @result
  stream ||= @saved_stream 
  @saved_stream = nil
  return unless stream and stream.respond_to? :abort
  return if stream.respond_to?(:aborted?) and stream.aborted?

  max_attempts = 5
  attempts = 0
  begin
    attempts += 1
    Log.medium "Aborting job stream #{stream.inspect} -- #{Log.color :blue, path}"
    stream.abort 
    #stream.close unless stream.closed?
  rescue Aborted
    Log.medium "Aborting job stream #{stream.inspect} ABORTED RETRY -- #{Log.color :blue, path}"
    Log.exception $!
    retry if attempts < max_attempts
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


279
280
281
# File 'lib/rbbt/workflow/accessor.rb', line 279

# True when this instance was aborted, or when the recorded status is :aborted.
def aborted?
  return @aborted if @aborted
  status == :aborted
end

#checksObject



98
99
100
# File 'lib/rbbt/workflow/step/run.rb', line 98

# Distinct result paths of all recursive dependencies (passed to
# Persist.persist as :check by run).
def checks
  rec_dependencies.map(&:path).uniq
end

#child(&block) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/rbbt/workflow/step.rb', line 137

# Forks a child process running +block+, appends its pid to the
# :children_pids info field and returns the pid. The child is not
# detached; fork waits for recorded children before exiting.
def child(&block)
  forked = Process.fork(&block)
  pids = info[:children_pids]
  pids = [] if pids.nil?
  pids << forked
  set_info :children_pids, pids
  forked
end

#cleanObject



180
181
182
183
# File 'lib/rbbt/workflow/step.rb', line 180

# Removes this job's files (see Step.clean) and returns self.
def clean
  tap { Step.clean(path) }
end

#done?Boolean

Returns:

  • (Boolean)


259
260
261
# File 'lib/rbbt/workflow/accessor.rb', line 259

# True when the result file exists. Returns nil when the step has no path.
# Uses File.exist?; the File.exists? alias was removed in Ruby 3.2 and
# raised NoMethodError there.
def done?
  path and File.exist? path
end

#dup_inputsObject



75
76
77
78
79
80
81
# File 'lib/rbbt/workflow/step/run.rb', line 75

# Replaces each input with Step.dup_stream(input), so this step reads its
# own copy of any shared stream. Runs once per step and is skipped when
# RBBT_NO_STREAM is 'true'.
def dup_inputs
  return if @dupped || ENV["RBBT_NO_STREAM"] == 'true'
  @inputs = @inputs.map { |input| Step.dup_stream(input) }
  @dupped = true
end

#error?Boolean

Returns:

  • (Boolean)


275
276
277
# File 'lib/rbbt/workflow/accessor.rb', line 275

# True when the recorded status is :error.
def error?
  :error == status
end

#exception(ex, msg = nil) ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
# File 'lib/rbbt/workflow/accessor.rb', line 243

# Records a failure: aborts the job, stores the exception class, message
# and backtrace in the info, and logs an :error status. +msg+, when given,
# replaces the class name in the logged line.
def exception(ex, msg = nil)
  _abort
  class_name = ex.class.to_s
  trace = ex.backtrace
  set_info :backtrace, trace
  set_info :exception, {:class => class_name, :message => ex.message, :backtrace => trace}
  prefix = msg.nil? ? class_name : msg
  log :error, "#{prefix} -- #{ex.message}"
end

#file(name) ⇒ Object



296
297
298
# File 'lib/rbbt/workflow/accessor.rb', line 296

# Path (set up with Path.setup) of auxiliary file +name+ inside files_dir.
def file(name)
  target = File.join(files_dir, name.to_s)
  Path.setup(target)
end

#filesObject



289
290
291
292
293
294
# File 'lib/rbbt/workflow/accessor.rb', line 289

# Relative paths of all regular files below files_dir (directories excluded).
def files
  Dir.glob(File.join(files_dir, '**', '*'))
     .reject { |entry| File.directory? entry }
     .map { |entry| Misc.path_relative_to(files_dir, entry) }
end

#files_dirObject

Directory holding this job's auxiliary files (the result path with a `.files` suffix).



285
286
287
# File 'lib/rbbt/workflow/accessor.rb', line 285

# Memoized directory for this job's auxiliary files (Step.files_dir(path)).
def files_dir
  return @files_dir if @files_dir
  @files_dir = Step.files_dir path
end

#fork(semaphore = nil) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/rbbt/workflow/step/run.rb', line 279

# Runs this job in a forked child process and returns self right away.
# The parent detaches the child.
#
# In the child: optionally waits on +semaphore+, creates the result
# directory, runs the job with run(true), drains the result stream, waits
# for children recorded in info[:children_pids], clears the :pid info
# field, posts the semaphore and exits via Kernel.exit!.
# NOTE(review): the outer ensure calls Kernel.exit! 0, so the child also
# exits with status 0 when run raises. The status is -1 only when waiting
# for children fails.
#
# @param semaphore [Object, nil] RbbtSemaphore bounding concurrent forks.
# @raise [RuntimeError] when an earlier fork of this step is still running.
def fork(semaphore = nil)
  raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil? and not Process.pid == @pid and Misc.pid_exists?(@pid) and not done? and info[:forked]
  @pid = Process.fork do
    Misc.pre_fork
    begin
      RbbtSemaphore.wait_semaphore(semaphore) if semaphore
      # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
      FileUtils.mkdir_p File.dirname(path) unless File.exist? File.dirname(path)
      begin
        run true
        set_info :forked, true
      rescue Aborted
        Log.debug{"Forked process aborted: #{path}"}
        log :aborted, "Job aborted (#{Process.pid})"
        raise $!
      rescue Exception
        Log.debug("Exception '#{$!.message}' caught on forked process: #{path}")
        raise $!
      ensure
        join_stream
      end

      begin
        children_pids = info[:children_pids]
        if children_pids
          children_pids.each do |pid|
            if Misc.pid_exists? pid
              begin
                Process.waitpid pid
              rescue Errno::ECHILD
                Log.low "Waiting on #{ pid } failed: #{$!.message}"
              end
            end
          end
          set_info :children_done, Time.now
        end
      rescue Exception
        Log.debug("Exception waiting for children: #{$!.message}")
        RbbtSemaphore.post_semaphore(semaphore) if semaphore
        Kernel.exit!(-1)
      end
      set_info :pid, nil
    ensure
      RbbtSemaphore.post_semaphore(semaphore) if semaphore
      Kernel.exit! 0
    end
  end
  Process.detach(@pid)
  self
end

#get_streamObject



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rbbt/workflow/step/run.rb', line 62

# Hands out the IO result at most once: the first call stores it in
# @saved_stream and returns it. Later calls, or calls made when the result
# is not an IO, return nil. Synchronized on @mutex.
def get_stream
  @mutex.synchronize do
    next nil if @saved_stream
    next nil unless IO === @result
    @saved_stream = @result
  end
end

#graceObject



421
422
423
424
425
426
# File 'lib/rbbt/workflow/step/run.rb', line 421

# Polls once per second until the job is done, has a result, failed, was
# aborted or is streaming. Returns self.
def grace
  loop do
    break if done? or result or error? or aborted? or streaming?
    sleep 1
  end
  self
end

#info(check_lock = true) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/rbbt/workflow/accessor.rb', line 74

# Returns this job's info Hash, read from its info file.
#
# Returns {} when the job has no info file. A cached copy is returned
# while the file's ctime is earlier than the moment that copy was cached.
# Otherwise the file is deserialized with INFO_SERIALIAZER inside nested
# Misc.insist blocks (presumably retries with increasing delays). When
# +check_lock+ is true, a held info_lock raises TryAgain so the read is
# attempted again.
#
# If loading still fails, the error is logged, the info file is
# OVERWRITTEN with {:status => :error, :messages => ["Info file lost"]},
# and the exception is re-raised.
#
# @param check_lock [Boolean] pass false when the caller already holds
#   info_lock (set_info and merge_info do so).
def info(check_lock = true)
  return {} if info_file.nil? or not Open.exists? info_file
  begin
    begin
      return @info_cache if @info_cache and File.ctime(info_file) < @info_cache_time
    rescue Exception
      # Re-raises unchanged; this inner begin/rescue has no effect.
      raise $!
    end

    begin
      @info_cache = Misc.insist(2, 3, info_file) do
        Misc.insist(2, 1, info_file) do
          Misc.insist(3, 0.2, info_file) do
            raise TryAgain, "Info locked" if check_lock and info_lock.locked?
            Open.open(info_file) do |file|
              INFO_SERIALIAZER.load(file) #|| {}
            end
          end
        end
      end
      @info_cache_time = Time.now
      @info_cache
    end
  rescue Exception
    Log.debug{"Error loading info file: " + info_file}
    Log.exception $!
    # Replace the unreadable file with an error marker before re-raising.
    Open.sensiblewrite(info_file, INFO_SERIALIAZER.dump({:status => :error, :messages => ["Info file lost"]}))
    raise $!
  end
end

#info_fileObject

Path of this job's info file (the result path with a `.info` suffix).



63
64
65
# File 'lib/rbbt/workflow/accessor.rb', line 63

# Memoized path of this job's info file (Step.info_file(path)).
def info_file
  return @info_file if @info_file
  @info_file = Step.info_file(path)
end

#info_lockObject



67
68
69
70
71
72
# File 'lib/rbbt/workflow/accessor.rb', line 67

# Memoized Lockfile guarding the info file. Its path is the persistence
# path of "<info_file>.lock" under Step.lock_dir.
def info_lock
  return @info_lock if @info_lock
  lock_path = Persist.persistence_path(info_file + '.lock', {:dir => Step.lock_dir})
  @info_lock = Lockfile.new(lock_path)
end

#joinObject



428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'lib/rbbt/workflow/step/run.rb', line 428

# Waits until this job has finished and returns self.
#
# First waits (grace) until the job is done, has a result, failed, was
# aborted or is streaming, and drains the stream if the status is
# "streaming". Unless the info already says :joined, it then waits on the
# job's process when that is a different process, joins every dependency,
# and polls once per second until the result file exists. :joined is
# recorded in the info even when waiting raises.
def join

  grace

  join_stream if status.to_s == "streaming"

  return self if not Open.exists? info_file

  return self if info[:joined]
  pid = @pid 

  # Fall back to the pid recorded in the info; wrapped in Misc.insist,
  # presumably retrying with these delays if reading the info fails.
  Misc.insist [0.1, 0.2, 0.5, 1] do
    pid ||= info[:pid]
  end

  begin
    if pid.nil? or Process.pid == pid
      dependencies.each{|dep| dep.join }
    else
      begin
        Log.debug{"Waiting for pid: #{pid}"}
        Process.waitpid pid 
      rescue Errno::ECHILD
        # Not our child process (or already reaped).
        Log.debug{"Process #{ pid } already finished: #{ path }"}
      end if Misc.pid_exists? pid
      pid = nil
      dependencies.each{|dep| dep.join }
    end
    # NOTE(review): this polls without limit if the job ends without
    # writing its result file (e.g. on error). TODO confirm a status check
    # or timeout is not needed.
    sleep 1 until path.exists?
    self
  ensure
    set_info :joined, true
  end
end

#join_streamObject



407
408
409
410
411
412
413
414
415
416
417
418
419
# File 'lib/rbbt/workflow/step/run.rb', line 407

# Drains this job's result stream, if any, and joins it when the stream
# supports #join. On failure the stream (when it supports #abort) and the
# job are aborted, and the original error is re-raised.
#
# Plain IO streams have no #abort. Calling it unconditionally raised
# NoMethodError and hid the real error.
def join_stream
  stream = get_stream if @result
  return unless stream
  begin
    Misc.consume_stream stream
    stream.join if stream.respond_to? :join
  rescue Exception => e
    stream.abort if stream.respond_to? :abort
    _abort
    raise e
  end
end

#kill_childrenObject



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rbbt/workflow/step/run.rb', line 102

# Sends SIGINT to every pid in info[:children_pids]. Failures for
# individual children are logged and skipped; a failure reading the info
# is logged too.
def kill_children
  pids = info[:children_pids]
  return unless pids and pids.any?
  Log.medium("Killing children: #{ pids * ", " }")
  pids.each do |pid|
    Log.medium("Killing child #{ pid }")
    begin
      Process.kill "INT", pid
    rescue Exception
      Log.medium("Exception killing child #{ pid }: #{$!.message}")
    end
  end
rescue
  Log.medium("Exception finding children")
end

#loadObject



151
152
153
154
155
156
# File 'lib/rbbt/workflow/step.rb', line 151

# Returns the job's result. An in-memory result is prepared and returned
# directly. Otherwise it waits for the job and loads the persisted file,
# falling back to exec when no file exists.
def load
  if @result && @path != @result
    return prepare_result(@result, @task.result_description)
  end
  join unless done?
  return Persist.load_file(@path, @task.result_type) if @path.exists?
  exec
end

#load_file(name, type = nil, options = {}) ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/rbbt/workflow/accessor.rb', line 316

# Loads auxiliary file +name+ from this job's files directory.
#
# When +type+ is nil it is inferred from the extension: tc, tsv,
# list/ary/array (=> :array), yaml and marshal. Anything else, or no
# extension at all, is read as plain text.
#
# @param name [String, Symbol] file name relative to files_dir.
# @param type [Symbol, String, nil] :tc, :tsv, :array, :yaml, :marshal or other.
# @param options [Hash] options passed to TSV.open for :tsv.
# @return the parsed content.
def load_file(name, type = nil, options = {})
  if type.nil?
    type = case name.to_s[/\.(\w+)$/, 1]
           when "tc" then :tc
           when "tsv" then :tsv
           when "list", "ary", "array" then :array
           when "yaml" then :yaml
           when "marshal" then :marshal
           else :other
           end
  end

  case type.to_sym
  when :tc
    Persist.open_tokyocabinet(file(name), false)
  when :tsv
    # NOTE(review): the stream is handed to TSV.open; presumably it is
    # consumed and closed there. TODO confirm.
    TSV.open Open.open(file(name)), options
  when :array
    Open.read(file(name)).split "\n"
  when :yaml
    # The block form closes the handle after parsing (it used to leak).
    Open.open(file(name)) { |io| YAML.load(io) }
  when :marshal
    Open.open(file(name)) { |io| Marshal.load(io) }
  else
    Open.read(file(name))
  end
end

#log(status, message = nil, &block) ⇒ Object



237
238
239
240
241
# File 'lib/rbbt/workflow/accessor.rb', line 237

# Records +status+ and the uncolored +message+ in the info, then emits the
# log line through Step.log (forwarding any block).
def log(status, message = nil, &block)
  self.status = status
  plain = Log.uncolor(message)
  self.message plain
  Step.log(status, message, path, &block)
end

#merge_info(hash) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/rbbt/workflow/accessor.rb', line 118

# Merges +hash+ into the job's info and rewrites the info file while
# holding info_lock. Does nothing (returns nil) while @exec is truthy or
# when there is no info file path.
#
# The old body purged an undefined local `value`, copied from set_info,
# and always returned nil. Each value of +hash+ is now purged (mirroring
# set_info), and the merged-in hash is returned. The write skips its own
# locking because info_lock is already held, as in set_info.
#
# @param hash [Hash] info fields to merge.
# @return [Hash, nil]
def merge_info(hash)
  return nil if @exec or info_file.nil?
  if defined? Annotated
    hash = hash.each_with_object({}) { |(key, val), purged| purged[key] = Annotated.purge(val) }
  end
  Open.lock(info_file, :lock => info_lock) do
    i = info(false)
    i.merge! hash
    @info_cache = i
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump(i), :force => true, :lock => false)
    @info_cache_time = Time.now
    hash
  end
end

#message(message) ⇒ Object



152
153
154
# File 'lib/rbbt/workflow/accessor.rb', line 152

# Appends +message+ to the :messages info list.
def message(message)
  current = messages || []
  set_info(:messages, current << message)
end

#messagesObject



144
145
146
147
148
149
150
# File 'lib/rbbt/workflow/accessor.rb', line 144

# Messages recorded in the info. When none exist, an empty list is stored
# (if set_info is publicly available) and that call's result is returned.
def messages
  stored = info[:messages]
  return stored if stored
  set_info(:messages, []) if self.respond_to?(:set_info)
end

#nameObject



53
54
55
# File 'lib/rbbt/workflow/accessor.rb', line 53

# Job name: the part of the path after the "/<task name>/" directory.
def name
  task_dir = Regexp.quote(task.name.to_s)
  path.sub(/.*\/#{task_dir}\/(.*)/, '\1')
end

#prepare_result(value, description = nil, info = {}) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/rbbt/workflow/step.rb', line 94

# Converts a raw task result into its final form.
#
# - An IO is consumed according to @task.result_type: :array gives
#   stripped lines, :tsv parses with TSV.open (an IOError gives an empty
#   TSV), anything else reads the whole stream. The stream is joined when
#   it supports #join. On error the stream and the job are aborted and the
#   error is re-raised.
# - Without Entity, without +description+, or when +description+ is not a
#   registered Entity format, +value+ is returned unchanged.
# - An Annotated +value+ gets each +info+ entry whose key is one of its
#   annotations assigned through the matching writer. (The writer name was
#   built from an undefined `h`, which raised NameError.)
# - Anything else is set up as the Entity format named by +description+.
def prepare_result(value, description = nil, info = {})
  case 
  when IO === value
    begin
      res = case @task.result_type
            when :array
              array = []
              while line = value.gets
                array << line.strip
              end
              array
            when :tsv
              begin
                TSV.open(value)
              rescue IOError
                TSV.setup({})
              end
            else
              value.read
            end
      value.join if value.respond_to? :join
      res
    rescue Exception
      value.abort if value.respond_to? :abort
      self.abort
      raise $!
    end
  when (not defined? Entity or description.nil? or not Entity.formats.include? description)
    value
  when (Annotated === value and info.empty?)
    value
  when Annotated === value
    annotations = value.annotations
    info.each do |k,v|
      value.send("#{k}=", v) if annotations.include? k
    end
    value
  else
    Entity.formats[description].setup(value, info.merge(:format => description))
  end
end

#provenanceObject



354
355
356
357
358
359
360
361
362
363
364
365
# File 'lib/rbbt/workflow/accessor.rb', line 354

# Provenance tree: this job's recorded inputs, plus an entry for each
# dependency whose result path exists. That entry is the dependency's own
# provenance when it has an info file, or nil when it does not (a result
# placed manually). Uses File.exist? (File.exists? was removed in Ruby 3.2).
#
# @return [Hash] {:inputs => ..., :provenance => {dep_path => ...}}
def provenance
  provenance = {}
  dependencies.each do |dep|
    next unless dep.path.exists?
    if File.exist? dep.info_file
      provenance[dep.path] = dep.provenance if File.exist? dep.path
    else
      provenance[dep.path] = nil
    end
  end
  {:inputs => info[:inputs], :provenance => provenance}
end

#provenance_pathsObject



367
368
369
370
371
372
373
# File 'lib/rbbt/workflow/accessor.rb', line 367

# Nested Hash of dependency result paths. Each existing dependency result
# maps to that dependency's own provenance_paths. Uses File.exist?
# (File.exists? was removed in Ruby 3.2).
def provenance_paths
  provenance = {}
  dependencies.each do |dep|
    provenance[dep.path] = dep.provenance_paths if File.exist? dep.path
  end
  provenance
end

#rec_dependenciesObject



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/rbbt/workflow/step.rb', line 185

# All upstream steps, direct and transitive, without duplicates.
def rec_dependencies
  # A result on disk with no info file was placed there manually, so its
  # dependencies are not considered.
  return [] if Open.exists?(self.path.to_s) and not Open.exists? self.info_file

  direct = dependencies
  return [] if direct.nil? or direct.empty?

  nested = direct.collect { |step| step.rec_dependencies }.flatten.uniq.compact
  (direct + nested).flatten.uniq
end

#recursive_cleanObject



202
203
204
205
206
207
208
209
210
211
# File 'lib/rbbt/workflow/step.rb', line 202

# Cleans this job and every recursive dependency that has an info file.
# Returns self.
def recursive_clean
  clean
  rec_dependencies.each do |step|
    step.clean if Open.exists?(step.info_file)
  end
  self
end

#relay_log(step) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/rbbt/workflow/step.rb', line 77

# Redirects this step's log calls to +step+. Only applies when the task is
# a Task with a name; otherwise it is a no-op. Returns self.
#
# The first call redefines #log on this object's singleton class (keeping
# the original as #original_log). The new #log records the status and the
# uncolored message on this step, then forwards "<task>><status>" and the
# message to the relay step, unless that step is done, failed or aborted.
# It does not call Step.log for this step, and it takes no block.
def relay_log(step)
  return self unless Task === self.task and not self.task.name.nil?
  if not self.respond_to? :original_log
    class << self
      attr_accessor :relay_step
      alias original_log log 
      def log(status, message = nil)
        self.status = status
        # Calls #message (the info writer) with the uncolored text.
        message Log.uncolor message
        relay_step.log([task.name.to_s, status.to_s] * ">", message.nil? ? nil : message ) unless (relay_step.done? or relay_step.error? or relay_step.aborted?)
      end
    end
  end
  @relay_step = step
  self
end

#run(no_load = false) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/rbbt/workflow/step/run.rb', line 165

# Runs the job and persists its result at +path+ through Persist.persist.
# The recursive dependency paths (checks) are passed as :check.
#
# @param no_load [Boolean] when truthy, :stream is passed as :no_load, the
#   raw result is kept in @result (if not already set) and self is
#   returned. Otherwise the result is passed through prepare_result.
# On any exception, exception($!) records the failure and aborts the job,
# and the error is re-raised.
def run(no_load = false)
  result = nil

  begin
    @mutex.synchronize do
      no_load = no_load ? :stream : false
      # The block computes the result; presumably Persist only calls it
      # when no valid persisted result exists.
      result = Persist.persist "Job", @task.result_type, :file => path, :check => checks, :no_load => no_load do |lockfile|
        if Step === Step.log_relay_step and not self == Step.log_relay_step
          relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
        end
        # set_info/merge_info are no-ops while @exec is truthy.
        @exec = false

        # Start from a fresh info file.
        Open.rm info_file if Open.exists? info_file

        log :setup, "#{Log.color :cyan, "Setup"} #{Log.color :yellow, task.name.to_s || ""}"

        merge_info({
          :pid => Process.pid,
          :issued => Time.now,
          :name => name,
          :clean_name => clean_name,
          :dependencies => dependencies.collect{|dep| [dep.task_name, dep.name, dep.path]},
        })

        dup_inputs
        begin
          run_dependencies
        rescue Exception
          stop_dependencies
          raise $!
        end


        set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

        set_info :started, (start_time = Time.now)
        log :started, "#{Log.color :magenta, "Starting"} #{Log.color :yellow, task.name.to_s || ""}"

        begin
          result = _exec
        rescue Aborted
          stop_dependencies
          log(:aborted, "Aborted")
          raise $!
        rescue Exception
          backtrace = $!.backtrace

          # HACK: This fixes an strange behaviour in 1.9.3 where some
          # backtrace strings are coded in ASCII-8BIT
          # NOTE(review): force_encoding runs after set_info has already
          # stored the backtrace. TODO confirm the intended order.
          set_info :backtrace, backtrace 
          log(:error, "#{$!.class}: #{$!.message}")
          backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding
          stop_dependencies
          raise $!
        end

        # Unless streaming was requested (or streaming is disabled), read
        # stream results into their final form right away.
        if not no_load or ENV["RBBT_NO_STREAM"] == "true" 
          result = prepare_result result, @task.description, info if IO === result 
          result = prepare_result result.stream, @task.description, info if TSV::Dumper === result 
        end

        stream = case result
                 when IO
                   result
                 when TSV::Dumper
                   result.stream
                 end

        if stream
          # Streaming result: completion is recorded by the ConcurrentStream
          # callback, and an abort callback records :aborted.
          log :streaming, "#{Log.color :magenta, "Streaming"} #{Log.color :yellow, task.name.to_s || ""}"
          ConcurrentStream.setup stream do
            begin
              if status != :done
                Misc.insist do
                  set_info :done, (done_time = Time.now)
                  set_info :time_elapsed, (time_elapsed = done_time - start_time)
                  log :done, "#{Log.color :magenta, "Completed"} #{Log.color :yellow, task.name.to_s || ""} in #{time_elapsed.to_i} sec."
                end
              end
            rescue
              Log.exception $!
            ensure
              join
            end
          end
          stream.abort_callback = Proc.new do
            begin
              log :aborted, "#{Log.color :red, "Aborted"} #{Log.color :yellow, task.name.to_s || ""}" if status == :streaming
            rescue
              Log.exception $!
            end
          end
        else
          set_info :done, (done_time = Time.now)
          set_info :time_elapsed, (time_elapsed = done_time - start_time)
          log :done, "#{Log.color :magenta, "Completed"} #{Log.color :yellow, task.name.to_s || ""} in #{time_elapsed.to_i} sec."
        end

        result
      end

      if no_load
        @result ||= result
        self
      else
        @result = prepare_result result, @task.result_description
      end
    end
  rescue Exception
    exception $!
    raise $!
  end
end

#run_dependenciesObject



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/rbbt/workflow/step/run.rb', line 121

# Prepares and starts this step's direct dependencies.
#
# 1. Builds @seen: for each distinct direct dependency whose path is not
#    already in @seen, the dependency is given this step's seen list, and
#    its recursive dependencies plus itself are added.
# 2. Calls dup_inputs on every direct dependency in @seen.
# 3. For each direct dependency in @seen: cleans it when it looks stale
#    (streaming but not running; or failed, aborted, without status, or
#    neither done nor running), then runs it with run(true) unless it has
#    already started. Errors are logged with the dependency's task name
#    and re-raised.
# Only direct dependencies are started here; transitive ones presumably
# start from each dependency's own run.
def run_dependencies
  @seen ||= []
  dependencies.uniq.each do |dependency| 
    next if seen.collect{|d| d.path}.include?(dependency.path)
    dependency.seen = seen
    @seen.concat dependency.rec_dependencies.collect{|d| d } 
    @seen << dependency
    @seen.uniq!
  end

  @seen.each do |dependency|
    next if dependency == self
    next unless dependencies.include? dependency
    dependency.dup_inputs
  end

  @seen.each do |dependency| 
    next if dependency == self
    next unless dependencies.include? dependency
    Log.info "#{Log.color :cyan, "dependency"} #{Log.color :yellow, task.name.to_s || ""} => #{Log.color :yellow, dependency.task_name.to_s || ""} -- #{Log.color :blue, dependency.path}"
    begin
      if dependency.streaming? 
        # A stream whose producer is still running can be reused as is.
        next if dependency.running?
        dependency.clean 
      else
        dependency.clean if (dependency.error? or dependency.aborted? or dependency.status.nil? or not (dependency.done? or dependency.running?))
      end

      unless dependency.started? 
        dependency.run(true)
      end
    rescue Aborted
      Log.error "Aborted dep. #{Log.color :red, dependency.task.name.to_s}"
      raise $!
    rescue Interrupt
      Log.error "Interrupted while in dep. #{Log.color :red, dependency.task.name.to_s}"
      raise $!
    rescue Exception
      Log.error "Exception in dep. #{ Log.color :red, dependency.task.name.to_s }"
      raise $!
    end
  end
end

#running?Boolean

Returns:

  • (Boolean)


267
268
269
270
271
272
273
# File 'lib/rbbt/workflow/accessor.rb', line 267

# Whether the job's process is alive. Returns nil when there is no info
# file or no recorded pid. @pid takes precedence over info[:pid].
def running?
  return nil unless Open.exists? info_file
  recorded = info[:pid]
  return nil if recorded.nil?

  pid = @pid || info[:pid]
  Misc.pid_exists?(pid)
end

#save_file(name, content) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/rbbt/workflow/accessor.rb', line 300

# Writes +content+ to auxiliary file +name+. Strings are written as is,
# Arrays one element per line, TSVs via to_s, Hashes as "key\tvalue"
# lines, and anything else via to_s.
def save_file(name, content)
  text = if String === content
           content
         elsif Array === content
           content * "\n"
         elsif TSV === content
           content.to_s
         elsif Hash === content
           content.map { |*pair| pair * "\t" } * "\n"
         else
           content.to_s
         end
  Open.write(file(name), text)
end

#set_info(key, value) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/rbbt/workflow/accessor.rb', line 105

# Stores +value+ under +key+ in the job's info and rewrites the info file
# while holding info_lock. Returns the stored value, or nil when @exec is
# truthy or there is no info file path.
def set_info(key, value)
  return nil if @exec || info_file.nil?
  value = Annotated.purge(value) if defined? Annotated
  Open.lock(info_file, :lock => info_lock) do
    current = info(false)
    current[key] = value
    @info_cache = current
    # info_lock is already held, so sensiblewrite skips its own locking.
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump(current), :force => true, :lock => false)
    @info_cache_time = Time.now
    value
  end
end

#started?Boolean

Returns:

  • (Boolean)


255
256
257
# File 'lib/rbbt/workflow/accessor.rb', line 255

# True once the job has an info file or a result file.
def started?
  Open.exists?(info_file) || Open.exists?(path)
end

#statusObject



131
132
133
134
135
136
137
138
# File 'lib/rbbt/workflow/accessor.rb', line 131

# Recorded status of the job; :error when the info cannot be read.
def status
  info[:status]
rescue Exception
  Log.error "Exception reading status: #{$!.message}" 
  :error
end

#status=(status) ⇒ Object



140
141
142
# File 'lib/rbbt/workflow/accessor.rb', line 140

# Records +status+ in the job's info.
def status=(status); set_info(:status, status); end

#step(name) ⇒ Object



213
214
215
216
217
218
219
220
221
222
# File 'lib/rbbt/workflow/step.rb', line 213

# First recursive dependency whose task name matches +name+ (compared as
# symbols). The lookup is memoized per name, and nil is returned when none
# matches.
def step(name)
  @steps ||= {}
  @steps[name] ||= rec_dependencies.find { |dep| dep.task_name.to_sym == name.to_sym }
end

#stop_dependenciesObject



329
330
331
332
333
334
# File 'lib/rbbt/workflow/step/run.rb', line 329

# Aborts every direct dependency, then interrupts forked children.
def stop_dependencies
  dependencies.each(&:abort)
  kill_children
end

#streaming?Boolean

Returns:

  • (Boolean)


263
264
265
# File 'lib/rbbt/workflow/accessor.rb', line 263

# True when the in-memory result is an IO or the recorded status is :streaming.
def streaming?
  return true if IO === @result
  status == :streaming
end

#task_nameObject



64
65
66
# File 'lib/rbbt/workflow/step.rb', line 64

# Name of the task this step executes.
def task_name; @task.name; end