Class: Step

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/workflow/step.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/step/run.rb

Constant Summary collapse

INFO_SERIALIAZER =
Marshal
STREAM_CACHE =
{}
STREAM_CACHE_MUTEX =
Mutex.new

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil) ⇒ Step

Returns a new instance of Step.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/rbbt/workflow/step.rb', line 21

# Builds a step for +path+.
#
# path         - a String (sanitized with Misc.sanitize_filename and wrapped
#                with Path.setup), a Proc (called to obtain the real path),
#                or any other object, which is stored untouched.
# task         - stored in @task.
# inputs       - stored in @inputs; nil becomes [].
# dependencies - nil, a single object or an Array; always stored as an Array.
# bindings     - stored in @bindings.
def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil)
  path = Path.setup(Misc.sanitize_filename(path)) if String === path
  # Resolve lazily-computed paths. The result used to be assigned to a
  # misspelled local (`pat`), which left the Proc itself stored in @path.
  path = path.call if Proc === path
  @path = path
  @task = task
  @bindings = bindings
  @dependencies = case
                  when dependencies.nil?
                    []
                  when Array === dependencies
                    dependencies
                  else
                    [dependencies]
                  end
  @mutex = Mutex.new
  @info_mutex = Mutex.new
  @inputs = inputs || []
end

Class Attribute Details

.lock_dirObject

Returns the value of attribute lock_dir.



14
15
16
# File 'lib/rbbt/workflow/step.rb', line 14

# Class-level reader: returns the @lock_dir instance variable.
def lock_dir
  @lock_dir
end

.log_relay_stepObject

Returns the value of attribute log_relay_step.



50
51
52
# File 'lib/rbbt/workflow/step.rb', line 50

# Class-level reader: returns the @log_relay_step instance variable.
def log_relay_step
  @log_relay_step
end

Instance Attribute Details

#bindingsObject

Returns the value of attribute bindings.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @bindings.
def bindings
  @bindings
end

#dependenciesObject

Returns the value of attribute dependencies.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @dependencies.
def dependencies
  @dependencies
end

#duppedObject (readonly)

Returns the value of attribute dupped.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Read-only accessor for @dupped.
def dupped
  @dupped
end

#exec(no_load = false) ⇒ Object

Returns the value of attribute exec.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for @exec.
# NOTE(review): the page heading shows an `exec(no_load = false)` signature,
# but this rendered body is the plain attribute reader.
def exec
  @exec
end

#inputsObject

Returns the value of attribute inputs.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @inputs.
def inputs
  @inputs
end

#mutexObject

Returns the value of attribute mutex.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for @mutex.
def mutex
  @mutex
end

#pathObject

Returns the value of attribute path.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @path.
def path
  @path
end

#pidObject

Returns the value of attribute pid.



9
10
11
# File 'lib/rbbt/workflow/step.rb', line 9

# Reader for @pid.
def pid
  @pid
end

#resultObject

Returns the value of attribute result.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for @result.
def result
  @result
end

#saved_streamObject (readonly)

Returns the value of attribute saved_stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Read-only accessor for @saved_stream.
def saved_stream
  @saved_stream
end

#seenObject

Returns the value of attribute seen.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for @seen.
def seen
  @seen
end

#streamObject (readonly)

Returns the value of attribute stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Read-only accessor for @stream.
def stream
  @stream
end

#taskObject

Returns the value of attribute task.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @task.
def task
  @task
end

Class Method Details

.dup_stream(stream) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/step/run.rb', line 7

# Returns a readable handle for +stream+, using STREAM_CACHE to detect
# streams that have already been requested.
#
# * IO/File: a closed stream is returned as-is. The first request registers
#   the stream in STREAM_CACHE and returns the stream itself; later requests
#   return a new handle (Open.open on the cached File's filename, or
#   Misc.dup_stream for any other cached IO).
# * TSV::Dumper: the same first-seen / duplicate logic applied to
#   dumper.stream.
# * Any other object is returned untouched.
#
# Every cache lookup and update runs inside STREAM_CACHE_MUTEX.
def self.dup_stream(stream)
  case stream
  when IO, File
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      case current = STREAM_CACHE[stream]
      when nil
        # First time this stream is seen: hand out the original and remember it.
        Log.medium "Not duplicating stream #{ Misc.fingerprint(stream) }"
        STREAM_CACHE[stream] = stream
      when File
        # NOTE(review): `true or ...` makes the else branch unreachable, so a
        # cached File is always reopened by filename rather than dup'ed.
        if true or current.closed?
          Log.medium "Reopening file #{ Misc.fingerprint(current) }"
          Open.open(current.filename)
        else
          Log.medium "Duplicating file #{ Misc.fingerprint(current) }"
          new = current.dup
          new.rewind
          new
        end

      else
        Log.medium "Duplicating stream #{ Misc.fingerprint(stream) }"
        Misc.dup_stream(current)
      end
    end
  when TSV::Dumper#, TSV::Parser
    # From here on `stream` is the dumper's underlying stream.
    stream = stream.stream
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      if STREAM_CACHE[stream].nil?
        Log.high "Not duplicating dumper #{ stream.inspect }"
        STREAM_CACHE[stream] = stream
      else
        new = Misc.dup_stream(STREAM_CACHE[stream])
        Log.high "Duplicating dumper #{ stream.inspect } into #{new.inspect}"
        new
      end
    end
  else
    stream
  end
end

.files_dir(path) ⇒ Object



8
9
10
# File 'lib/rbbt/workflow/accessor.rb', line 8

# Derives the files directory for a result path by appending '.files'.
# A nil path yields nil.
def self.files_dir(path)
  return nil if path.nil?

  path + '.files'
end

.info_file(path) ⇒ Object



12
13
14
# File 'lib/rbbt/workflow/accessor.rb', line 12

# Derives the info file name for a result path by appending '.info'.
# A nil path yields nil.
def self.info_file(path)
  return nil if path.nil?

  path + '.info'
end

.job_name_for_info_file(info_file, extension = nil) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/rbbt/workflow/accessor.rb', line 16

# Recovers the job name from an info file name.
#
# info_file - String such as "job.info" or "job.tsv.info".
# extension - optional result extension (without the leading dot); when
#             given and non-empty, ".<extension>.info" is stripped,
#             otherwise only ".info" is.
#
# Returns the name with the suffix removed, or +info_file+ unchanged when
# the suffix is not present.
def self.job_name_for_info_file(info_file, extension = nil)
  if extension and not extension.empty?
    # The extension is literal text: escape it so characters such as '.'
    # or '+' are not interpreted as regexp syntax.
    info_file.sub(/\.#{Regexp.escape(extension)}\.info$/, '')
  else
    info_file.sub(/\.info$/, '')
  end
end

.log(status, message, path, &block) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
# File 'lib/rbbt/workflow/accessor.rb', line 191

# Dispatches a log request to the matching helper:
# * no block                -> log_string
# * block + Hash +message+  -> log_progress (the Hash is passed as options)
# * block + anything else   -> log_block
def self.log(status, message, path, &block)
  return log_string(status, message, path) unless block
  return log_progress(status, message, path, &block) if Hash === message

  log_block(status, message, path, &block)
end

.log_block(staus, message, path, &block) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/rbbt/workflow/accessor.rb', line 122

# Logs +status+ (with optional +message+ and +path+) through Log.info, runs
# the given block, then logs the status again followed by the elapsed time.
#
# status  - converted with to_s; "starting", "error" and "done" are shown in
#           yellow, red and green, anything else in cyan.
# message - appended after the status when truthy.
# path    - appended (via to_s) when truthy.
#
# Returns the value of the block.
#
# The first parameter used to be misspelled `staus`, so `status` was an
# unrelated nil local: the status text was always empty and always cyan.
def self.log_block(status, message, path, &block)
  start = Time.now
  status = status.to_s
  status_color = case status
                 when "starting"
                   :yellow
                 when "error"
                   :red
                 when "done"
                   :green
                 else
                   :cyan
                 end
  Log.info do
    str = Log.color :reset
    str << "#{ Log.color status_color, status}"
    str << ": #{ message }" if message
    str << " -- #{Log.color :blue, path.to_s}" if path
    str
  end
  res = yield
  eend = Time.now
  Log.info do
    str = "#{ Log.color :cyan, status.to_s } +#{Log.color :green, "%.1g" % (eend - start)}"
    str << " -- #{Log.color :blue, path.to_s}" if path
    str
  end
  res
end

.log_progress(status, options, path, &block) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/rbbt/workflow/accessor.rb', line 177

# Runs the block with a progress bar from Log::ProgressBar.with_bar.
#
# options - Hash of bar options; :severity defaults to Log::INFO and :max is
#           extracted and passed as the bar total.
#
# The block receives the bar. An IO result is re-raised wrapped in KeepBar;
# errors caught by the bare rescue are passed to Log.exception.
def self.log_progress(status, options, path, &block)
  options = Misc.add_defaults(options, :severity => Log::INFO)
  total = Misc.process_options(options, :max)

  Log::ProgressBar.with_bar(total, options) do |progress|
    begin
      outcome = yield progress
      raise KeepBar.new(outcome) if IO === outcome
      outcome
    rescue StandardError => failure
      Log.exception failure
    end
  end
end

.log_string(status, message, path) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rbbt/workflow/accessor.rb', line 154

# Emits a single Log.info line: the coloured status, then ": message" and
# " -- path" when those are truthy.
#
# "starting", "error" and "done" are coloured yellow, red and green; any
# other status is cyan.
def self.log_string(status, message, path)
  Log.info do

    status = status.to_s
    palette = { "starting" => :yellow, "error" => :red, "done" => :green }
    status_color = palette.fetch(status, :cyan)

    line = Log.color :reset
    line << "#{ Log.color status_color, status}"
    line << ": #{ message }" if message
    line << " -- #{Log.color :blue, path.to_s}" if path
    line
  end
end

.purge_stream_cacheObject



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/rbbt/workflow/step/run.rb', line 52

# Currently a no-op: the bare `return` on the first line exits before any
# work is done.
#
# The unreachable code below would, under STREAM_CACHE_MUTEX, start one
# thread per cached stream calling Misc.consume_stream on it and then clear
# STREAM_CACHE.
# NOTE(review): confirm whether the early return is a deliberate disable.
def self.purge_stream_cache
  return
  STREAM_CACHE_MUTEX.synchronize do
    STREAM_CACHE.collect{|k,s| 
      Thread.new do
        Misc.consume_stream s
      end
    }
    STREAM_CACHE.clear
  end
end

Instance Method Details

#_execObject



85
86
87
88
# File 'lib/rbbt/workflow/step/run.rb', line 85

# Executes the task through @task.exec_in with @inputs splatted as
# arguments. The receiver passed is +bindings+ when set, otherwise this step.
# Sets @exec to true only when it has not been given a value yet.
def _exec
  @exec = true if @exec.nil?
  context = bindings || self
  @task.exec_in(context, *@inputs)
end

#abortObject



393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/rbbt/workflow/step/run.rb', line 393

# Aborts this step once: stops dependencies, aborts the result stream and
# kills the recorded pid, then removes any result file and logs :aborted.
#
# Does nothing when already aborted (@aborted) or, after marking @aborted,
# when the step is done?.
def abort
  return if @aborted
  @aborted = true
  return if done?
  Log.medium{"#{Log.color :red, "Aborting"} #{Log.color :blue, path}"}
  begin
    stop_dependencies
    abort_stream
    abort_pid
  rescue Aborted
    Log.medium{"#{Log.color :red, "Aborting ABORTED RETRY"} #{Log.color :blue, path}"}
    retry
  rescue Exception
    # NOTE(review): unbounded retry on any exception; a persistent failure in
    # the steps above would loop forever. Confirm this is intended.
    retry
  ensure
    # The job may have completed while it was being aborted: drop its result.
    if Open.exists? path
      Log.warn "Aborted job had finished. Removing result"
      begin
        Open.rm path
      rescue Exception
        Log.warn "Exception removing result of aborted job: #{$!.message}"
      end
    end

    begin
      log(:aborted, "Job aborted")
    rescue Exception
      Log.exception $!
    end
  end
  Log.medium{"#{Log.color :red, "Aborted"} #{Log.color :blue, path}"}
end

#abort_pidObject



355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/rbbt/workflow/step/run.rb', line 355

# Kills the process recorded for this step (@pid, falling back to
# info[:pid]) with KILL and waits for it.
#
# Returns false when there is no pid or the pid is the current process,
# true otherwise (even if the kill or wait raised, which is only logged).
def abort_pid
  @pid ||= info[:pid]

  if @pid.nil?
    Log.medium "Could not abort #{path}: no pid"
    return false
  end

  if Process.pid == @pid
    Log.medium "Could not abort #{path}: same process"
    return false
  end

  Log.medium "Aborting #{path}: #{ @pid }"
  begin
    Process.kill("KILL", @pid)
    Process.waitpid @pid
  rescue Exception
    Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
  end
  true
end

#abort_streamObject



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/rbbt/workflow/step/run.rb', line 377

# Aborts the stream associated with this step, if any.
#
# The stream is taken from get_stream (only when @result is set), falling
# back to @saved_stream, which is then cleared. It is aborted only when it
# responds to :abort and is not already aborted?.
def abort_stream
  stream = get_stream if @result
  stream ||= @saved_stream 
  @saved_stream = nil
  if stream and stream.respond_to? :abort and not stream.aborted?
    begin
      Log.medium "Aborting job stream #{stream.inspect} -- #{Log.color :blue, path}"
      stream.abort 
      #stream.close unless stream.closed?
    rescue Aborted
      # An Aborted raised while aborting restarts the attempt.
      Log.medium "Aborting job stream #{stream.inspect} ABORTED RETRY -- #{Log.color :blue, path}"
      retry
    end
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


233
234
235
# File 'lib/rbbt/workflow/accessor.rb', line 233

# Truthy when this instance was aborted (@aborted) or the info file records
# the :aborted status.
def aborted?
  @aborted || info[:status] == :aborted
end

#checksObject



100
101
102
# File 'lib/rbbt/workflow/step/run.rb', line 100

# Unique paths of all recursive dependencies.
def checks
  paths = rec_dependencies.map { |dep| dep.path }
  paths.uniq
end

#child(&block) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/rbbt/workflow/step.rb', line 109

# Forks a child process running +block+ and appends its pid to the
# :children_pids list kept in the step info.
#
# Returns the child's pid.
def child(&block)
  child_pid = Process.fork &block
  known = info[:children_pids]
  known = [] if known.nil?
  known << child_pid
  #Process.detach(child_pid)
  set_info :children_pids, known
  child_pid
end

#cleanObject



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/rbbt/workflow/step.rb', line 129

# Removes everything this step left on disk: info file, result, their
# '.lock' companions and the files directory. Nothing is touched unless the
# result or the info file exists.
#
# A running step is aborted first (errors from that are ignored), and
# @result / @pid are reset. Returns self.
def clean
  if Open.exists?(path) || Open.exists?(info_file)
    begin
      self.abort if self.running?
    rescue Exception
    end

    @result = nil
    @pid = nil

    remove_if_present = lambda { |target| Open.rm target if Open.exists? target }
    remove_if_present.call info_file
    remove_if_present.call info_file + '.lock'
    remove_if_present.call path
    remove_if_present.call path + '.lock'
    Open.rm_rf files_dir if Open.exists? files_dir
  end
  self
end

#clean_nameObject



28
29
30
# File 'lib/rbbt/workflow/accessor.rb', line 28

# The step name with everything from its last underscore onwards removed.
def clean_name
  name.sub(/(.*)_.*/) { $1 }
end

#done?Boolean

Returns:

  • (Boolean)


213
214
215
# File 'lib/rbbt/workflow/accessor.rb', line 213

# True when the step has a path and a file exists there.
# Returns nil/false when there is no path.
#
# Uses File.exist?; the File.exists? alias was deprecated and removed in
# Ruby 3.2.
def done?
  path and File.exist? path
end

#dup_inputsObject



77
78
79
80
81
82
83
# File 'lib/rbbt/workflow/step/run.rb', line 77

# Replaces every input with the result of Step.dup_stream, once per step.
# Skipped when already done (@dupped) or when RBBT_NO_STREAM is 'true'.
def dup_inputs
  return if @dupped or ENV["RBBT_NO_STREAM"] == 'true'

  @inputs = @inputs.map { |value| Step.dup_stream value }
  @dupped = true
end

#error?Boolean

Returns:

  • (Boolean)


229
230
231
# File 'lib/rbbt/workflow/accessor.rb', line 229

# True when the info file records the :error status.
def error?
  info[:status] == :error
end

#file(name) ⇒ Object



250
251
252
# File 'lib/rbbt/workflow/accessor.rb', line 250

# Path (set up through Path.setup) of the entry +name+ inside files_dir.
# +name+ is converted with to_s.
def file(name)
  location = File.join(files_dir, name.to_s)
  Path.setup(location)
end

#filesObject



243
244
245
246
247
248
# File 'lib/rbbt/workflow/accessor.rb', line 243

# Lists every non-directory entry under files_dir (recursively), each
# expressed through Misc.path_relative_to(files_dir, entry).
def files
  pattern = File.join(files_dir, '**', '*')
  entries = Dir.glob(pattern).reject { |entry| File.directory? entry }
  entries.collect { |entry| Misc.path_relative_to(files_dir, entry) }
end

#files_dirObject

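Returns the files directory for this step (Step.files_dir applied to the step path), memoized in @files_dir.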



239
240
241
# File 'lib/rbbt/workflow/accessor.rb', line 239

# Files directory for this step: Step.files_dir applied to +path+, computed
# on first use and memoized in @files_dir.
def files_dir
  @files_dir ||= Step.files_dir path
end

#fork(semaphore = nil) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# File 'lib/rbbt/workflow/step/run.rb', line 300

# Runs the step in a forked child process and returns self immediately.
#
# semaphore - optional semaphore name; when given the child waits on it via
#             RbbtSemaphore before running and posts it on exit.
#
# Raises when a different, still-existing process is recorded in @pid, the
# step is not done and the info marks it as forked.
#
# In the child: ensures the result directory exists, calls `run true`, joins
# the result stream, waits for any pids in info[:children_pids], clears the
# :pid info entry and exits 0 (or -1 if waiting for children failed).
# In the parent: records :forked in the info and detaches the child.
def fork(semaphore = nil)
  raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil? and not Process.pid == @pid and Misc.pid_exists?(@pid) and not done? and info[:forked]
  @pid = Process.fork do
    begin
      RbbtSemaphore.wait_semaphore(semaphore) if semaphore
      FileUtils.mkdir_p File.dirname(path) unless Open.exists? File.dirname(path)
      begin
        res = run true
      rescue Aborted
        Log.debug{"Forked process aborted: #{path}"}
        log :aborted, "Job aborted (#{Process.pid})"
        raise $!
      rescue Exception
        Log.debug("Exception '#{$!.message}' caught on forked process: #{path}")
        raise $!
      ensure
        # Runs on success and on failure alike.
        join_stream
      end

      begin
        children_pids = info[:children_pids]
        if children_pids
          children_pids.each do |pid|
            if Misc.pid_exists? pid
              begin
                Process.waitpid pid
              rescue Errno::ECHILD
                Log.low "Waiting on #{ pid } failed: #{$!.message}"
              end
            end
          end
          set_info :children_done, Time.now
        end
      rescue Exception
        Log.debug("Exception waiting for children: #{$!.message}")
        exit -1
      end
      set_info :pid, nil
      exit 0
    ensure
      # Always release the semaphore, including on the exit calls above.
      RbbtSemaphore.post_semaphore(semaphore) if semaphore
    end
  end
  set_info :forked, true
  Process.detach(@pid)
  self
end

#get_streamObject



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/rbbt/workflow/step/run.rb', line 64

# Hands out the result stream at most once.
#
# Under @mutex: returns nil when a stream was already saved; otherwise, when
# @result is an IO, stores it in @saved_stream and returns it. Any other
# @result yields nil.
def get_stream
  @mutex.synchronize do
    next nil if @saved_stream

    @saved_stream = @result if IO === @result
  end
end

#graceObject



438
439
440
441
442
443
# File 'lib/rbbt/workflow/step/run.rb', line 438

# Blocks, polling once per second, until the step is done, has a result, is
# streaming, errored or was aborted. Returns self.
def grace
  sleep 1 until done? or result or streaming? or error? or aborted?
  self
end

#infoObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/rbbt/workflow/accessor.rb', line 42

# Loads the step's info Hash from info_file.
#
# Returns {} when there is no info file. Otherwise, under @info_mutex,
# returns the cached Hash while the file's mtime is older than the cache
# time, or reloads it with INFO_SERIALIAZER.load (nil loads become {}) and
# refreshes the cache.
#
# On any exception the info file is overwritten with an :error status and
# an "Info file lost" message, and the exception is re-raised.
def info
  return {} if info_file.nil? or not Open.exists? info_file
  begin
    @info_mutex.synchronize do
      begin
        return @info_cache if @info_cache and File.mtime(info_file) < @info_cache_time
      rescue Exception
        raise $!
      end

      begin
        # NOTE(review): the nested Misc.insist calls look like retry wrappers
        # with different attempt counts and delays; confirm against Misc.insist.
        @info_cache = Misc.insist(2, 3, info_file) do
          Misc.insist(2, 1, info_file) do
            Misc.insist(3, 0.2, info_file) do
              Open.open(info_file) do |file|
                INFO_SERIALIAZER.load(file) || {}
              end
            end
          end
        end
        @info_cache_time = Time.now
        @info_cache
      end
    end
  rescue Exception
    Log.debug{"Error loading info file: " + info_file}
    #self.abort_pid
    Open.write(info_file, INFO_SERIALIAZER.dump({:status => :error, :messages => ["Info file lost"]}))
    raise $!
  end
end

#info_fileObject

Returns the info file for this step (Step.info_file applied to the step path), memoized in @info_file.



38
39
40
# File 'lib/rbbt/workflow/accessor.rb', line 38

# Info file for this step: Step.info_file applied to +path+, computed on
# first use and memoized in @info_file.
def info_file
  @info_file ||= Step.info_file(path)
end

#joinObject



445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/rbbt/workflow/step/run.rb', line 445

# Waits for the step to finish and returns self.
#
# After grace (and join_stream when the status is :streaming) it returns
# early if there is no info file or the info already has :joined. Otherwise
# it waits either on the recorded pid (when it belongs to another process)
# or by polling done? once per second, joining the dependencies as well.
# The :joined info entry is set on the way out, even on error.
def join

  grace

  join_stream if status == :streaming

  return self if not Open.exists? info_file

  return self if info[:joined]
  pid = @pid 

  # Falls back to the pid stored in the info when @pid is not set.
  Misc.insist [0.1, 0.2, 0.5, 1] do
    pid ||= info[:pid]
  end

  begin
    if pid.nil? or Process.pid == pid
      # No separate process to wait for: join dependencies and poll.
      dependencies.each{|dep| dep.join }
      while not done?
        sleep 1
      end
    else
      # The trailing `if` applies to the whole begin/rescue block.
      begin
        Log.debug{"Waiting for pid: #{pid}"}
        Process.waitpid pid 
      rescue Errno::ECHILD
        Log.debug{"Process #{ pid } already finished: #{ path }"}
      end if Misc.pid_exists? pid
      pid = nil
      dependencies.each{|dep| dep.join }
    end
    self
  ensure
    set_info :joined, true
  end
end

#join_streamObject



426
427
428
429
430
431
432
433
434
435
436
# File 'lib/rbbt/workflow/step/run.rb', line 426

# Consumes the step's result stream, if one is available.
#
# The stream is obtained from get_stream only when @result is set. If
# Misc.consume_stream raises, the step is aborted and the error re-raised.
def join_stream
  stream = @result ? get_stream : nil
  return unless stream

  begin
    Misc.consume_stream stream
  rescue Exception
    self.abort
    raise $!
  end
end

#kill_childrenObject



104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/rbbt/workflow/step/run.rb', line 104

# Sends INT to every pid listed under :children_pids in the step info.
# Failures to signal a child are logged and do not stop the loop.
def kill_children
  pids = info[:children_pids]
  return unless pids and pids.any?

  Log.medium("Killing children: #{ pids * ", " }")
  pids.each do |child|
    Log.medium("Killing child #{ child }")
    begin
      Process.kill "INT", child
    rescue Exception
      Log.medium("Exception killing child #{ child }: #{$!.message}")
    end
  end
end

#loadObject



122
123
124
125
126
127
# File 'lib/rbbt/workflow/step.rb', line 122

# Returns the step's result value.
#
# * An in-memory @result (other than the path itself) is returned through
#   prepare_result with the task's result_description.
# * Otherwise the step is joined if not done, and the result file is loaded
#   with Persist.load_file when @path exists.
# * Failing that, falls back to exec.
def load
  if @result and not @path == @result
    return prepare_result(@result, @task.result_description)
  end

  join unless done?
  return Persist.load_file(@path, @task.result_type) if @path.exists?

  exec
end

#load_file(name, type = nil, options = {}) ⇒ Object



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/rbbt/workflow/accessor.rb', line 270

# Loads an auxiliary file of this step (see #file) in the requested format.
#
# name    - file name inside the files directory.
# type    - :tc, :tsv, :array, :yaml, :marshal or anything else (raw read).
#           When nil it is inferred from the extension of +name+:
#           tc, tsv, list/ary/array, yaml, marshal; otherwise :other.
# options - passed to TSV.open for :tsv files.
#
# Returns the loaded object; raw text for unknown types.
def load_file(name, type = nil, options = {})
  if type.nil? and name =~ /.*\.(\w+)$/
    extension = name.match(/.*\.(\w+)$/)[1]
    type = case extension
           when "tc" then :tc
           when "tsv" then :tsv
           when "list", "ary", "array" then :array
           when "yaml" then :yaml
           when "marshal" then :marshal
           else :other
           end
  else
    type ||= :other
  end

  case type.to_sym
  when :tc
    Persist.open_tokyocabinet(file(name), false)
  when :tsv
    TSV.open Open.open(file(name)), options
  when :array
    #Open.read(file(name)).split /\n|,\s*/
    Open.read(file(name)).split "\n"
  when :yaml
    # Block form of Open.open (as used by #info) bounds the stream's lifetime
    # to the block; previously the stream was opened and never closed.
    # NOTE(review): YAML.load on files from an untrusted source is unsafe.
    Open.open(file(name)) { |stream| YAML.load(stream) }
  when :marshal
    # NOTE(review): Marshal.load on files from an untrusted source is unsafe.
    Open.open(file(name)) { |stream| Marshal.load(stream) }
  else
    Open.read(file(name))
  end
end

#log(status, message = nil, &block) ⇒ Object



203
204
205
206
207
# File 'lib/rbbt/workflow/accessor.rb', line 203

# Records +status+ and +message+ in the step info, then logs them.
#
# The status is stored through status=, the message (passed through
# Log.uncolor) is appended through #message, and the original arguments are
# forwarded to Step.log together with the step path and the block.
def log(status, message = nil, &block)
  self.status = status
  self.message Log.uncolor(message)
  Step.log(status, message, path, &block)
end

#merge_info(hash) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rbbt/workflow/accessor.rb', line 88

# Merges +hash+ into the step info and writes it back to info_file.
#
# Does nothing and returns nil when @exec is set or there is no info file
# name. The read-merge-write runs inside Open.lock on info_file, and the
# in-memory cache (@info_cache / @info_cache_time) is refreshed.
#
# Returns +hash+ (purged of annotations when Annotated is defined).
#
# The previous body purged and returned an undefined local `value` (left
# over from set_info) instead of +hash+, and computed an unused lock
# filename.
def merge_info(hash)
  return nil if @exec or info_file.nil?
  hash = Annotated.purge hash if defined? Annotated
  Open.lock(info_file, :refresh => false) do
    i = info
    i.merge! hash
    @info_cache = i
    Open.write(info_file, INFO_SERIALIAZER.dump(i))
    @info_cache_time = Time.now
    hash
  end
end

#message(message) ⇒ Object



118
119
120
# File 'lib/rbbt/workflow/accessor.rb', line 118

# Appends +message+ to the list returned by #messages (or to a new list when
# that is nil) and stores the list under :messages in the step info.
def message(message)
  set_info(:messages, (messages || []) << message)
end

#messagesObject



110
111
112
113
114
115
116
# File 'lib/rbbt/workflow/accessor.rb', line 110

# Messages stored in the step info.
#
# When none are stored, initializes the :messages entry to [] through
# set_info (if this object responds to it) and returns that call's result.
def messages
  stored = info[:messages]
  return stored if stored

  set_info(:messages, []) if self.respond_to?(:set_info)
end

#nameObject



24
25
26
# File 'lib/rbbt/workflow/accessor.rb', line 24

# The part of the step path that follows the "/<task name>/" directory.
# The path is returned unchanged when it does not contain that directory.
def name
  task_dir = Regexp.quote task.name.to_s
  path.sub(/.*\/#{task_dir}\/(.*)/, '\1')
end

#prepare_result(value, description = nil, info = {}) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/rbbt/workflow/step.rb', line 70

# Turns a raw task result into the value handed to callers.
#
# value       - the raw result.
# description - result description; used as a key into Entity.formats.
# info        - Hash of annotation values to apply to entity results.
#
# * IO values are read fully according to @task.result_type (:array reads
#   stripped lines, :tsv uses TSV.open, anything else reads the whole
#   stream) and then joined if the stream responds to :join. On error the
#   stream and the step are aborted and the error re-raised.
# * When Entity is not defined, there is no description, or the description
#   is not a known Entity format, the value is returned as-is.
# * Annotated values get the entries of +info+ that match their annotations
#   assigned through the corresponding writer.
# * Anything else is set up with the Entity format for +description+.
def prepare_result(value, description = nil, info = {})
  case 
  when IO === value
    begin
      res = case @task.result_type
            when :array
              array = []
              while line = value.gets
                array << line.strip
              end
              array
            when :tsv
              TSV.open(value)
            else
              value.read
            end
      value.join if value.respond_to? :join
      res
    rescue Exception
      value.abort if value.respond_to? :abort
      self.abort
      raise $!
    end
  when (not defined? Entity or description.nil? or not Entity.formats.include? description)
    value
  when (Annotated === value and info.empty?)
    value
  when Annotated === value
    annotations = value.annotations
    info.each do |k,v|
      # The writer name comes from the key; this used to reference an
      # undefined `h`, raising NameError for any matching annotation.
      value.send("#{k}=", v) if annotations.include? k
    end
    value
  else
    Entity.formats[description].setup(value, info.merge(:format => description))
  end
end

#provenanceObject



308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/rbbt/workflow/accessor.rb', line 308

# Builds the provenance tree of this step.
#
# Returns a Hash with :inputs (from the step info) and :provenance, which
# maps each existing dependency path to that dependency's own provenance,
# or to nil when the dependency has no info file. Dependencies whose result
# does not exist are skipped.
#
# Uses File.exist?; the File.exists? alias was removed in Ruby 3.2.
def provenance
  provenance = {}
  dependencies.each do |dep|
    next unless dep.path.exists?
    if File.exist? dep.info_file
      provenance[dep.path] = dep.provenance if File.exist? dep.path
    else
      provenance[dep.path] = nil
    end
  end
  {:inputs => info[:inputs], :provenance => provenance}
end

#provenance_pathsObject



321
322
323
324
325
326
327
# File 'lib/rbbt/workflow/accessor.rb', line 321

# Maps the path of every dependency whose result file exists to that
# dependency's own provenance_paths Hash.
#
# Uses File.exist?; the File.exists? alias was removed in Ruby 3.2.
def provenance_paths
  provenance = {}
  dependencies.each do |dep|
    provenance[dep.path] = dep.provenance_paths if File.exist? dep.path
  end
  provenance
end

#rec_dependenciesObject



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/rbbt/workflow/step.rb', line 150

# All dependencies of this step, direct ones first, followed by the
# recursive dependencies of each, without duplicates.
def rec_dependencies

  # A step result with no info_file means that it was manually
  # placed. In that case, do not consider its dependencies
  return [] if Open.exists?(self.path.to_s) and not Open.exists? self.info_file

  return [] if dependencies.nil? or dependencies.empty?

  nested = dependencies.collect { |step| step.rec_dependencies }.flatten.uniq.compact
  direct = self.dependencies
  combined = direct ? direct + nested : nested
  combined.flatten.uniq
end

#recursive_cleanObject



167
168
169
170
171
172
173
174
175
# File 'lib/rbbt/workflow/step.rb', line 167

# Cleans this step and every recursive dependency that has an info file.
def recursive_clean
  clean
  rec_dependencies.each do |step|
    step.clean if Open.exists?(step.info_file)
  end
end

#relay_log(step) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/rbbt/workflow/step.rb', line 53

# Makes this step forward its log calls to +step+.
#
# Returns self untouched unless the step's task is a Task with a non-nil
# name. The first time it is called on an instance, the instance's singleton
# class gets a relay_step accessor, keeps the original #log as original_log,
# and receives a new #log that records status and message locally and then
# logs "<task name>><status>" on the relay step, unless that step is done,
# errored or aborted. @relay_step is then set to +step+.
def relay_log(step)
  return self unless Task === self.task and not self.task.name.nil?
  if not self.respond_to? :original_log
    class << self
      attr_accessor :relay_step
      alias original_log log 
      # NOTE(review): unlike the original #log, this override takes no block.
      def log(status, message = nil)
        self.status = status
        message Log.uncolor message
        relay_step.log([task.name.to_s, status.to_s] * ">", message.nil? ? nil : message ) unless (relay_step.done? or relay_step.error? or relay_step.aborted?)
      end
    end
  end
  @relay_step = step
  self
end

#run(no_load = false) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/rbbt/workflow/step/run.rb', line 173

# Runs the step, persisting its result at +path+ through Persist.persist.
#
# no_load - when truthy the result is not loaded: it is passed to
#           Persist.persist as :no_load => :stream, the raw result is kept
#           in @result and self is returned. Otherwise the prepared result
#           is stored in @result and returned.
#
# Inside the persist block the step resets its info file, records pid /
# issue time / name / dependencies, duplicates input streams, runs the
# dependencies, executes the task (_exec) and logs the outcome. IO and
# TSV::Dumper results are set up as ConcurrentStream with completion and
# abort callbacks; other results are logged as done right away.
#
# Any exception aborts the step and is re-raised.
def run(no_load = false)
  result = nil

  begin
    @mutex.synchronize do
      no_load = no_load ? :stream : false
      result = Persist.persist "Job", @task.result_type, :file => path, :check => checks, :no_load => no_load do |lockfile|
        # Relay this step's log to the class-level relay step, if one is set.
        if Step === Step.log_relay_step and not self == Step.log_relay_step
          relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
        end
        @exec = false

        # Start from a clean info file.
        Open.rm info_file if Open.exists? info_file

        log :setup, "#{Log.color :green, "Task"} #{Log.color :yellow, task.name.to_s || ""}"

        merge_info({
          :pid => Process.pid,
          :issued => Time.now,
          :name => name,
          :dependencies => dependencies.collect{|dep| [dep.task_name, dep.name, dep.path]},
        })

        dup_inputs
        begin
          run_dependencies
        rescue
          log(:error, "Error procesing dependencies")
          stop_dependencies
          raise $!
        end


        set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

        set_info :started, (start_time = Time.now)
        log :started, "#{Log.color :green, "Starting task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"

        begin
          result = _exec
        rescue Aborted
          stop_dependencies
          log(:error, "Aborted")
          raise $!
        rescue Exception
          backtrace = $!.backtrace

          # HACK: This fixes an strange behaviour in 1.9.3 where some
          # backtrace strings are coded in ASCII-8BIT
          set_info :backtrace, backtrace 
          log(:error, "#{$!.class}: #{$!.message}")
          backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding
          stop_dependencies
          raise $!
        end

        # When loading is requested (or streaming is disabled) stream results
        # are consumed here into plain values.
        if not no_load or ENV["RBBT_NO_STREAM"] == "true" 
          result = prepare_result result, @task.description, info if IO === result 
          result = prepare_result result.stream, @task.description, info if TSV::Dumper === result 
        end

        case result
        when IO

          log :streaming, "#{Log.color :magenta, "Streaming task result IO"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"
          # The block given to ConcurrentStream.setup records completion.
          # NOTE(review): presumably invoked when the stream is fully
          # consumed -- confirm against ConcurrentStream.
          ConcurrentStream.setup result do
            begin
              set_info :done, (done_time = Time.now)
              set_info :time_elapsed, (time_elapsed = done_time - start_time)
              log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i} -- #{path}"
            rescue
              Log.exception $!
            ensure
              join
            end
          end
          result.abort_callback = Proc.new do
            begin
              log :error, "#{Log.color :red, "ERROR -- streamming aborted"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] -- #{path}" if status == :streaming
            rescue
              Log.exception $!
            ensure
              join
            end
          end
        when TSV::Dumper
          log :streaming, "#{Log.color :magenta, "Streaming task result TSV::Dumper"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"
          ConcurrentStream.setup result.stream do
            begin
              set_info :done, (done_time = Time.now)
              set_info :time_elapsed, (time_elapsed = done_time - start_time)
              log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i} -- #{path}"
            rescue
              Log.exception $!
            ensure
              join
            end
          end
          # NOTE(review): unlike the IO branch, this abort callback does not
          # join in an ensure clause.
          result.stream.abort_callback = Proc.new do
            begin
              log :error, "#{Log.color :red, "ERROR -- streamming aborted"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] -- #{path}"  if status == :streaming
            rescue Exception
              Log.exception $!
            end
          end
        else
          set_info :done, (done_time = Time.now)
          set_info :time_elapsed, (time_elapsed = done_time - start_time)
          log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i}"
        end

        result
      end

      if no_load
        @result ||= result
        self
      else
        @result = prepare_result result, @task.result_description
      end
    end
  rescue Exception
    self.abort
    raise $!
  end
end

#run_dependenciesObject



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/rbbt/workflow/step/run.rb', line 119

# Prepares and runs the direct dependencies of this step.
#
# 1. Collects into @seen every dependency (and its recursive dependencies)
#    not already seen by path, sharing the same list with each dependency.
# 2. For each direct dependency in @seen: relays its log to this step and
#    duplicates its input streams.
# 3. For each direct dependency in @seen: cleans it when it is in a bad or
#    unfinished state and runs it (without loading) unless it already has a
#    result or is done. A streaming dependency that is still running is
#    left alone.
#
# Any failure records the backtrace, logs an :error, aborts this step and
# re-raises.
def run_dependencies
  @seen ||= []
  dependencies.uniq.each do |dependency| 
    next if seen.collect{|d| d.path}.include?(dependency.path)
    dependency.seen = seen
    @seen << dependency
    @seen.concat dependency.rec_dependencies.collect{|d| d } 
    @seen.uniq!
  end

  @seen.each do |dependency|
    next if dependency == self
    next unless dependencies.include? dependency
    dependency.relay_log self
    dependency.dup_inputs
  end

  @seen.each do |dependency| 
    next if dependency == self
    next unless dependencies.include? dependency
    Log.info "#{Log.color :magenta, "Checking dependency"} #{Log.color :yellow, task.name.to_s || ""} => #{Log.color :yellow, dependency.task_name.to_s || ""} -- #{Log.color :blue, dependency.path}"
    begin
      if dependency.streaming? 
        next if dependency.running?
        dependency.clean 
      else
        dependency.clean if (dependency.error? or dependency.aborted? or not dependency.done?)
      end

      unless dependency.result or dependency.done?
        dependency.run(true) 
      end
    rescue Aborted
      backtrace = $!.backtrace
      set_info :backtrace, backtrace 
      log(:error, "Aborted dependency #{Log.color :yellow, dependency.task.name.to_s}")
      self.abort
      raise $!
    rescue Interrupt
      backtrace = $!.backtrace
      set_info :backtrace, backtrace 
      self.abort
      log(:error, "Interrupted dependency #{Log.color :yellow, dependency.task.name.to_s}")
      raise $!
    rescue Exception
      backtrace = $!.backtrace
      set_info :backtrace, backtrace 
      log(:error, "Exception processing dependency #{Log.color :yellow, dependency.task.name.to_s} -- #{$!.class}: #{$!.message}")
      self.abort
      raise $!
    end
  end
end

#running?Boolean

Returns:

  • (Boolean)


221
222
223
224
225
226
227
# File 'lib/rbbt/workflow/accessor.rb', line 221

# Whether the process recorded for this step still exists.
#
# Returns nil when there is no info file or the info has no :pid; otherwise
# the result of Misc.pid_exists? for @pid (or the info pid when @pid is
# unset).
def running?
  return nil unless Open.exists? info_file
  return nil if info[:pid].nil?

  Misc.pid_exists?(@pid || info[:pid])
end

#save_file(name, content) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/rbbt/workflow/accessor.rb', line 254

# Writes +content+ to the auxiliary file +name+ (see #file).
#
# Strings are written as-is, Arrays joined with newlines, TSV objects and
# unknown objects through to_s, and Hashes as one tab-separated line per
# entry.
def save_file(name, content)
  text = case content
         when String then content
         when Array  then content * "\n"
         when TSV    then content.to_s
         when Hash   then content.collect{|*p| p * "\t"} * "\n"
         else content.to_s
         end
  Open.write(file(name), text)
end

#set_info(key, value) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/rbbt/workflow/accessor.rb', line 74

# Stores +value+ under +key+ in the step info and writes it to info_file.
#
# Does nothing and returns nil when @exec is set or there is no info file
# name. The value is purged of annotations when Annotated is defined. The
# read-modify-write runs inside Open.lock on info_file, and the in-memory
# cache (@info_cache / @info_cache_time) is refreshed.
#
# Returns the (purged) value.
#
# An unused `lock_filename` local was dropped: the lock is taken on
# info_file itself.
def set_info(key, value)
  return nil if @exec or info_file.nil?
  value = Annotated.purge value if defined? Annotated
  Open.lock(info_file, :refresh => false) do
    i = info
    i[key] = value 
    @info_cache = i
    Open.write(info_file, INFO_SERIALIAZER.dump(i))
    @info_cache_time = Time.now
    value
  end
end

#started?Boolean

Returns:

  • (Boolean)


209
210
211
# File 'lib/rbbt/workflow/accessor.rb', line 209

# True when the step's info file exists (checked through Open.exists?).
def started?
  Open.exists? info_file
end

#statusObject



102
103
104
# File 'lib/rbbt/workflow/accessor.rb', line 102

# The :status entry of the step info (nil when not set).
def status
  info[:status]
end

#status=(status) ⇒ Object



106
107
108
# File 'lib/rbbt/workflow/accessor.rb', line 106

# Stores +status+ under :status in the step info through set_info.
def status=(status)
  set_info(:status, status)
end

#step(name) ⇒ Object



177
178
179
180
181
182
# File 'lib/rbbt/workflow/step.rb', line 177

# Finds the first recursive dependency whose task name matches +name+
# (compared as symbols). Hits are memoized per +name+ in @steps.
def step(name)
  @steps ||= {}
  return @steps[name] if @steps[name]

  matches = rec_dependencies.select do |candidate|
    candidate.task_name.to_sym == name.to_sym
  end
  @steps[name] = matches.first
end

#stop_dependenciesObject



348
349
350
351
352
353
# File 'lib/rbbt/workflow/step/run.rb', line 348

# Aborts every direct dependency and then kills this step's child processes.
def stop_dependencies
  dependencies.each { |dependency| dependency.abort }
  kill_children
end

#streaming?Boolean

Returns:

  • (Boolean)


217
218
219
# File 'lib/rbbt/workflow/accessor.rb', line 217

# True when the in-memory result is an IO or the recorded status is
# :streaming.
def streaming?
  return true if IO === @result

  status == :streaming
end

#task_nameObject



40
41
42
# File 'lib/rbbt/workflow/step.rb', line 40

# Name of the step's task (@task.name). Raises NoMethodError when the step
# has no task.
def task_name
  @task.name
end