Module: DTAS::Pipeline

Includes:
SpawnFix
Defined in:
lib/dtas/pipeline.rb

Overview

:nodoc:

Instance Method Summary collapse

Methods included from SpawnFix

#spawn

Instance Method Details

#pspawn(env, cmd, rdr = {}) ⇒ Object

Process.spawn wrapper which supports running Proc-like objects in a separate process, not just external commands. Returns the pid of the spawned process



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/dtas/pipeline.rb', line 13

def pspawn(env, cmd, rdr = {})
  case cmd
  when Array
    spawn(env, *cmd, rdr)
  else # support running Proc-like objects, too:
    fork do
      ENV.update(env) if env

      # setup redirects
      [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
        dst = rdr[fd] and io.reopen(dst)
      end

      # close all other pipes, since we can't rely on FD_CLOEXEC
      # (as we do not exec, here)
      rdr.each do |k, v|
        k.close if v == :close
      end
      cmd.call
    end
  end
end

#run_pipeline(env, pipeline) ⇒ Object

pipeline is an Array of (Arrays or Procs)



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/dtas/pipeline.rb', line 37

def run_pipeline(env, pipeline)
  pids = {} # pid => pipeline index
  work = pipeline.dup
  last = work.pop
  nr = work.size
  rdr = {} # redirect mapping for Process.spawn

  # we need to make sure pipes are closed in any forked processes
  # (they are redirected to stdin or stdout, first)
  pipes = nr.times.map { IO.pipe.each { |io| rdr[io] = :close } }

  # start the first and last commands first, they only have one pipe, each
  last_pid = pspawn(env, last, rdr.merge(0 => pipes[-1][0]))
  pids[last_pid] = nr
  first = work.shift
  first_pid = pspawn(env, first, rdr.merge(1 => pipes[0][1]))
  pids[first_pid] = 0

  # start the middle commands, they both have two pipes:
  work.each_with_index do |cmd, i|
    pid = pspawn(env, cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
    pids[pid] = i + 1
  end

  # all pipes handed off to children, close so they see EOF
  pipes.flatten!.each(&:close).clear

  # wait for children to finish
  fails = []
  until pids.empty?
    pid, status = Process.waitpid2(-1)
    nr = pids.delete(pid)
    status.success? or
      fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
  end
  # behave like "set -o pipefail" in bash
  raise fails.join("\n") if fails[0]
end