Class: RExec::Task

Inherits:
Object
Defined in:
lib/rexec/task.rb

Overview

Represents a running process, either a child process or a background/daemon process. Provides a simple, high-level interface for managing the process life cycle.

Constructor Details

#initialize(input, output, error, pid) ⇒ Task

Returns a new instance of Task.



# File 'lib/rexec/task.rb', line 336

def initialize(input, output, error, pid)
	@input = input
	@output = output
	@error = error

	@pid = pid

	@result = nil
	@status = :running
	@result_lock = Mutex.new
	@result_available = ConditionVariable.new
end

Instance Attribute Details

#error ⇒ Object (readonly)

Standard error from the running task.



# File 'lib/rexec/task.rb', line 356

def error
  @error
end

#input ⇒ Object (readonly)

Standard input to the running task.



# File 'lib/rexec/task.rb', line 350

def input
  @input
end

#output ⇒ Object (readonly)

Standard output from the running task.



# File 'lib/rexec/task.rb', line 353

def output
  @output
end

#pid ⇒ Object (readonly)

The PID of the running task.



# File 'lib/rexec/task.rb', line 359

def pid
  @pid
end

#result ⇒ Object (readonly)

The status of the task after calling task.wait.



# File 'lib/rexec/task.rb', line 362

def result
  @result
end

Class Method Details

.close_pipes(*pipes) ⇒ Object

Close all the supplied pipes.



# File 'lib/rexec/task.rb', line 90

def self.close_pipes(*pipes)
	pipes = pipes.compact.reject{|pipe| STDPIPES.include?(pipe)}

	pipes.each do |pipe|
		pipe.close unless pipe.closed?
	end
end
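
The filtering pattern above can be tried in isolation with the standard library alone. This is a minimal sketch, not RExec code: `STD_STREAMS` and `close_all` are hypothetical stand-ins for `STDPIPES` and `close_pipes`.

```ruby
# Hypothetical stand-in for RExec's STDPIPES constant.
STD_STREAMS = [$stdin, $stdout, $stderr]

# Close every pipe that is non-nil, not a standard stream, and still open.
def close_all(*pipes)
  pipes.compact.reject { |pipe| STD_STREAMS.include?(pipe) }.each do |pipe|
    pipe.close unless pipe.closed?
  end
end

reader, writer = IO.pipe
writer.close                              # already closed: skipped safely
close_all(reader, writer, nil, $stderr)   # nil and $stderr are ignored
```

Skipping the standard streams matters when a stream has been passed through to the child: closing it would also close it for the parent.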

.dump_pipes(*pipes) ⇒ Object

Dump any remaining data from the pipes, until they are closed.



# File 'lib/rexec/task.rb', line 99

def self.dump_pipes(*pipes)
	pipes = pipes.compact.reject{|pipe| STDPIPES.include?(pipe)}

	pipes.delete_if { |pipe| pipe.closed? }
	# Dump any output that was not consumed (errors, etc)
	while pipes.size > 0
		result = IO.select(pipes)

		result[0].each do |pipe|
			if pipe.closed? || pipe.eof?
				pipes.delete(pipe)
				next
			end

			$stderr.puts pipe.readline.chomp
		end
	end
end
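
The same `IO.select` draining loop can be sketched with plain pipes. The `drain` helper below is a hypothetical name, and it collects lines into an array instead of printing them to `$stderr`, purely so the result is easy to inspect.

```ruby
# Drain every readable pipe line by line until all of them reach end-of-file.
def drain(*pipes)
  lines = []
  pipes = pipes.compact.reject(&:closed?)

  until pipes.empty?
    ready, = IO.select(pipes)          # block until at least one pipe is readable
    ready.each do |pipe|
      if pipe.eof?
        pipes.delete(pipe)             # nothing more will arrive on this pipe
      else
        lines << pipe.readline.chomp
      end
    end
  end

  lines
end

out_rd, out_wr = IO.pipe
err_rd, err_wr = IO.pipe
out_wr.puts "first", "second"
err_wr.puts "warning"
out_wr.close                           # closing the write ends lets readers see EOF
err_wr.close

collected = drain(out_rd, err_rd)
```

The loop only terminates once every write end has been closed, which is why the parent closes its copies of the child-side pipe ends before draining.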

.open(command, options = {}, &block) ⇒ Object

Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout, stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or exception. You may write to input, and read from output and error.

Typical usage looks similar to IO.popen:

	count = 0
	result = Task.open(["ls", "-la"], :passthrough => :err) do |task|
		count = task.output.read.split(/\n/).size
	end
	puts "Count: #{count}" if result.exitstatus == 0

The basic command can simply be a string, and this will be passed to Kernel#exec which will perform shell expansion on the arguments.

If the command passed is an array, this will be executed without shell expansion.

If a Proc (or anything that respond_to? :call) is provided, this will be executed in the child process. Here is an example of a long running background process:

	daemon = Proc.new do
		# Long running process
		sleep(1000)
	end

	task = Task.open(daemon, :daemonize => true, :in => …, :out => …, :err => …)
	exit(0)

Options

:passthrough, :in, :out, :err

The current process (e.g. ruby) has a set of existing pipes $stdin, $stdout and $stderr. These pipes can also be used by the child process. The passthrough option allows you to specify which pipes are retained from the parent process by the child.

Typically it is useful to pass through $stderr, so that errors in the child process are printed in the terminal of the parent process:

	Task.open(command, :passthrough => :err)
	Task.open(command, :passthrough => [:in, :out, :err])
	Task.open(command, :passthrough => :all)

It is also possible to redirect to files, which can be useful if you want to keep a log file:

	Task.open(command, :out => File.open("output.log", "a"))

The default behaviour is to create a new pipe, but any pipe (e.g. a network socket) could be used:

	Task.open(command, :in => IO.pipe)

:daemonize

The process that is opened may be detached from the parent process. This allows the child process to exist even if the parent process exits. In this case, you will probably also want to redirect :in, :out and :err, for example to log files:

	Task.open(command,
		:daemonize => true,
		:in => File.open("/dev/null"),
		:out => File.open("/var/log/child.log", "a"),
		:err => File.open("/var/log/child.err", "a")
	)

:env, :env!

Provide an environment which will be used by the child process. Use :env to update the existing environment and :env! to replace it completely:

	Task.open(command, :env => {… => 'bar'})
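
The difference between updating and replacing the environment can be observed with a plain `fork`. This is a stdlib-only sketch of the behaviour described for :env and :env!; `child_env` and the `REXEC_DEMO` variable are hypothetical names used only for illustration.

```ruby
# Fork a child, apply the variables, and report which keys the child sees.
# replace: false mirrors :env (merge), replace: true mirrors :env! (clear first).
def child_env(vars, replace: false)
  rd, wr = IO.pipe

  pid = fork do
    rd.close
    ENV.clear if replace
    ENV.update(vars)
    wr.syswrite(ENV.keys.sort.join(","))   # unbuffered write, safe before exit!
    exit!(0)
  end

  wr.close
  keys = rd.read.split(",")
  rd.close
  Process.wait(pid)
  keys
end

merged   = child_env({ "REXEC_DEMO" => "1" })                  # parent's keys plus REXEC_DEMO
replaced = child_env({ "REXEC_DEMO" => "1" }, replace: true)   # only REXEC_DEMO
```

Because the changes are made after the fork, the parent's own environment is left untouched in both cases.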

:umask

Set the umask for the new process, as per File.umask.

:chdir

Set the current working directory for the new process, as per Dir.chdir.

:preflight

Similar to a proc based command, but executed before exec'ing the given process:

	preflight = Proc.new do |command, options|
		# Set up some default state before exec'ing the new process.
	end

	Task.open(command, :preflight => preflight)

The options hash is passed directly so you can supply custom arguments to the preflight function.



# File 'lib/rexec/task.rb', line 257

def self.open(command, options = {}, &block)
	cin, cout, cerr = pipes_for_options(options)
	spawn = options[:daemonize] ? :spawn_daemon : :spawn_child

	cid = self.send(spawn) do
		close_pipes(cin[WR], cout[RD], cerr[RD])

		STDIN.reopen(cin[RD]) if cin[RD]
		STDOUT.reopen(cout[WR]) if cout[WR]
		STDERR.reopen(cerr[WR]) if cerr[WR]

		if options[:env!]
			ENV.clear
			ENV.update(options[:env!])
		elsif options[:env]
			ENV.update(options[:env])
		end

		if options[:umask]
			File.umask(options[:umask])
		end

		if options[:chdir]
			Dir.chdir(options[:chdir])
		end

		if options[:preflight]
			options[:preflight].call(command, options)
		end

		if command.respond_to? :call
			command.call
		elsif Array === command
			command = command.dup
			
			# If command is a Pathname, we need to convert it to an absolute path if possible,
			# otherwise if it is relative it might cause problems.
			if command[0].respond_to? :realpath
				command[0] = command[0].realpath
			end
			
			# exec expects an array of Strings:
			command.collect! { |item| item.to_s }
			
			exec *command
		else
			if command.respond_to? :realpath
				command = command.realpath
			end
			
			exec command.to_s
		end
	end

	close_pipes(cin[RD], cout[WR], cerr[WR])

	task = Task.new(cin[WR], cout[RD], cerr[RD], cid)

	if block_given?
		begin
			yield task
			
			# Close all input pipes if not done already.
			task.close_input
			
			# The task has stopped if task.wait returns correctly.
			return task.wait
		rescue Interrupt
			# If task.wait is interrupted, we should also interrupt the child process
			task.kill
		ensure
			# Print out any remaining data from @output or @error
			task.close
		end
	else
		return task
	end
end
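
The core of the method above (create a pipe, fork, reopen the child's stdout onto the pipe, exec, then read and wait in the parent) can be sketched with the standard library alone. `run_and_capture` is a hypothetical helper, not part of RExec, and it handles only stdout and only the array form of a command.

```ruby
# Run a command in a forked child, capture its stdout, and return
# [output, Process::Status].
def run_and_capture(*command)
  out_rd, out_wr = IO.pipe

  pid = fork do
    out_rd.close              # the child only writes
    STDOUT.reopen(out_wr)     # route the child's stdout into the pipe
    out_wr.close
    exec(*command)            # separate arguments: no shell expansion
  end

  out_wr.close                # the parent only reads
  output = out_rd.read        # returns once the child has closed its end
  out_rd.close

  _pid, status = Process.wait2(pid)
  [output, status]
end

output, status = run_and_capture("echo", "hello")
```

Closing the unused pipe ends on each side is what allows `read` to see end-of-file; the `close_pipes` calls before and after the spawn in `Task.open` serve the same purpose.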

.pipes_for_options(options) ⇒ Object



# File 'lib/rexec/task.rb', line 45

def self.pipes_for_options(options)
	pipes = [[nil, nil], [nil, nil], [nil, nil]]

	if options[:passthrough]
		passthrough = options[:passthrough]

		if passthrough == :all
			passthrough = [:in, :out, :err]
		elsif passthrough.kind_of?(Symbol)
			passthrough = [passthrough]
		end

		passthrough.each do |name|
			case(name)
			when :in
				options[:in] = $stdin
			when :out
				options[:out] = $stdout
			when :err
				options[:err] = $stderr
			end
		end
	end

	modes = [RD, WR, WR]
	{:in => 0, :out => 1, :err => 2}.each do |name, idx|
		m = modes[idx]
		p = options[name]

		if p.kind_of?(IO)
			pipes[idx][m] = p
		elsif p.kind_of?(Array) and p.size == 2
			pipes[idx] = p
		else
			pipes[idx] = IO.pipe
		end
	end

	return pipes
end
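
Reading the source above: the :passthrough value is first expanded into a list of stream names, each of which maps the matching option to the parent's own stream. Then, for each of :in, :out and :err, an IO is placed in the child-side slot, a two-element array is used as the pipe pair as-is, and anything else gets a fresh `IO.pipe`. The expansion step alone can be sketched as follows; `passthrough_names` is a hypothetical helper, not an RExec method.

```ruby
# Expand a :passthrough value into the list of stream names it covers.
def passthrough_names(passthrough)
  case passthrough
  when nil    then []
  when :all   then [:in, :out, :err]
  when Symbol then [passthrough]
  else passthrough.to_a
  end
end

all_names    = passthrough_names(:all)
single_name  = passthrough_names(:err)
listed_names = passthrough_names([:in, :out])
```

The `:all` branch has to come before the generic `Symbol` branch, since `:all` is itself a Symbol.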

.running?(pid) ⇒ Boolean

Returns true if a process with the given pid currently exists.

Returns:

  • (Boolean)


# File 'lib/rexec/task.rb', line 120

def self.running?(pid)
	gpid = Process.getpgid(pid) rescue nil

	return !gpid.nil?
end
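
A short sketch of the same liveness check, assuming a POSIX system where `fork` is available. `pid_alive?` is a hypothetical name, and unlike the blanket `rescue nil` above it rescues only `Errno::ESRCH` ("no such process").

```ruby
# True if a process with this pid currently exists. A zombie that has not
# yet been waited for still counts as existing.
def pid_alive?(pid)
  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  false
end

child = fork { sleep 0.2 }
alive_before = pid_alive?(child)
Process.wait(child)             # reap the child so its pid is released
alive_after = pid_alive?(child)
```

Since pids can be reused by the operating system, a positive answer only says that some process has this pid, which is why the instance method `#running?` also consults the task's own status.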

.spawn_child(&block) ⇒ Object

Very simple method to spawn a child process:

	spawn_child do
		puts "Hello from child!"
	end



# File 'lib/rexec/task.rb', line 166

def self.spawn_child(&block)
	pid = fork do
		yield

		exit!(0)
	end

	return pid
end
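
A stdlib-only sketch of the same fork-then-`exit!` pattern, with the child's message sent back over a pipe so it can be checked in the parent. `spawn_simple_child` is a hypothetical name. Note the unbuffered `syswrite`: `exit!` leaves the process immediately, so output still sitting in a Ruby-level buffer could be lost.

```ruby
# Fork a child that runs the block and then terminates immediately.
def spawn_simple_child
  fork do
    yield
    exit!(0)   # leave without running at_exit handlers inherited from the parent
  end
end

rd, wr = IO.pipe
child_pid = spawn_simple_child do
  rd.close
  wr.syswrite("Hello from child!")   # unbuffered, so exit! cannot drop it
end

wr.close
message = rd.read
rd.close
_pid, child_status = Process.wait2(child_pid)
```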

.spawn_daemon(&block) ⇒ Object

Very simple method to spawn a child daemon. A daemon is detached from the controlling tty, and thus is not killed when the parent process finishes.

	spawn_daemon do
		Dir.chdir("/")
		File.umask 0000
		puts "Hello from daemon!"
		sleep(600)
		puts "This code will not quit when parent process finishes…"
		puts "…but $stdout might be closed unless you set it to a file."
	end



# File 'lib/rexec/task.rb', line 137

def self.spawn_daemon(&block)
	pid_pipe = IO.pipe

	fork do
		Process.setsid
		exit if fork

		# Send the pid back to the parent
		pid_pipe[RD].close
		pid_pipe[WR].write(Process.pid.to_s)
		pid_pipe[WR].close

		yield

		exit(0)
	end

	pid_pipe[WR].close
	pid = pid_pipe[RD].read
	pid_pipe[RD].close

	return pid.to_i
end
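
The double fork with a pid pipe can be sketched on its own as below. This is a variation under stated assumptions rather than the listing above: `spawn_detached` is a hypothetical name, the sketch uses `exit!` instead of `exit`, and it additionally waits for the intermediate process so that no zombie is left behind.

```ruby
# Start a detached grandchild and return its pid to the caller.
def spawn_detached
  rd, wr = IO.pipe

  middle = fork do
    rd.close
    Process.setsid                   # new session, no controlling terminal
    exit!(0) if fork                 # the intermediate process leaves right away

    # From here on we are the grandchild.
    wr.syswrite(Process.pid.to_s)    # report our pid to the original parent
    wr.close
    yield
    exit!(0)
  end

  wr.close
  Process.wait(middle)               # reap the intermediate process
  pid = rd.read.to_i                 # EOF arrives once every write end is closed
  rd.close
  pid
end

daemon_pid = spawn_detached { sleep 0.1 }
```

Because the grandchild is not a direct child of the caller, calling `Process.wait` on the returned pid raises `Errno::ECHILD`; the pid is useful mainly for signalling or for liveness checks.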

Instance Method Details

#close ⇒ Object

Dump any remaining data from the output and error pipes, then close all connections to the child process.



# File 'lib/rexec/task.rb', line 378

def close
	begin
		self.class.dump_pipes(@output, @error)
	ensure
		self.class.close_pipes(@input, @output, @error)
	end
end

#close_input ⇒ Object

Close input pipe to child process (if applicable)



# File 'lib/rexec/task.rb', line 387

def close_input
	@input.close if @input and !@input.closed?
end

#kill(signal = "INT") ⇒ Object

Send a signal to the child process



# File 'lib/rexec/task.rb', line 392

def kill(signal = "INT")
	if running?
		Process.kill(signal, @pid)
	else
		raise Errno::ECHILD
	end
end
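
What sending the default "INT" signal amounts to can be sketched with `Process.kill` directly, assuming a POSIX system. The variable names are illustrative only.

```ruby
# Start a child that would otherwise sleep for a long time.
sleeper = fork { sleep 30 }

# Deliver SIGINT, the same default signal used by #kill above.
Process.kill("INT", sleeper)

# Reap the child; the status shows that it did not finish normally.
_pid, kill_status = Process.wait2(sleeper)
```

The status returned by `Process.wait2` is the same kind of object that `#wait` stores in `#result`.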

#running? ⇒ Boolean

Returns true if the current task is still running

Returns:

  • (Boolean)


# File 'lib/rexec/task.rb', line 365

def running?
	if self.class.running?(@pid)
		# The pid still seems alive, check that it isn't some other process using the same pid...
		@result_lock.synchronize do
			# If we haven't waited for it yet, it must be either a running process or a zombie...
			return @status != :stopped
		end
	end
	
	return false
end

#stop ⇒ Object

Stop the child process, if it is still running, by closing its input and sending it the default signal (INT).



# File 'lib/rexec/task.rb', line 441

def stop
	if running?
		close_input
		kill
	end
end

#wait ⇒ Object

Wait for the child process to finish and return its exit status, as reported by Process.wait2. This function can be called from multiple threads.



# File 'lib/rexec/task.rb', line 402

def wait
	begin_wait = false
	
	# Check to see if some other caller is already waiting on the result...
	@result_lock.synchronize do
		case @status
		when :waiting
			# If so, wait for the wait to finish...
			@result_available.wait(@result_lock)
		when :running
			# Else, mark that we should begin waiting...
			begin_wait = true
			@status = :waiting
		when :stopped
			return @result
		end
	end
	
	# If we should begin waiting (the first thread to wait)...
	if begin_wait
		begin
			# Wait for the result...
			_pid, @result = Process.wait2(@pid)
		end
		
		# The result is now available...
		@result_lock.synchronize do
			@status = :stopped
		end
		
		# Notify other threads...
		@result_available.broadcast()
	end
	
	# Return the result
	return @result
end
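
The "first caller does the work, everyone else waits for the result" pattern can be sketched independently of processes. `OnceResult` is a hypothetical class, not part of RExec, and it is a variation on the listing above: followers re-check the status in a loop after waking, and the broadcast happens while the lock is held.

```ruby
# Runs a block exactly once, however many threads call #wait concurrently.
class OnceResult
  def initialize(&work)
    @work = work
    @status = :running
    @result = nil
    @lock = Mutex.new
    @available = ConditionVariable.new
  end

  def wait
    leader = false

    @lock.synchronize do
      case @status
      when :stopped
        return @result
      when :running
        leader = true              # first caller: take responsibility for the work
        @status = :waiting
      when :waiting
        @available.wait(@lock) until @status == :stopped
        return @result
      end
    end

    value = @work.call             # only the leader reaches this point

    @lock.synchronize do
      @result = value
      @status = :stopped
      @available.broadcast         # wake every follower
    end

    value
  end
end

calls = 0
shared = OnceResult.new { calls += 1; sleep 0.05; :done }
results = 4.times.map { Thread.new { shared.wait } }.map(&:value)
```

In `Task#wait` the work is the `Process.wait2` call, which may only be performed once per child process, so funnelling all callers through a single waiter is what makes the method safe to call from several threads.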