Class: EventMachine::POSIX::Spawn::Child

Inherits:
Object
  • Object
show all
Includes:
Deferrable, EventMachine::POSIX::Spawn
Defined in:
lib/em/posix/spawn/child.rb

Defined Under Namespace

Classes: ReadableStream, SignalHandler, Stream, WritableStream

Constant Summary

Constants included from EventMachine::POSIX::Spawn

VERSION

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Child

Spawn a new process, write all input and read all output. Supports the standard spawn interface as described in the POSIX::Spawn module documentation:

new([env], command, [argv1, ...], [options])

The following options are supported in addition to the standard POSIX::Spawn options:

:input   => str      Write str to the new process's standard input.
:timeout => int      Maximum number of seconds to allow the process
                     to execute before aborting with a TimeoutExceeded
                     exception.
:max     => total    Maximum number of bytes of output to allow the
                     process to generate before aborting with a
                     MaximumOutputExceeded exception.
:prepend_stdout => str Data to prepend to stdout
:prepend_stderr => str Data to prepend to stderr

Returns a new Child instance that is being executed. The object includes the Deferrable module, and executes the success callback when the process has exited, or the failure callback when the process was killed because of exceeding the timeout, or exceeding the maximum number of bytes to read from stdout and stderr combined. Once the success callback is triggered, this object’s out, err and status attributes are available. Clients can register callbacks to listen to updates from out and err streams of the process.



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/em/posix/spawn/child.rb', line 44

# Build a Child from the standard spawn argument list and start it right away.
#
# args - [env], command, [argv1, ...], [options] as accepted by
#        extract_process_spawn_arguments.
#
# The options Hash supplied by the caller is duplicated before the
# Child-specific keys are removed from it, so the caller's Hash is left
# untouched. Whatever remains in @options is later handed to the spawn call
# made by exec!.
def initialize(*args)
  @env, @argv, spawn_options = extract_process_spawn_arguments(*args)
  @options = spawn_options.dup

  # Strip the keys this class consumes itself; a missing key yields nil.
  @input, @timeout, @max, @discard_output =
    [:input, :timeout, :max, :discard_output].map { |key| @options.delete(key) }

  # Prepended data defaults to an empty String when not supplied.
  @prepend_stdout = @options.delete(:prepend_stdout) || ""
  @prepend_stderr = @options.delete(:prepend_stderr) || ""

  # Drop :chdir entirely when it is nil (or absent) so it is not passed on.
  @options.delete(:chdir) if @options[:chdir].nil?

  exec!
end

Instance Attribute Details

#err ⇒ Object (readonly)

All data written to the child process’s stderr stream as a String.



62
63
64
# File 'lib/em/posix/spawn/child.rb', line 62

# Reader for @err, which exec! assigns from the stderr stream's buffer once
# the child process has been reaped. Returns nil before that point.
def err; @err; end

#out ⇒ Object (readonly)

All data written to the child process’s stdout stream as a String.



59
60
61
# File 'lib/em/posix/spawn/child.rb', line 59

# Reader for @out, which exec! assigns from the stdout stream's buffer once
# the child process has been reaped. Returns nil before that point.
def out; @out; end

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



70
71
72
# File 'lib/em/posix/spawn/child.rb', line 70

# Reader for @pid, the process id that exec! receives from popen4 when the
# child is spawned.
def pid; @pid; end

#runtime ⇒ Object (readonly)

Total command execution time (wall-clock time)



68
69
70
# File 'lib/em/posix/spawn/child.rb', line 68

# Reader for @runtime: the difference between Time.now at reap time and the
# Time recorded right after spawning (computed in exec!). Returns nil until
# the child has been reaped.
def runtime; @runtime; end

#status ⇒ Object (readonly)

A Process::Status object with information on how the child exited.



65
66
67
# File 'lib/em/posix/spawn/child.rb', line 65

# Reader for @status, which exec! assigns from
# SignalHandler#pid_to_process_status once the child has been reaped.
# Returns nil until then.
def status; @status; end

Instance Method Details

#add_streams_listener(&listener) ⇒ Object



104
105
106
# File 'lib/em/posix/spawn/child.rb', line 104

# Register the given block as an after_read listener on both output streams.
#
# listener - block handed unchanged to after_read on the stdout stream
#            first and the stderr stream second.
#
# Returns a two-element Array holding the after_read return values, in
# [stdout, stderr] order.
def add_streams_listener(&listener)
  [@cout, @cerr].map { |stream| stream.after_read(&listener) }
end

#exec! ⇒ Object

Execute command, write input, and read output. This is called immediately when a new instance of this object is initialized.



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/em/posix/spawn/child.rb', line 198

# Spawn the command, attach its stdio pipes to the EventMachine reactor and
# arrange for this deferrable to be resolved once the child has been reaped.
#
# Side effects: sets @pid, @start, @cin, @cout and @cerr, plus
# @sigterm_timer when a positive :timeout was given. When the child is
# reaped it sets @runtime, @status, @out and @err, then fails the deferrable
# with TimeoutExceeded / MaximumOutputExceeded if one of those limits was
# hit, or succeeds it otherwise.
#
# Returns whatever SignalHandler#pid_callback returns.
def exec!
  # The signal handler MUST be installed before spawning a new process
  SignalHandler.setup!

  if RUBY_PLATFORM =~ /linux/i && @options.delete(:close_others)
    @options[:in] = :in
    @options[:out] = :out
    @options[:err] = :err

    # Mark every descriptor listed under /proc/<pid>/fd (other than 0, 1
    # and 2) to be closed in the child.
    ::Dir.glob("/proc/%d/fd/*" % Process.pid).each do |file|
      fd = File.basename(file).to_i
      @options[fd] = :close if fd > 2
    end
  end

  @pid, stdin, stdout, stderr = popen4(@env, *(@argv + [@options]))
  @start = Time.now

  # Don't leak into processes spawned after us.
  [stdin, stdout, stderr].each { |io| io.close_on_exec = true }

  # watch fds
  @cin = EM.watch stdin, WritableStream, (@input || "").dup, "stdin"
  @cout = EM.watch stdout, ReadableStream, @prepend_stdout, "stdout", @discard_output
  @cerr = EM.watch stderr, ReadableStream, @prepend_stderr, "stderr", @discard_output

  # register events
  @cin.notify_writable = true
  @cout.notify_readable = true
  @cerr.notify_readable = true

  # keep track of open fds
  in_flight = [@cin, @cout, @cerr].compact
  in_flight.each { |io|
    # force binary encoding
    io.force_encoding

    # register finalize hook
    io.callback { in_flight.delete(io) }
  }

  failure = nil

  # Shared abort path: remember why we are failing, close every stream that
  # is still tracked and ask the child to terminate.
  abort_with = lambda { |reason|
    failure = reason
    # Iterate over a snapshot: the finalize hooks registered above delete
    # from in_flight, and removing elements from an Array while iterating
    # it would skip streams if a hook fires during the loop.
    in_flight.dup.each(&:close)
    in_flight.clear
    request_termination
  }

  # keep track of max output
  max = @max
  if max && max > 0
    check_buffer_size = lambda { |listener, _|
      next if terminated? || listener.closed?

      # The limit applies to stdout and stderr combined.
      if @cout.buffer.size + @cerr.buffer.size > max
        abort_with.call(MaximumOutputExceeded)
      end
    }

    @cout.after_read(&check_buffer_size)
    @cerr.after_read(&check_buffer_size)
  end

  # request termination of process when it doesn't terminate
  # in time
  timeout = @timeout
  if timeout && timeout > 0
    @sigterm_timer = Timer.new(timeout) { abort_with.call(TimeoutExceeded) }
  end

  # run block when pid is reaped
  SignalHandler.instance.pid_callback(@pid) {
    @sigterm_timer.cancel if @sigterm_timer
    @sigkill_timer.cancel if @sigkill_timer
    @runtime = Time.now - @start
    @status = SignalHandler.instance.pid_to_process_status(@pid)

    # Snapshot for the same reason as in abort_with: a finalize hook that
    # fires during the final read or close must not cause the next stream
    # to be skipped.
    in_flight.dup.each do |io|
      # Trigger final read to make sure buffer is drained
      io.notify_readable if io.respond_to?(:notify_readable)

      io.close
    end

    in_flight.clear

    @out = @cout.buffer
    @err = @cerr.buffer

    if failure
      set_deferred_failure failure
    else
      set_deferred_success
    end
  }
end

#kill(timeout = 0) ⇒ Object

Send the SIGTERM signal to the process. Then send the SIGKILL signal to the process after the specified timeout.



85
86
87
88
89
90
91
92
93
94
# File 'lib/em/posix/spawn/child.rb', line 85

# Ask the child to terminate and schedule a follow-up SIGKILL.
#
# timeout - delay passed to Timer.new before SIGKILL is sent; nil is
#           treated as 0 (default: 0).
#
# Does nothing when the process has already terminated or a SIGKILL timer
# is already pending. Any StandardError raised while sending SIGKILL is
# swallowed.
#
# Returns true when termination was requested, false otherwise.
def kill(timeout = 0)
  already_handled = terminated? || @sigkill_timer
  return false if already_handled

  request_termination
  grace_period = timeout || 0
  @sigkill_timer = Timer.new(grace_period) do
    begin
      ::Process.kill('KILL', @pid)
    rescue StandardError
      nil
    end
  end

  true
end

#request_termination ⇒ Object

Send the SIGTERM signal to the process.

Returns the result of Process.kill, or nil if sending the signal raised an error.



99
100
101
102
# File 'lib/em/posix/spawn/child.rb', line 99

# Cancel the pending SIGTERM timer, if there is one, and send SIGTERM to
# the child.
#
# Returns the result of Process.kill, or nil when sending the signal raised
# a StandardError (the error is deliberately swallowed).
def request_termination
  pending_timer = @sigterm_timer
  pending_timer.cancel if pending_timer

  begin
    ::Process.kill('TERM', @pid)
  rescue StandardError
    nil
  end
end

#success? ⇒ Boolean

Determine if the process did exit with a zero exit status.

Returns:

  • (Boolean)


73
74
75
# File 'lib/em/posix/spawn/child.rb', line 73

# Report whether the recorded exit status indicates success.
#
# Returns @status itself (nil) while no status has been recorded yet,
# otherwise the result of @status.success?.
def success?
  @status ? @status.success? : @status
end

#terminated? ⇒ Boolean

Determine if the process has already terminated.

Returns:

  • (Boolean)


78
79
80
# File 'lib/em/posix/spawn/child.rb', line 78

# Report whether an exit status has been recorded for the child.
#
# Returns true when @status is set to a truthy value, false otherwise.
def terminated?
  @status ? true : false
end