Class: Step

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/workflow/step.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/step/run.rb

Direct Known Subclasses

WorkflowRESTClient::RemoteStep

Constant Summary collapse

INFO_SERIALIAZER =
Marshal
STREAM_CACHE =
{}
STREAM_CACHE_MUTEX =
Mutex.new

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil, clean_name = nil) ⇒ Step

Returns a new instance of Step.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/step.rb', line 31

# Creates a step (job) bound to +path+.
#
# path         - a String (sanitized and wrapped with Path.setup), a Proc that
#                is called to obtain the path, or an already prepared path.
# task         - task object; when it responds to +inputs+ and has some, the
#                input names are used to turn +inputs+ into a NamedArray.
# inputs       - input values (defaults to []).
# dependencies - nil, a single dependency, or an Array of dependencies.
# bindings     - object the task is executed against (see #_exec).
# clean_name   - human-readable job name, exposed through #clean_name.
def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil, clean_name = nil)
  path = Path.setup(Misc.sanitize_filename(path)) if String === path
  path = path.call if Proc === path

  @path = path
  @task = task
  @bindings = bindings
  # Previously accepted but never stored, so #clean_name always returned nil.
  @clean_name = clean_name
  @dependencies = case
                  when dependencies.nil?
                    []
                  when Array === dependencies
                    dependencies
                  else
                    [dependencies]
                  end
  @mutex = Mutex.new
  @info_mutex = Mutex.new
  @inputs = inputs || []
  NamedArray.setup @inputs, task.inputs.collect{|s| s.to_s} if task and task.respond_to? :inputs and task.inputs
end

Class Attribute Details

.lock_dirObject

Returns the value of attribute lock_dir.



14
15
16
# File 'lib/rbbt/workflow/step.rb', line 14

# Reader for the lock directory setting: returns @lock_dir (nil when unset).
def lock_dir; @lock_dir; end

.log_relay_stepObject

Returns the value of attribute log_relay_step.



74
75
76
# File 'lib/rbbt/workflow/step.rb', line 74

# Reader for the step that log messages are relayed to: returns @log_relay_step.
def log_relay_step; @log_relay_step; end

Instance Attribute Details

#bindingsObject

Returns the value of attribute bindings.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for the bindings object: returns @bindings.
def bindings; @bindings; end

#clean_nameObject

Returns the value of attribute clean_name.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for the human-readable job name: returns @clean_name.
def clean_name; @clean_name; end

#dependenciesObject

Returns the value of attribute dependencies.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for the direct dependencies of the step: returns @dependencies.
def dependencies; @dependencies; end

#duppedObject (readonly)

Returns the value of attribute dupped.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Reader for the flag set once the inputs have been duplicated: returns @dupped.
def dupped; @dupped; end

#exec(no_load = false) ⇒ Object

Returns the value of attribute exec.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the exec flag: returns @exec.
def exec; @exec; end

#inputsObject

Returns the value of attribute inputs.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for the input values of the step: returns @inputs.
def inputs; @inputs; end

#mutexObject

Returns the value of attribute mutex.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for the step's mutex: returns @mutex.
def mutex; @mutex; end

#pathObject

Returns the value of attribute path.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for the job result path: returns @path.
def path; @path; end

#pidObject

Returns the value of attribute pid.



9
10
11
# File 'lib/rbbt/workflow/step.rb', line 9

# Reader for the pid associated with the job: returns @pid.
def pid; @pid; end

#resultObject

Returns the value of attribute result.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for the in-memory result: returns @result.
def result; @result; end

#saved_streamObject (readonly)

Returns the value of attribute saved_stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Reader for the stream already handed out by get_stream: returns @saved_stream.
def saved_stream; @saved_stream; end

#seenObject

Returns the value of attribute seen.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for the list of dependencies already visited: returns @seen.
def seen; @seen; end

#streamObject (readonly)

Returns the value of attribute stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Reader for the step's stream: returns @stream.
def stream; @stream; end

#taskObject

Returns the value of attribute task.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for the task this step executes: returns @task.
def task; @task; end

Class Method Details

.clean(path) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/rbbt/workflow/step.rb', line 158

# Removes the on-disk files of the job stored at +path+: the info file, the
# result file, their '.lock' companions and the '.files' directory. Nothing
# happens unless the result file or the info file exists.
#
# NOTE(review): this is a class method, so +self+ below is the Step class,
# not a job. `self.running?` / `self.abort` resolve to class-level methods
# (none are visible here) and any error is swallowed by `rescue Exception`;
# likewise @result/@pid are set on the class object, not on a Step instance.
# Confirm whether aborting a running job was meant to happen here.
def self.clean(path)
  info_file = Step.info_file path
  files_dir = Step.files_dir path
  if Open.exists?(path) or Open.exists?(info_file)
    begin
      self.abort if self.running?
    rescue Exception
    end

    @result = nil
    @pid = nil

    # Misc.insist presumably retries the block on failure -- TODO confirm.
    Misc.insist do
      Open.rm info_file if Open.exists? info_file
      Open.rm info_file + '.lock' if Open.exists? info_file + '.lock'
      Open.rm path if Open.exists? path
      Open.rm path + '.lock' if Open.exists? path + '.lock'
      Open.rm_rf files_dir if Open.exists? files_dir
    end
  end
end

.dup_stream(stream) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rbbt/workflow/step/run.rb', line 7

# Returns a stream that a consumer can read independently of other readers.
#
# IO/File: a closed stream is returned unchanged. The first request for a
# given stream records it in STREAM_CACHE (mapping the stream to itself) and
# returns the stream itself. Later requests get a copy: for a File whose path
# still exists, a fresh Open.open of that path; otherwise Misc.dup_stream of
# the cached stream.
#
# TSV::Dumper: handled the same way, but on the dumper's underlying +stream+;
# the value returned is that stream or a copy of it, not the dumper.
#
# Anything else is returned as is. Cache reads/writes happen under
# STREAM_CACHE_MUTEX.
def self.dup_stream(stream)
  case stream
  when IO, File
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      case current = STREAM_CACHE[stream]
      when nil
        Log.medium "Not duplicating stream #{ Misc.fingerprint(stream) }"
        STREAM_CACHE[stream] = stream
      when File
        # Reopening by path gives an independent handle when the file exists.
        if Open.exists? current.path 
          Log.medium "Reopening file #{ Misc.fingerprint(current) }"
          Open.open(current.path)
        else
          Log.medium "Duplicating file #{ Misc.fingerprint(current) } #{current.inspect}"
          Misc.dup_stream(current)
        end

      else
        Log.medium "Duplicating stream #{ Misc.fingerprint(stream) }"
        Misc.dup_stream(current)
      end
    end
  when TSV::Dumper#, TSV::Parser
    stream = stream.stream
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      if STREAM_CACHE[stream].nil?
        Log.high "Not duplicating dumper #{ stream.inspect }"
        STREAM_CACHE[stream] = stream
      else
        new = Misc.dup_stream(STREAM_CACHE[stream])
        Log.high "Duplicating dumper #{ stream.inspect } into #{new.inspect}"
        new
      end
    end
  else
    stream
  end
end

.files_dir(path) ⇒ Object



26
27
28
# File 'lib/rbbt/workflow/accessor.rb', line 26

# Directory holding auxiliary files of the job at +path+ ("<path>.files"),
# or nil when no path is given.
def self.files_dir(path)
  return nil if path.nil?
  path + '.files'
end

.info_file(path) ⇒ Object



30
31
32
# File 'lib/rbbt/workflow/accessor.rb', line 30

# Info file of the job at +path+ ("<path>.info"), or nil when no path is given.
def self.info_file(path)
  return nil if path.nil?
  path + '.info'
end

.job_name_for_info_file(info_file, extension = nil) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/rbbt/workflow/accessor.rb', line 45

# Derives the job path from an info file path by stripping the trailing
# ".info" suffix, or ".<extension>.info" when +extension+ is given.
#
# info_file - String path ending in ".info".
# extension - optional result extension (String or Symbol). It is matched
#             literally, so characters such as "." or "+" in it are no longer
#             interpreted as regexp metacharacters.
#
# Returns the String without the suffix (unchanged if the suffix is absent).
def self.job_name_for_info_file(info_file, extension = nil)
  if extension and not extension.empty?
    info_file.sub(/\.#{Regexp.quote(extension.to_s)}\.info$/, '')
  else
    info_file.sub(/\.info$/, '')
  end
end

.log(status, message, path, &block) ⇒ Object



236
237
238
239
240
241
242
243
244
245
246
# File 'lib/rbbt/workflow/accessor.rb', line 236

# Logs +status+ and +message+ for the job at +path+.
#
# Without a block a single line is logged (log_string). With a block the
# block is executed: a Hash +message+ is treated as progress-bar options
# (log_progress); any other message is logged around the block (log_block).
def self.log(status, message, path, &block)
  return log_string(status, message, path) unless block
  return log_progress(status, message, path, &block) if Hash === message
  log_block(status, message, path, &block)
end

.log_block(status, message, path, &block) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/rbbt/workflow/accessor.rb', line 173

# Runs the given block between two INFO log lines.
#
# The first line shows the colored status, the message, the path and the
# pid. The second line shows the status and the elapsed wall-clock seconds.
# Both lines are built inside Log.info blocks.
#
# Returns whatever the block returns.
def self.log_block(status, message, path, &block)
  start = Time.now
  status = status.to_s
  status_color = self.status_color status

  Log.info do
    str = Log.color :reset
    str << "#{ Log.color status_color, status}"
    str << ": #{ message }" if message
    str << " -- #{Log.color :blue, path.to_s}" if path
    str << " #{Log.color :yellow, Process.pid}"
    str
  end
  res = yield
  eend = Time.now
  Log.info do
    str = "#{ Log.color :cyan, status.to_s } +#{Log.color :green, "%.2f" % (eend - start)}"
    str << " -- #{Log.color :blue, path.to_s}" if path
    str << " #{Log.color :yellow, Process.pid}"
    str
  end
  res
end

.log_progress(status, options = {}, path = nil, &block) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/rbbt/workflow/accessor.rb', line 214

# Runs the block with a progress bar (Log::ProgressBar.with_bar) and returns
# its value.
#
# +options+ are progress-bar options. :max is extracted and used as the bar
# maximum; :severity defaults to Log::INFO and :file to +path+.
#
# If the block returns an IO, it is raised wrapped in KeepBar. This is
# presumably so that with_bar keeps the bar while the stream is consumed --
# TODO confirm against Log::ProgressBar.with_bar.
#
# Errors raised by the block are logged and then re-raised. Previously they
# were swallowed, and the value of Log.exception became the task result.
def self.log_progress(status, options = {}, path = nil, &block)
  options = Misc.add_defaults options, :severity => Log::INFO, :file => path
  max = Misc.process_options options, :max
  Log::ProgressBar.with_bar(max, options) do |bar|
    begin
      res = yield bar
      raise KeepBar.new res if IO === res
      res
    rescue KeepBar
      # Signal for with_bar, not an error: pass it through without logging.
      raise
    rescue
      Log.exception $!
      raise
    end
  end
end

.log_string(status, message, path) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/rbbt/workflow/accessor.rb', line 199

# Emits a single INFO line "<status>: <message> -- <path> <pid>", with the
# status colored by status_color. The message and path parts are left out
# when they are nil/false. The line is built inside the Log.info block.
def self.log_string(status, message, path)
  Log.info do
    label = status.to_s
    tint = self.status_color(label)
    parts = ["#{ Log.color tint, label}"]
    parts << ": #{ message }" if message
    parts << " -- #{Log.color :blue, path.to_s}" if path
    parts << " #{Log.color :yellow, Process.pid}"
    parts.inject(Log.color(:reset)) { |line, piece| line << piece }
  end
end

.purge_stream_cacheObject



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rbbt/workflow/step/run.rb', line 50

# Intended to drain and forget every cached input stream (see dup_stream).
#
# It is currently disabled: it does nothing and returns nil. The previous
# implementation was unreachable after an early `return`; it is kept below,
# commented out, for reference.
def self.purge_stream_cache
  # STREAM_CACHE_MUTEX.synchronize do
  #   STREAM_CACHE.collect{|k,s|
  #     Thread.new do
  #       Misc.consume_stream s
  #     end
  #   }
  #   STREAM_CACHE.clear
  # end
  nil
end

.started?Boolean

Returns:

  • (Boolean)


9
10
11
# File 'lib/rbbt/workflow/accessor.rb', line 9

# Whether the job stored at +path+ has started, i.e. whether its info file
# exists.
#
# The original zero-argument version called the class-level info_file
# without a path, so it always raised ArgumentError. The job path is now
# accepted; it is optional only for signature compatibility. Calling this
# without a path still raises ArgumentError.
def self.started?(path = nil)
  raise ArgumentError, "Step.started? requires the job path" if path.nil?
  Open.exists?(info_file(path))
end

.status_color(status) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/rbbt/workflow/accessor.rb', line 159

# Color used when logging a status.
#
# Only the part after the last '>' is considered; relayed statuses look like
# "task>status". "starting" is yellow, "error"/"aborted" red, "done" green,
# and anything else cyan.
def self.status_color(status)
  final = status.split(">").last
  return :yellow if final == "starting"
  return :red if %w(error aborted).include?(final)
  return :green if final == "done"
  :cyan
end

.step_info(path) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/rbbt/workflow/accessor.rb', line 34

# Reads the info Hash of the job at +path+, deserialized with
# INFO_SERIALIAZER.
#
# If the file cannot be opened or deserialized, the error is logged and {} is
# returned. Only StandardError is rescued, so Interrupt and other
# signal-level exceptions now propagate instead of being swallowed.
def self.step_info(path)
  begin
    Open.open(info_file(path)) do |f|
      INFO_SERIALIAZER.load(f)
    end
  rescue StandardError
    Log.exception $!
    {}
  end
end

.wait_for_jobs(jobs) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/rbbt/workflow/accessor.rb', line 13

# Waits for one job (a Step) or a collection of jobs, joining each one in
# its own thread.
#
# If waiting is interrupted by any exception (Interrupt included), the
# waiting threads are stopped, every job is aborted, and the exception is
# re-raised.
def self.wait_for_jobs(jobs)
  jobs = [jobs] if Step === jobs
  waiters = []
  begin
    jobs.each { |job| waiters << Thread.new { job.join } }
    waiters.each(&:join)
  rescue Exception
    waiters.each(&:exit)
    jobs.each(&:abort)
    raise $!
  end
end

Instance Method Details

#_abortObject



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/rbbt/workflow/step/run.rb', line 378

# Stops this job: stops its dependencies, aborts its result stream and kills
# its process (see abort_pid).
#
# Runs at most once per Step instance (guarded by @aborted) and does nothing
# for jobs that are already done. If the result file exists afterwards, it
# is removed.
#
# Failures while stopping are retried, but only up to +max_attempts+ times in
# total. Previously any persistent error made the `retry` loop forever.
def _abort
  return if @aborted
  @aborted = true
  return if done?
  Log.medium{"#{Log.color :red, "Aborting"} #{Log.color :blue, path}"}
  max_attempts = 3
  attempts = 0
  begin
    attempts += 1
    stop_dependencies
    abort_stream
    abort_pid
  rescue Aborted
    Log.medium{"#{Log.color :red, "Aborting ABORTED RETRY"} #{Log.color :blue, path}"}
    retry if attempts < max_attempts
    Log.warn "Giving up aborting after #{attempts} attempts -- #{ path }"
  rescue Exception
    retry if attempts < max_attempts
    Log.warn "Giving up aborting after #{attempts} attempts: #{$!.message} -- #{ path }"
  ensure
    if Open.exists? path
      Log.warn "Aborted job had finished. Removing result -- #{ path }"
      begin
        Open.rm path
      rescue Exception
        Log.warn "Exception removing result of aborted job: #{$!.message}"
      end
    end
  end
  Log.medium{"#{Log.color :red, "Aborted"} #{Log.color :blue, path}"}
end

#_execObject



83
84
85
86
# File 'lib/rbbt/workflow/step/run.rb', line 83

# Executes the task in-process.
#
# Sets @exec to true when it has not been set yet, then calls
# task.exec_in with the bindings object (or the step itself when there are
# no bindings) followed by the inputs. Returns whatever exec_in returns.
def _exec
  @exec = true if @exec.nil?
  receiver = bindings || self
  @task.exec_in(receiver, *@inputs)
end

#abortObject



405
406
407
408
# File 'lib/rbbt/workflow/step/run.rb', line 405

# Aborts the job (see _abort) and records the :aborted status, unless the
# job is already done or its status is already :error or :aborted.
#
# The status is checked explicitly. The old `aborted?` test always
# succeeded, because _abort sets @aborted first, so :aborted was never
# logged.
def abort
  _abort
  log(:aborted, "Job aborted") unless done? or error? or status == :aborted
end

#abort_pidObject



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/rbbt/workflow/step/run.rb', line 339

# Kills (SIGKILL) and reaps the process running this job.
#
# The pid is taken from @pid or, failing that, from info[:pid], and is
# memoized in @pid. Returns false when there is no pid or when it is the
# current process. Otherwise returns true, even if killing/reaping raised;
# such errors are only logged.
def abort_pid
  @pid ||= info[:pid]

  if @pid.nil?
    Log.medium "Could not abort #{path}: no pid"
    return false
  end

  if @pid == Process.pid
    Log.medium "Could not abort #{path}: same process"
    return false
  end

  Log.medium "Aborting #{path}: #{ @pid }"
  begin
    Process.kill("KILL", @pid)
    Process.waitpid @pid
  rescue Exception
    Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
  end
  true
end

#abort_streamObject



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/rbbt/workflow/step/run.rb', line 361

# Aborts this job's result stream, if it has one that supports #abort and
# is not aborted yet.
#
# The stream is taken from get_stream (when there is a @result) or from
# @saved_stream, and @saved_stream is cleared.
#
# An Aborted raised while aborting is retried, but at most +max_attempts+
# times in total; after that it is re-raised. Previously the retry was
# unbounded.
def abort_stream
  stream = get_stream if @result
  stream ||= @saved_stream
  @saved_stream = nil
  if stream and stream.respond_to? :abort and not stream.aborted?
    max_attempts = 3
    attempts = 0
    begin
      attempts += 1
      Log.medium "Aborting job stream #{stream.inspect} -- #{Log.color :blue, path}"
      stream.abort
      #stream.close unless stream.closed?
    rescue Aborted
      Log.medium "Aborting job stream #{stream.inspect} ABORTED RETRY -- #{Log.color :blue, path}"
      Log.exception $!
      retry if attempts < max_attempts
      raise
    end
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


290
291
292
# File 'lib/rbbt/workflow/accessor.rb', line 290

# True when this instance has been aborted (@aborted) or the recorded
# status is :aborted.
def aborted?
  return @aborted if @aborted
  status == :aborted
end

#checksObject



98
99
100
# File 'lib/rbbt/workflow/step/run.rb', line 98

# Unique paths of all recursive dependencies. They are used as the
# persistence :check list when running the job.
def checks
  rec_dependencies.map(&:path).uniq
end

#child(&block) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/rbbt/workflow/step.rb', line 137

# Forks a child process that runs +block+, appends its pid to
# info[:children_pids] (creating the list when absent), stores the list
# with set_info and returns the pid.
def child(&block)
  new_pid = Process.fork(&block)
  known = info[:children_pids]
  known = [] if known.nil?
  known << new_pid
  #Process.detach(child_pid)
  set_info :children_pids, known
  new_pid
end

#cleanObject



180
181
182
183
# File 'lib/rbbt/workflow/step.rb', line 180

# Deletes this job's files from disk (Step.clean on its path) and returns
# self, so calls can be chained.
def clean
  tap { Step.clean(path) }
end

#done?Boolean

Returns:

  • (Boolean)


270
271
272
# File 'lib/rbbt/workflow/accessor.rb', line 270

# True when the job's result file exists on disk; nil/false when there is
# no path.
#
# Uses File.exist?. The File.exists? alias used before was removed in
# Ruby 3.2.
def done?
  path and File.exist? path
end

#dup_inputsObject



75
76
77
78
79
80
81
# File 'lib/rbbt/workflow/step/run.rb', line 75

# Replaces every input with Step.dup_stream(input), once per step.
#
# Skipped when already done or when the RBBT_NO_STREAM environment
# variable is 'true'. Returns true after duplicating and nil when skipped.
def dup_inputs
  return if @dupped
  return if ENV["RBBT_NO_STREAM"] == 'true'
  @inputs = @inputs.map { |input| Step.dup_stream(input) }
  @dupped = true
end

#error?Boolean

Returns:

  • (Boolean)


286
287
288
# File 'lib/rbbt/workflow/accessor.rb', line 286

# True when the recorded status is :error.
def error?
  :error == status
end

#exception(ex, msg = nil) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
# File 'lib/rbbt/workflow/accessor.rb', line 254

# Records exception +ex+ for this job.
#
# Aborts the job (_abort), stores the backtrace and a summary Hash
# (:class, :message, :backtrace) in the info, and logs an :error status.
# The log message is "<msg or class name> -- <exception message>".
def exception(ex, msg = nil)
  self._abort
  klass_name = ex.class.to_s
  set_info :backtrace, ex.backtrace
  set_info :exception, {:class => klass_name, :message => ex.message, :backtrace => ex.backtrace}
  prefix = msg.nil? ? klass_name : msg
  log :error, "#{prefix} -- #{ex.message}"
end

#file(name) ⇒ Object



307
308
309
# File 'lib/rbbt/workflow/accessor.rb', line 307

# Path (prepared with Path.setup) of the auxiliary file +name+ inside this
# job's files directory.
def file(name)
  full = File.join(files_dir, name.to_s)
  Path.setup(full)
end

#filesObject



300
301
302
303
304
305
# File 'lib/rbbt/workflow/accessor.rb', line 300

# Lists the regular files under this job's files directory, recursively,
# as paths relative to that directory (via Misc.path_relative_to).
def files
  root = files_dir
  Dir.glob(File.join(root, '**', '*')).
    reject { |entry| File.directory?(entry) }.
    map { |entry| Misc.path_relative_to(root, entry) }
end

#files_dirObject

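Returns the directory holding this job's auxiliary files (the job path plus '.files'); the value is memoized.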



296
297
298
# File 'lib/rbbt/workflow/accessor.rb', line 296

# Directory holding this job's auxiliary files (Step.files_dir of the job
# path), memoized in @files_dir.
def files_dir
  return @files_dir if @files_dir
  @files_dir = Step.files_dir(path)
end

#fork(semaphore = nil) ⇒ Object



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/rbbt/workflow/step/run.rb', line 282

# Runs the job in a forked child process and returns self immediately.
#
# Raises if another live process (recorded in @pid, not the current one) is
# still working on this job and the info says it was forked.
#
# The child optionally waits on +semaphore+, creates the result directory,
# runs the job with run(true) and drains any result stream. It then waits
# for the processes listed in info[:children_pids] and clears info[:pid].
# Finally it posts the semaphore and terminates with Kernel.exit!, which
# skips at_exit handlers. The parent detaches the child so it is reaped
# automatically.
#
# File.exist? replaces File.exists?, which was removed in Ruby 3.2. On newer
# Rubies the old call raised NoMethodError in the child, whose ensure clause
# then exited with status 0 without running the job.
def fork(semaphore = nil)
  raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil? and not Process.pid == @pid and Misc.pid_exists?(@pid) and not done? and info[:forked]
  @pid = Process.fork do
    Misc.pre_fork
    begin
      RbbtSemaphore.wait_semaphore(semaphore) if semaphore
      FileUtils.mkdir_p File.dirname(path) unless File.exist? File.dirname(path)
      begin
        res = run true
        set_info :forked, true
      rescue Aborted
        Log.debug{"Forked process aborted: #{path}"}
        log :aborted, "Job aborted (#{Process.pid})"
        raise $!
      rescue Exception
        Log.debug("Exception '#{$!.message}' caught on forked process: #{path}")
        raise $!
      ensure
        join_stream
      end

      begin
        children_pids = info[:children_pids]
        if children_pids
          children_pids.each do |pid|
            if Misc.pid_exists? pid
              begin
                Process.waitpid pid
              rescue Errno::ECHILD
                Log.low "Waiting on #{ pid } failed: #{$!.message}"
              end
            end
          end
          set_info :children_done, Time.now
        end
      rescue Exception
        Log.debug("Exception waiting for children: #{$!.message}")
        # exit! skips the ensure below, so the semaphore is posted here.
        RbbtSemaphore.post_semaphore(semaphore) if semaphore
        Kernel.exit!(-1)
      end
      set_info :pid, nil
    ensure
      RbbtSemaphore.post_semaphore(semaphore) if semaphore
      Kernel.exit! 0
    end
  end
  Process.detach(@pid)
  self
end

#get_streamObject



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rbbt/workflow/step/run.rb', line 62

# Hands out the job's result stream exactly once.
#
# Under @mutex: if a stream was already handed out (@saved_stream), returns
# nil. Otherwise, when @result is an IO, records it in @saved_stream and
# returns it; for any other result returns nil.
def get_stream
  @mutex.synchronize do
    if @saved_stream
      nil
    elsif IO === @result
      @saved_stream = @result
    end
  end
end

#graceObject



431
432
433
434
435
436
# File 'lib/rbbt/workflow/step/run.rb', line 431

# Polls once per second until the job is done, has a result, failed, was
# aborted or is streaming; then returns self.
def grace
  settled = lambda { done? or result or error? or aborted? or streaming? }
  sleep 1 until settled.call
  self
end

#info(check_lock = true) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/rbbt/workflow/accessor.rb', line 74

# Returns the job's info Hash, read from the info file and cached in
# @info_cache. Returns {} when there is no info file.
#
# The cache is reused while the file's ctime is older than the time of the
# last load. Otherwise the file is deserialized with INFO_SERIALIAZER inside
# nested Misc.insist calls (presumably retry wrappers -- TODO confirm). When
# +check_lock+ is true, a held info_lock raises TryAgain so the read is
# attempted again.
#
# If loading ultimately fails, the error is logged, the info file is
# replaced by one recording {:status => :error, :messages => ["Info file
# lost"]}, and the error is re-raised.
def info(check_lock = true)
  return {} if info_file.nil? or not Open.exists? info_file
  begin
    Misc.insist do
      begin
        return @info_cache if @info_cache and File.ctime(info_file) < @info_cache_time
      rescue Exception
        # NOTE(review): this rescue only re-raises, so it has no effect.
        raise $!
      end

      begin
        @info_cache = Misc.insist(3, 1.6, info_file) do
          Misc.insist(2, 1, info_file) do
            Misc.insist(3, 0.2, info_file) do
              # Do not read while another writer holds the info lock.
              raise TryAgain, "Info locked" if check_lock and info_lock.locked?
              Open.open(info_file) do |file|
                INFO_SERIALIAZER.load(file) #|| {}
              end
            end
          end
        end
        @info_cache_time = Time.now
        @info_cache
      end
    end
  rescue Exception
    Log.debug{"Error loading info file: " + info_file}
    Log.exception $!
    Open.rm info_file
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump({:status => :error, :messages => ["Info file lost"]}))
    raise $!
  end
end

#info_fileObject

Returns the path of this job's info file (the job path plus '.info'); the value is memoized.



63
64
65
# File 'lib/rbbt/workflow/accessor.rb', line 63

# Path of this job's info file (Step.info_file of the job path), memoized
# in @info_file.
def info_file
  return @info_file if @info_file
  @info_file = Step.info_file(path)
end

#info_lockObject



67
68
69
70
71
72
# File 'lib/rbbt/workflow/accessor.rb', line 67

# Lockfile guarding the info file, created once and memoized in @info_lock.
# The lock file lives at the persistence path of "<info_file>.lock" under
# Step.lock_dir.
def info_lock
  return @info_lock if @info_lock
  lock_path = Persist.persistence_path(info_file + '.lock', {:dir => Step.lock_dir})
  @info_lock = Lockfile.new lock_path
end

#joinObject



438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'lib/rbbt/workflow/step/run.rb', line 438

# Blocks until this job has finished, then returns self.
#
# Waits (grace) until the job is done, has a result, failed, was aborted or
# is streaming, and drains the stream when streaming. Then it waits for the
# job's process (when that is another process) and for all dependencies.
# Finally it marks the info with :joined, so later calls return at once.
#
# The final wait for the result file now also stops on error or abort. Such
# jobs never write a result, so the previous `sleep 1 until path.exists?`
# spun forever for them.
def join

  grace

  join_stream if status.to_s == "streaming"

  return self if not Open.exists? info_file

  return self if info[:joined]
  pid = @pid

  Misc.insist [0.1, 0.2, 0.5, 1] do
    pid ||= info[:pid]
  end

  begin
    if pid.nil? or Process.pid == pid
      dependencies.each{|dep| dep.join }
    else
      begin
        Log.debug{"Waiting for pid: #{pid}"}
        Process.waitpid pid
      rescue Errno::ECHILD
        Log.debug{"Process #{ pid } already finished: #{ path }"}
      end if Misc.pid_exists? pid
      pid = nil
      dependencies.each{|dep| dep.join }
    end
    sleep 1 until path.exists? or error? or aborted?
    self
  ensure
    set_info :joined, true
  end
end

#join_streamObject



410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/rbbt/workflow/step/run.rb', line 410

# Drains this job's result stream (when it has one not yet handed out, see
# get_stream) and joins it if the stream supports #join.
#
# On failure the job is aborted and the original exception is re-raised.
# The stream is aborted first, but only when it supports #abort: a plain IO
# does not, and the resulting NoMethodError used to mask the real error.
def join_stream
  stream = get_stream if @result
  if stream
    begin
      Misc.consume_stream stream
      stream.join if stream.respond_to? :join
    rescue Exception
      stream.abort if stream.respond_to? :abort
      self._abort
      raise $!
    end
  end
end

#kill_childrenObject



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rbbt/workflow/step/run.rb', line 102

# Sends SIGINT to every pid listed in info[:children_pids].
#
# Failures to signal a single child are logged and skipped. Failure to read
# the list (a StandardError) is logged as "Exception finding children".
def kill_children
  begin
    pids = info[:children_pids]
    return unless pids and pids.any?
    Log.medium("Killing children: #{ pids * ", " }")
    pids.each do |child_pid|
      Log.medium("Killing child #{ child_pid }")
      begin
        Process.kill "INT", child_pid
      rescue Exception
        Log.medium("Exception killing child #{ child_pid }: #{$!.message}")
      end
    end
  rescue
    Log.medium("Exception finding children")
  end
end

#loadObject



151
152
153
154
155
156
# File 'lib/rbbt/workflow/step.rb', line 151

# Returns the job's result as a Ruby object.
#
# An in-memory @result other than the path itself goes through
# prepare_result with the task's result_description. Otherwise the job is
# joined if not done, and the result file is loaded with Persist.load_file
# using the task's result_type, when that file exists. Failing that, the
# value of #exec is returned.
def load
  if @result and not @path == @result
    return prepare_result(@result, @task.result_description)
  end
  join unless done?
  if @path.exists?
    Persist.load_file(@path, @task.result_type)
  else
    exec
  end
end

#load_file(name, type = nil, options = {}) ⇒ Object



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/rbbt/workflow/accessor.rb', line 327

# Loads the auxiliary file +name+ from this job's files directory (see
# #file).
#
# When +type+ is nil it is inferred from the extension: tc -> :tc,
# tsv -> :tsv, list/ary/array -> :array, yaml -> :yaml, marshal -> :marshal.
# Anything else, or no extension, gives :other.
#
# Returns a TokyoCabinet handle (:tc), a TSV (:tsv, +options+ passed to
# TSV.open), an Array of lines (:array), the deserialized object
# (:yaml / :marshal), or the raw contents (:other).
#
# The YAML and Marshal branches now use the block form of Open.open, so the
# file handle is closed after parsing; before, it was leaked.
#
# NOTE(review): YAML.load / Marshal.load must only see files written by the
# job itself; they are unsafe on untrusted data.
def load_file(name, type = nil, options = {})
  if type.nil?
    extension = name.to_s[/.*\.(\w+)$/, 1]
    type = case extension
           when "tc" then :tc
           when "tsv" then :tsv
           when "list", "ary", "array" then :array
           when "yaml" then :yaml
           when "marshal" then :marshal
           else :other
           end
  end

  case type.to_sym
  when :tc
    Persist.open_tokyocabinet(file(name), false)
  when :tsv
    TSV.open Open.open(file(name)), options
  when :array
    #Open.read(file(name)).split /\n|,\s*/
    Open.read(file(name)).split "\n"
  when :yaml
    Open.open(file(name)) { |io| YAML.load(io) }
  when :marshal
    Open.open(file(name)) { |io| Marshal.load(io) }
  else
    Open.read(file(name))
  end
end

#log(status, message = nil, &block) ⇒ Object



248
249
250
251
252
# File 'lib/rbbt/workflow/accessor.rb', line 248

# Records +status+ and the uncolored +message+ for this job, then logs them
# through Step.log, together with the job path and any block.
def log(status, message = nil, &block)
  self.status = status
  plain = Log.uncolor(message)
  self.message(plain)
  Step.log(status, message, path, &block)
end

#log_progress(status, options = {}, &block) ⇒ Object



228
229
230
# File 'lib/rbbt/workflow/accessor.rb', line 228

# Runs the block with a progress bar that writes to this job's "progress"
# file (see Step.log_progress).
def log_progress(status, options = {}, &block)
  progress_file = file(:progress)
  Step.log_progress(status, options, progress_file, &block)
end

#merge_info(hash) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/rbbt/workflow/accessor.rb', line 121

# Merges +hash+ into the job's info file under the info lock and refreshes
# the in-memory cache (@info_cache / @info_cache_time).
#
# Does nothing while the step is exec'd in-process (@exec) or when there is
# no info file.
#
# When Annotated is loaded, annotations are purged from +hash+ before it is
# serialized. The previous code purged an undefined local (`value`, always
# nil), so the incoming hash was written unpurged.
#
# Returns nil.
def merge_info(hash)
  return nil if @exec or info_file.nil?
  hash = Annotated.purge hash if defined? Annotated
  Open.lock(info_file, :lock => info_lock) do
    i = info(false)
    i.merge! hash
    @info_cache = i
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump(i), :force => true)
    @info_cache_time = Time.now
    nil
  end
end

#message(message) ⇒ Object



155
156
157
# File 'lib/rbbt/workflow/accessor.rb', line 155

# Appends +message+ to the job's message list (starting a new list when
# there is none) and stores the list with set_info.
def message(message)
  log_lines = messages || []
  log_lines << message
  set_info(:messages, log_lines)
end

#messagesObject



147
148
149
150
151
152
153
# File 'lib/rbbt/workflow/accessor.rb', line 147

# The job's message list from the info.
#
# When there is none, an empty list is stored (only if set_info is
# available) and set_info's return value is returned; otherwise nil.
def messages
  stored = info[:messages]
  return stored if stored
  set_info(:messages, []) if respond_to?(:set_info)
end

#nameObject



53
54
55
# File 'lib/rbbt/workflow/accessor.rb', line 53

# Job name: the part of the path after the "/<task name>/" directory. The
# path is returned unchanged if that directory does not appear in it.
def name
  task_dir = Regexp.quote(task.name.to_s)
  path.sub(/.*\/#{task_dir}\/(.*)/, '\1')
end

#prepare_result(value, description = nil, info = {}) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/rbbt/workflow/step.rb', line 94

# Converts a raw task result into the value returned to callers.
#
# value       - raw result. An IO is read completely according to the task's
#               result_type (:array -> Array of stripped lines, :tsv -> TSV,
#               otherwise a String) and joined if it supports #join.
# description - result description. When Entity is loaded and this names a
#               registered Entity format, the value is set up as that entity.
# info        - annotation values. For an already Annotated value, each key
#               that is one of the value's annotations is assigned.
#
# On any error while reading an IO, the stream (if it supports #abort) and
# this step are aborted and the error is re-raised.
#
# The annotation loop used the undefined name `h` instead of the key `k`,
# which raised NameError.
def prepare_result(value, description = nil, info = {})
  case
  when IO === value
    begin
      res = case @task.result_type
            when :array
              array = []
              while line = value.gets
                array << line.strip
              end
              array
            when :tsv
              begin
                TSV.open(value)
              rescue IOError
                TSV.setup({})
              end
            else
              value.read
            end
      value.join if value.respond_to? :join
      res
    rescue Exception
      value.abort if value.respond_to? :abort
      self.abort
      raise $!
    end
  when (not defined? Entity or description.nil? or not Entity.formats.include? description)
    value
  when (Annotated === value and info.empty?)
    value
  when Annotated === value
    annotations = value.annotations
    info.each do |k,v|
      value.send("#{k}=", v) if annotations.include? k
    end
    value
  else
    Entity.formats[description].setup(value, info.merge(:format => description))
  end
end

#progress_bar(msg, options = {}) ⇒ Object



232
233
234
# File 'lib/rbbt/workflow/accessor.rb', line 232

# Creates a Log::ProgressBar (no maximum) that is described by +msg+ and
# writes to this job's "progress" file. +options+ override these defaults.
def progress_bar(msg, options = {})
  bar_options = {:desc => msg, :file => file(:progress)}
  bar_options.update(options)
  Log::ProgressBar.new nil, bar_options
end

#provenanceObject



365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/rbbt/workflow/accessor.rb', line 365

# Provenance tree of this job: its recorded inputs, plus, for each direct
# dependency whose result exists, either that dependency's own provenance
# (when it has an info file) or nil (result placed manually, no info file).
#
# Uses File.exist?. The File.exists? alias used before was removed in
# Ruby 3.2.
def provenance
  provenance = {}
  dependencies.each do |dep|
    next unless dep.path.exists?
    if File.exist? dep.info_file
      provenance[dep.path] = dep.provenance if File.exist? dep.path
    else
      provenance[dep.path] = nil
    end
  end
  {:inputs => info[:inputs], :provenance => provenance}
end

#provenance_pathsObject



378
379
380
381
382
383
384
# File 'lib/rbbt/workflow/accessor.rb', line 378

# Nested Hash of dependency result paths: for every direct dependency whose
# result exists, maps its path to that dependency's own provenance_paths.
#
# Uses File.exist?. The File.exists? alias used before was removed in
# Ruby 3.2.
def provenance_paths
  provenance = {}
  dependencies.each do |dep|
    provenance[dep.path] = dep.provenance_paths if File.exist? dep.path
  end
  provenance
end

#rec_dependenciesObject



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/rbbt/workflow/step.rb', line 185

# All dependencies of this step, direct and transitive, without duplicates.
# Direct dependencies come first, in their original order.
#
# A step whose result exists but has no info file was placed there
# manually, so its dependencies are ignored and [] is returned.
def rec_dependencies
  return [] if Open.exists?(self.path.to_s) and not Open.exists? self.info_file

  direct = self.dependencies
  return [] if direct.nil? or direct.empty?

  transitive = direct.map { |dep| dep.rec_dependencies }.flatten.uniq.compact

  everything = direct + transitive
  everything.flatten!
  everything.uniq!
  everything
end

#recursive_cleanObject



202
203
204
205
206
207
208
209
210
211
# File 'lib/rbbt/workflow/step.rb', line 202

# Cleans this job and every recursive dependency that has an info file.
# Dependencies without one, i.e. manually placed results, are left
# untouched. Returns self.
def recursive_clean
  clean
  rec_dependencies.each do |dep|
    dep.clean if Open.exists?(dep.info_file)
  end
  self
end

#relay_log(step) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/rbbt/workflow/step.rb', line 77

# Makes this step forward its log calls to +step+ (the relay step) and
# returns self.
#
# Only applies when the task is a Task with a name. The first call
# redefines #log on this object's singleton class, keeping the previous
# implementation as #original_log. The replacement stores status and
# uncolored message locally, then logs "<task name>><status>" on the relay
# step, unless that step is done, errored or aborted. Each call (re)sets
# the relay target.
#
# NOTE(review): the replacement #log takes no &block and never yields, so a
# block passed to log on a relayed step is silently ignored.
def relay_log(step)
  return self unless Task === self.task and not self.task.name.nil?
  if not self.respond_to? :original_log
    class << self
      attr_accessor :relay_step
      alias original_log log 
      def log(status, message = nil)
        self.status = status
        # Command-call syntax: this calls the #message method (which stores
        # the text) even though a parameter has the same name.
        message Log.uncolor message
        relay_step.log([task.name.to_s, status.to_s] * ">", message.nil? ? nil : message ) unless (relay_step.done? or relay_step.error? or relay_step.aborted?)
      end
    end
  end
  @relay_step = step
  self
end

#run(no_load = false) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/rbbt/workflow/step/run.rb', line 168

# Executes the job, or reuses its persisted result, and returns the result.
#
# Everything runs under @mutex inside Persist.persist, keyed on the result
# file +path+ and checked against the dependency paths (checks). When the
# result must be computed, the persist block does the following:
#   - sets up log relaying if a global Step.log_relay_step is configured;
#   - resets the info file and logs :setup;
#   - records pid, issue time, names and dependencies;
#   - duplicates input streams and runs the dependencies;
#   - records the inputs, logs :started and executes the task (_exec);
#   - on Aborted or other errors, stops the dependencies, logs the status
#     and re-raises;
#   - for streamed results, logs :streaming and registers done/abort
#     callbacks on the stream; otherwise logs :done with the elapsed time.
#
# no_load - when truthy, it is turned into :stream for Persist.persist; the
#           raw result is kept in @result (if not already set) and self is
#           returned. Otherwise the result goes through prepare_result and
#           is returned.
#
# Any exception is recorded with #exception and re-raised.
def run(no_load = false)
  result = nil

  begin
    @mutex.synchronize do
      no_load = no_load ? :stream : false
      result = Persist.persist "Job", @task.result_type, :file => path, :check => checks, :no_load => no_load do |lockfile|
        if Step === Step.log_relay_step and not self == Step.log_relay_step
          relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
        end
        @exec = false

        Open.rm info_file if Open.exists? info_file

        log :setup, "#{Log.color :cyan, "Setup"} #{Log.color :yellow, task.name.to_s || ""}"

        merge_info({
          :pid => Process.pid,
          :issued => Time.now,
          :name => name,
          :clean_name => clean_name,
          :dependencies => dependencies.collect{|dep| [dep.task_name, dep.name, dep.path]},
        })

        dup_inputs
        begin
          run_dependencies
        rescue Exception
          stop_dependencies
          raise $!
        end


        set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

        set_info :started, (start_time = Time.now)
        log :started, "#{Log.color :magenta, "Starting"} #{Log.color :yellow, task.name.to_s || ""}"

        begin
          result = _exec
        rescue Aborted
          stop_dependencies
          log(:aborted, "Aborted")
          raise $!
        rescue Exception
          backtrace = $!.backtrace

          # HACK: works around a strange behaviour in Ruby 1.9.3 where some
          # backtrace strings are encoded as ASCII-8BIT.
          # NOTE(review): the re-encoding below runs after set_info/log, so
          # the stored backtrace is written before it is re-encoded --
          # confirm the intended order.
          set_info :backtrace, backtrace 
          log(:error, "#{$!.class}: #{$!.message}")
          backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding
          stop_dependencies
          raise $!
        end

        # When not streaming (or streaming is disabled via RBBT_NO_STREAM),
        # stream results are materialized here.
        # NOTE(review): these calls pass @task.description, while the final
        # prepare_result below passes @task.result_description -- confirm
        # which one is intended.
        if not no_load or ENV["RBBT_NO_STREAM"] == "true" 
          result = prepare_result result, @task.description, info if IO === result 
          result = prepare_result result.stream, @task.description, info if TSV::Dumper === result 
        end

        stream = case result
                 when IO
                   result
                 when TSV::Dumper
                   result.stream
                 end

        if stream
          log :streaming, "#{Log.color :magenta, "Streaming"} #{Log.color :yellow, task.name.to_s || ""}"
          # Block given to ConcurrentStream.setup; presumably run when the
          # stream has been consumed -- TODO confirm.
          ConcurrentStream.setup stream do
            begin
              if status != :done
                Misc.insist do
                  set_info :done, (done_time = Time.now)
                  set_info :time_elapsed, (time_elapsed = done_time - start_time)
                  log :done, "#{Log.color :magenta, "Completed"} #{Log.color :yellow, task.name.to_s || ""} in #{time_elapsed.to_i} sec."
                end
              end
            rescue
              Log.exception $!
            ensure
              join
            end
          end
          stream.abort_callback = Proc.new do
            begin
              log :aborted, "#{Log.color :red, "Aborted"} #{Log.color :yellow, task.name.to_s || ""}" if status == :streaming
            rescue
              Log.exception $!
            end
          end
        else
          set_info :done, (done_time = Time.now)
          set_info :time_elapsed, (time_elapsed = done_time - start_time)
          log :done, "#{Log.color :magenta, "Completed"} #{Log.color :yellow, task.name.to_s || ""} in #{time_elapsed.to_i} sec."
        end

        result
      end

      if no_load
        @result ||= result
        self
      else
        @result = prepare_result result, @task.result_description
      end
    end
  rescue Exception
    exception $!
    raise $!
  end
end

#run_dependencies ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/rbbt/workflow/step/run.rb', line 121

# Prepares and starts the dependencies of this step.
#
# Phase 1: collects into @seen every direct dependency (skipping any whose path
# is already collected) together with its rec_dependencies, and assigns `seen`
# to each dependency through `dependency.seen =`.
# Phase 2: calls dup_inputs on every entry of @seen that is a direct dependency.
# Phase 3 (skipped when @seen is empty): for each direct dependency, logs it,
# cleans it when its current state is not reusable, and starts it with
# run(true) unless it has already started. Any exception raised while doing so
# is logged with the dependency's task name and re-raised unchanged.
#
# NOTE(review): `seen` (without @) is presumably the attribute reader for @seen,
# so every dependency ends up sharing the same array that is mutated below by
# concat/uniq! -- confirm against the attribute definitions.
def run_dependencies
  @seen ||= []
  dependencies.uniq.each do |dependency| 
    # Already collected, e.g. as a recursive dependency of an earlier one.
    next if seen.collect{|d| d.path}.include?(dependency.path)
    dependency.seen = seen
    @seen.concat dependency.rec_dependencies.collect{|d| d } 
    @seen << dependency
    @seen.uniq!
  end

  @seen.each do |dependency|
    next if dependency == self
    # Only direct dependencies are touched here; recursive ones are just tracked in @seen.
    next unless dependencies.include? dependency
    dependency.dup_inputs
  end

  return if @seen.empty?

  log :dependencies, "#{Log.color :magenta, "Dependencies"} #{Log.color :yellow, task.name.to_s || ""}"
  @seen.each do |dependency| 
    next if dependency == self
    next unless dependencies.include? dependency
    Log.info "#{Log.color :cyan, "dependency"} #{Log.color :yellow, task.name.to_s || ""} => #{Log.color :yellow, dependency.task_name.to_s || ""} -- #{Log.color :blue, dependency.path}"
    begin
      if dependency.streaming? 
        # A streaming dependency that is still running is left untouched
        # (this also skips the started? check below); otherwise it is cleaned.
        next if dependency.running?
        dependency.clean 
      else
        # Clean when errored, aborted, without a recorded status, or neither done nor running.
        dependency.clean if (dependency.error? or dependency.aborted? or dependency.status.nil? or not (dependency.done? or dependency.running?))
      end

      unless dependency.started? 
        # NOTE(review): `true` is presumably run's no_load flag (cf. `if no_load` in run) -- confirm.
        dependency.run(true)
      end
    rescue Aborted
      # Each handler logs which dependency failed, then re-raises the original exception.
      Log.error "Aborted dep. #{Log.color :red, dependency.task.name.to_s}"
      raise $!
    rescue Interrupt
      Log.error "Interrupted while in dep. #{Log.color :red, dependency.task.name.to_s}"
      raise $!
    rescue Exception
      Log.error "Exception in dep. #{ Log.color :red, dependency.task.name.to_s }"
      raise $!
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


278
279
280
281
282
283
284
# File 'lib/rbbt/workflow/accessor.rb', line 278

# Whether the pid recorded for this step belongs to a live process.
#
# Returns nil when the info file does not exist or no :pid is stored in the
# info. Otherwise it asks Misc.pid_exists? about @pid, falling back to the
# :pid stored in the info when @pid is unset.
def running?
  return nil unless Open.exists?(info_file)
  return nil if info[:pid].nil?
  Misc.pid_exists?(@pid || info[:pid])
end

#save_file(name, content) ⇒ Object



311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/rbbt/workflow/accessor.rb', line 311

# Serializes +content+ to text and writes it with Open.write to file(name).
#
# Conversion rules, checked in this order:
# * String  -> written as is
# * Array   -> elements joined with newlines
# * TSV     -> its to_s (checked before Hash on purpose)
# * Hash    -> one "key<TAB>value" line per entry
# * other   -> to_s
def save_file(name, content)
  text = case content
         when String then content
         when Array  then content * "\n"
         when TSV    then content.to_s
         when Hash   then content.collect { |*pair| pair * "\t" } * "\n"
         else content.to_s
         end
  Open.write(file(name), text)
end

#set_info(key, value) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rbbt/workflow/accessor.rb', line 108

# Stores +value+ under +key+ in the step info and persists it to info_file.
#
# Does nothing (returns nil) when @exec is set or there is no info file.
# When Annotated is defined, the value is passed through Annotated.purge first.
# While holding info_lock, it re-reads the info with info(false), updates the
# key, refreshes the in-memory cache (@info_cache / @info_cache_time) and
# rewrites the file using INFO_SERIALIAZER.
#
# @return the stored value, or nil when nothing was written
def set_info(key, value)
  return nil if @exec || info_file.nil?
  value = Annotated.purge(value) if defined?(Annotated)
  Open.lock(info_file, :lock => info_lock) do
    current = info(false)
    current[key] = value
    @info_cache = current
    payload = INFO_SERIALIAZER.dump(current)
    # The surrounding Open.lock already holds the lock, hence :lock => false.
    Misc.sensiblewrite(info_file, payload, :force => true, :lock => false)
    @info_cache_time = Time.now
    value
  end
end

#soft_grace ⇒ Object



424
425
426
427
428
429
# File 'lib/rbbt/workflow/step/run.rb', line 424

# Blocks until the info file exists, polling once per second, then returns self.
# NOTE: there is no timeout; this waits forever if the file never appears.
def soft_grace
  sleep 1 until Open.exists?(info_file)
  self
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


266
267
268
# File 'lib/rbbt/workflow/accessor.rb', line 266

# Truthy once the step has left a trace on disk: either its info file or its
# path exists. The path is only checked when the info file is missing.
def started?
  Open.exists?(info_file) || Open.exists?(path)
end

#status ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/rbbt/workflow/accessor.rb', line 134

# Current status of the step, read from its info (info[:status]).
#
# If reading the info raises a StandardError, the error message is logged and
# :error is returned. Interrupts, signals and exits are NOT swallowed anymore:
# the previous `rescue Exception` turned e.g. a Ctrl-C that happened while the
# status was being read into an :error status instead of stopping the process.
#
# @return the stored status, or :error when the info could not be read
def status
  info[:status]
rescue StandardError => e
  Log.error "Exception reading status: #{e.message}"
  :error
end

#status=(status) ⇒ Object



143
144
145
# File 'lib/rbbt/workflow/accessor.rb', line 143

# Records +new_status+ under the :status key of the step info via set_info.
def status=(new_status)
  set_info :status, new_status
end

#step(name) ⇒ Object



213
214
215
216
217
218
219
220
221
222
# File 'lib/rbbt/workflow/step.rb', line 213

# Returns the first step in rec_dependencies whose task_name matches +name+
# (both compared as symbols), or nil when none matches.
#
# Hits are memoized per +name+ in @steps. Because of `||=`, a miss (nil) is
# not cached and is recomputed on the next call.
def step(name)
  @steps ||= {}
  @steps[name] ||= begin
    matching = rec_dependencies.select do |candidate|
      candidate.task_name.to_sym == name.to_sym
    end
    matching.first
  end
end

#stop_dependencies ⇒ Object



332
333
334
335
336
337
# File 'lib/rbbt/workflow/step/run.rb', line 332

# Calls abort on every direct dependency, in order, then kill_children.
def stop_dependencies
  dependencies.each(&:abort)
  kill_children
end

#streaming? ⇒ Boolean

Returns:

  • (Boolean)


274
275
276
# File 'lib/rbbt/workflow/accessor.rb', line 274

# True when @result is an IO, or when the recorded status is :streaming.
# The status is only read when @result is not an IO.
def streaming?
  return true if IO === @result
  status == :streaming
end

#task_name ⇒ Object



64
65
66
# File 'lib/rbbt/workflow/step.rb', line 64

# Name of the task object this step was built from (@task.name).
def task_name
  owning_task = @task
  owning_task.name
end