Class: Micron::Runner::ForkWorker

Inherits:
Object
  • Object
show all
Includes:
Debug
Defined in:
lib/micron/runner/fork_worker.rb

Overview

Fork Worker

Examples:

fw = Micron::Runner::ForkWorker.new do
  system("ls")
  "sup"
end
fw.run.wait
puts fw.stdout
puts fw.result # => sup

Constant Summary collapse

CHUNK_SIZE =
1024 * 16

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Debug

#debug

Constructor Details

#initialize(context = nil, capture_stdout = true, capture_stderr = true, &block) ⇒ ForkWorker

Returns a new instance of ForkWorker.



27
28
29
30
31
32
33
# File 'lib/micron/runner/fork_worker.rb', line 27

def initialize(context=nil, capture_stdout=true, capture_stderr=true, &block)
  @context        = context
  @capture_stdout = capture_stdout
  @capture_stderr = capture_stderr
  @block          = block
  @done           = false
end

Instance Attribute Details

#contextObject (readonly)

Returns the value of attribute context.



25
26
27
# File 'lib/micron/runner/fork_worker.rb', line 25

def context
  @context
end

#pidObject (readonly)

Returns the value of attribute pid.



25
26
27
# File 'lib/micron/runner/fork_worker.rb', line 25

def pid
  @pid
end

#statusObject (readonly)

Returns the value of attribute status.



25
26
27
# File 'lib/micron/runner/fork_worker.rb', line 25

def status
  @status
end

Instance Method Details

#errObject



155
156
157
# File 'lib/micron/runner/fork_worker.rb', line 155

def err
  @err.first
end

#outObject



151
152
153
# File 'lib/micron/runner/fork_worker.rb', line 151

def out
  @out.first
end

#resultObject



126
127
128
129
130
131
132
133
# File 'lib/micron/runner/fork_worker.rb', line 126

def result
  if !@done then
    wait()
  end
  @result = Marshal.load(@parent_read)
  @parent_read.close
  @result
end

#run(check_liveness = false) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/micron/runner/fork_worker.rb', line 35

def run(check_liveness=false)
  @out, @err = IO.pipe, IO.pipe
  @parent_read, @child_write = IO.pipe
  @child_write.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

  @liveness_checker = LivenessChecker.new if check_liveness

  @pid = fork do

    Thread.current[:name] = "worker"
    Micron.trap_thread_dump()

    # close unused readers in child
    @out.first.close
    @err.first.close
    @parent_read.close

    # redirect io
    STDOUT.reopen(@out.last) if @capture_stdout
    STDERR.reopen(@err.last) if @capture_stderr
    STDOUT.sync = STDERR.sync = true

    clean_parent_file_descriptors()

    @liveness_checker.pong if check_liveness

    # run
    debug("calling block")
    ret = @block.call()
    debug("block returned")

    # Pass result to parent via pipe
    Marshal.dump(ret, @child_write)
    @child_write.flush
    debug("wrote result to pipe")

    # cleanup
    @out.last.close
    @err.last.close
    @child_write.close
  end

  # close unused writers in parent
  @out.last.close
  @err.last.close
  @child_write.close

  @liveness_checker.ping(self) if check_liveness

  self
end

#stderrObject



143
144
145
146
147
148
149
# File 'lib/micron/runner/fork_worker.rb', line 143

def stderr
  return @stderr if !@stderr.nil?
  @stderr = @err.first.read
  @err.first.close
  @stderr ||= ""
  @stderr
end

#stdoutObject



135
136
137
138
139
140
141
# File 'lib/micron/runner/fork_worker.rb', line 135

def stdout
  return @stdout if !@stdout.nil?
  @stdout = @out.first.read
  @out.first.close
  @stdout ||= ""
  @stdout
end

#waitself

Blocking wait for process to finish

Returns:

  • (self)


90
91
92
93
94
95
96
97
98
# File 'lib/micron/runner/fork_worker.rb', line 90

def wait
  return self if @done

  Process.wait(@pid)
  @done = true
  @status = $?
  @liveness_checker.stop if @liveness_checker
  self
end

#wait2Process::Status

Blocking wait for process to finish

Returns:

  • (Process::Status)


103
104
105
106
107
108
109
110
# File 'lib/micron/runner/fork_worker.rb', line 103

def wait2
  return @status if @done

  pid, @status = Process.wait2(@pid)
  @done = true
  @liveness_checker.stop if @liveness_checker
  return @status
end

#wait_nonblockProcess::Status

Non-blocking wait for process to finish

Returns:

  • (Process::Status)

    nil if not yet complete



115
116
117
118
119
120
121
122
123
124
# File 'lib/micron/runner/fork_worker.rb', line 115

def wait_nonblock
  return @status if @done

  pid, @status = Process.waitpid2(@pid, Process::WNOHANG)
  if !@status.nil? then
    @done = true
    @liveness_checker.stop if @liveness_checker
  end
  return @status
end