Class: RExec::Task

Inherits:
Object
Defined in:
lib/rexec/task.rb

Overview

Represents a running process, either a child process or a background/daemon process. Provides an easy high level interface for managing process life-cycle.

Constructor Details

#initialize(input, output, error, pid) ⇒ Task

Returns a new instance of Task.



# File 'lib/rexec/task.rb', line 341

def initialize(input, output, error, pid)
	@input = input
	@output = output
	@error = error

	@pid = pid

	@result = nil
	@status = :running
	@result_lock = Mutex.new
	@result_available = ConditionVariable.new
end

Instance Attribute Details

#error ⇒ Object (readonly)

Standard error from the running task.



# File 'lib/rexec/task.rb', line 361

def error
  @error
end

#input ⇒ Object (readonly)

Standard input to the running task.



# File 'lib/rexec/task.rb', line 355

def input
  @input
end

#output ⇒ Object (readonly)

Standard output from the running task.



# File 'lib/rexec/task.rb', line 358

def output
  @output
end

#pid ⇒ Object (readonly)

The PID of the running task.



# File 'lib/rexec/task.rb', line 364

def pid
  @pid
end

#result ⇒ Object (readonly)

The exit status of the task. This is nil until task.wait has returned.



# File 'lib/rexec/task.rb', line 367

def result
  @result
end

Class Method Details

.open(command, options = {}, &block) ⇒ Object

Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout, stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or exception. You may write to input, and read from output and error.

Typical usage looks similar to IO.popen:

    count = 0
    result = Task.open(["ls", "-la"], :passthrough => :err) do |task|
      count = task.output.read.split(/\n/).size
    end
    puts "Count: #{count}" if result.exitstatus == 0

The basic command can simply be a string, and this will be passed to Kernel#exec which will perform shell expansion on the arguments.

If the command passed is an array, this will be executed without shell expansion.
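
The difference between the string and array forms can be seen with the standard library alone. The following is a minimal sketch that uses IO.popen as a stand-in for Task.open (it follows the same string versus array rule); the variable name REXEC_DEMO is made up for the demonstration and an `echo` binary is assumed to be on the PATH.

```ruby
# Stand-in demo using IO.popen from the standard library (not RExec itself).
env = { "REXEC_DEMO" => "expanded" }  # hypothetical variable name

# String command: handed to the shell, so $REXEC_DEMO is substituted.
via_shell = IO.popen(env, "echo $REXEC_DEMO", &:read)

# Array command: executed directly, so the argument stays literal.
direct = IO.popen(env, ["echo", "$REXEC_DEMO"], &:read)

puts via_shell.inspect
puts direct.inspect
```

Prefer the array form whenever arguments come from user input, since nothing in them is interpreted by a shell.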

If a Proc (or anything that respond_to? :call) is provided, this will be executed in the child process. Here is an example of a long running background process:

    daemon = Proc.new do
      # Long running process
      sleep(1000)
    end

    task = Task.open(daemon, :daemonize => true, :in => …, :out => …, :err => …)
    exit(0)

Options

:passthrough, :in, :out, :err

The current process (e.g. ruby) has a set of existing pipes $stdin, $stdout and $stderr. These pipes can also be used by the child process. The passthrough option allows you to specify which pipes are retained from the parent process by the child.

Typically it is useful to pass through $stderr, so that errors in the child process are printed in the terminal of the parent process:

    Task.open(…, :passthrough => :err)
    Task.open(…, :passthrough => [:in, :out, :err])
    Task.open(…, :passthrough => :all)

It is also possible to redirect to files, which can be useful if you want to keep a log file:

    Task.open(…, :out => File.open("output.log", "a"))

The default behaviour is to create a new pipe, but any pipe (e.g. a network socket) could be used:

    Task.open(…, :in => IO.pipe)
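
What redirecting :out to a file amounts to can be sketched with fork and STDOUT.reopen, which is the mechanism visible in the source of Task.open. This is an illustration under assumptions, not the library's code path: the file name and the `echo` command are placeholders.

```ruby
require "tmpdir"

logged = Dir.mktmpdir do |dir|
  path = File.join(dir, "output.log")   # hypothetical log file

  File.open(path, "a") do |log|
    pid = fork do
      # In the child: point STDOUT at the log file, then replace the process.
      STDOUT.reopen(log)
      exec("echo", "hello from child")
    end
    Process.wait(pid)
  end

  File.read(path)
end

puts logged
```

The file has to be opened for writing ("w" or "a"); a read-only handle cannot receive the child's output.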

:daemonize

The process that is opened may be detached from the parent process. This allows the child process to keep running even if the parent process exits. In this case, you will probably also want to redirect :in, :out and :err, for example to log files:

    Task.open(…, :daemonize => true,
      :in => File.open("/dev/null"),
      :out => File.open("/var/log/child.log", "a"),
      :err => File.open("/var/log/child.err", "a")
    )

:env, :env!

Provide an environment which will be used by the child process. Use :env to update the existing environment and :env! to replace it completely.

    Task.open(…, :env => {… => 'bar'})
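
The distinction between updating and replacing the environment can be illustrated with the standard library. The sketch below does not show how RExec implements :env and :env!; it uses IO.popen with the `unsetenv_others` spawn option as an analogue. The variable names are invented, and `/usr/bin/env` is assumed to exist.

```ruby
ENV["REXEC_PARENT_ONLY"] = "yes"     # hypothetical variable set in the parent
extra = { "REXEC_DEMO" => "bar" }    # hypothetical variable for the child

# Update: the child sees the parent's environment plus the extra entries.
updated = IO.popen(extra, ["/usr/bin/env"], &:read).lines.map(&:chomp)

# Replace: unsetenv_others drops everything that is not listed in the hash.
replaced = IO.popen(extra, ["/usr/bin/env"], unsetenv_others: true, &:read)
             .lines.map(&:chomp)

puts "updated keeps parent variable:  #{updated.include?('REXEC_PARENT_ONLY=yes')}"
puts "replaced keeps parent variable: #{replaced.include?('REXEC_PARENT_ONLY=yes')}"
```

Replacing the environment is the safer choice for daemons, which should not depend on whatever happened to be exported in the launching shell.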

:umask

Set the umask for the new process, as per File.umask.

:chdir

Set the current working directory for the new process, as per Dir.chdir.

:preflight

Similar to a proc based command, but executed before exec'ing the given process.

    preflight = Proc.new do |command, options|
      # Set up some default state before exec'ing the new process.
    end

    Task.open(…, :preflight => preflight)

The options hash is passed directly so you can supply custom arguments to the preflight function.
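
The :umask, :chdir and :preflight options all describe work done inside the child before the command starts. A minimal sketch of that idea, using plain fork rather than RExec, shows that such changes stay local to the child. The pipe is only there so the parent can observe the child's state.

```ruby
require "tmpdir"

parent_dir = Dir.pwd
parent_umask = File.umask

child_dir, child_umask, expected_dir = Dir.mktmpdir do |dir|
  rd, wr = IO.pipe

  pid = fork do
    rd.close
    # Preflight-style setup: affects only the child process.
    Dir.chdir(dir)
    File.umask(0077)
    wr.puts Dir.pwd
    wr.puts format("%04o", File.umask)
    wr.close
    exit!(0)   # skip ensure blocks inherited from the parent (e.g. mktmpdir cleanup)
  end

  wr.close
  lines = rd.read.lines.map(&:chomp)
  rd.close
  Process.wait(pid)

  [File.realpath(lines[0]), lines[1], File.realpath(dir)]
end

puts "child ran in #{child_dir} with umask #{child_umask}"
puts "parent is still in #{Dir.pwd}"
```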



# File 'lib/rexec/task.rb', line 278

def self.open(command, options = {}, &block)
	cin, cout, cerr = pipes_for_options(options)
	spawn = options[:daemonize] ? :spawn_daemon : :spawn_child

	cid = self.send(spawn) do
		close_pipes(cin[WR], cout[RD], cerr[RD])

		STDIN.reopen(cin[RD]) if cin[RD]
		STDOUT.reopen(cout[WR]) if cout[WR]
		STDERR.reopen(cerr[WR]) if cerr[WR]

		prepare_child_environment(command, options)

		if command.respond_to? :call
			command.call
		elsif Array === command
			# If command is a Pathname, we need to convert it to an absolute path if possible,
			# otherwise if it is relative it might cause problems.
			if command[0].respond_to? :realpath
				command[0] = command[0].realpath
			end
			
			# exec expects an array of Strings:
			command.collect! { |item| item.to_s }
			
			# Ensure that we DON'T use the shell for execution:
			command[0] = [command[0], command[0]]
			
			exec *command
		else
			if command.respond_to? :realpath
				command = command.realpath
			end
			
			exec command.to_s
		end
	end

	close_pipes(cin[RD], cout[WR], cerr[WR])

	task = Task.new(cin[WR], cout[RD], cerr[RD], cid)

	if block_given?
		begin
			yield task
			
			# Close all input pipes if not done already.
			task.close_input
			
			# The task has stopped if task.wait returns correctly.
			return task.wait
		rescue Interrupt
			# If task.wait is interrupted, we should also interrupt the child process
			task.kill
		ensure
			# Print out any remaining data from @output or @error
			task.close
		end
	else
		return task
	end
end
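
The pipe handling in the listing above follows a common pattern: create the pipes, fork, rebind the standard streams in the child, then close the unused ends on each side. The following standalone sketch reproduces that pattern with the standard library only. It is an illustration, not a copy of Task.open; the `tr` command is an assumed stand-in for a real child program.

```ruby
cin_rd,  cin_wr  = IO.pipe   # parent writes, child reads (child's stdin)
cout_rd, cout_wr = IO.pipe   # child writes, parent reads (child's stdout)

pid = fork do
  # Child: drop the parent's ends, then rebind the standard streams.
  cin_wr.close
  cout_rd.close
  STDIN.reopen(cin_rd)
  STDOUT.reopen(cout_wr)
  exec("tr", "a-z", "A-Z")
end

# Parent: drop the child's ends, otherwise reads never see end-of-file.
cin_rd.close
cout_wr.close

cin_wr.write("hello task\n")
cin_wr.close                  # signals end of input to the child
output = cout_rd.read
cout_rd.close

_pid, status = Process.wait2(pid)
puts output
```

Closing the input pipe before reading matters: a filter such as `tr` only finishes once it sees end-of-file on stdin, which is why Task.open calls task.close_input before task.wait.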

.running?(pid) ⇒ Boolean

Returns true if a process with the given pid currently exists.

Returns:

  • (Boolean)


# File 'lib/rexec/task.rb', line 141

def self.running?(pid)
	gpid = Process.getpgid(pid) rescue nil

	return gpid != nil ? true : false
end
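
The same getpgid-based check can be tried in isolation. In this sketch `pid_alive?` is a hypothetical helper name, not part of RExec. Unlike the listing above it rescues only Errno::ESRCH, the error raised when no such process exists.

```ruby
# Hypothetical helper mirroring the getpgid-based check.
def pid_alive?(pid)
  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  false
end

child = fork { exit!(0) }
Process.wait(child)            # reap the child so its pid entry disappears

alive_self  = pid_alive?(Process.pid)
alive_child = pid_alive?(child)

puts "self: #{alive_self}, reaped child: #{alive_child}"
```

A child that has exited but has not yet been waited on may still be reported as existing, which is why the instance method #running? also consults the task's own status.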

.spawn_child(&block) ⇒ Object

Very simple method to spawn a child process

    spawn_child do
      puts "Hello from child!"
    end



# File 'lib/rexec/task.rb', line 187

def self.spawn_child(&block)
	pid = fork do
		yield

		exit!(0)
	end

	return pid
end

.spawn_daemon(&block) ⇒ Object

Very simple method to spawn a child daemon. A daemon is detached from the controlling tty, and thus is not killed when the parent process finishes.

    spawn_daemon do
      Dir.chdir("/")
      File.umask 0000

      puts "Hello from daemon!"
      sleep(600)
      puts "This code will not quit when parent process finishes…"
      puts "…but $stdout might be closed unless you set it to a file."
    end



# File 'lib/rexec/task.rb', line 158

def self.spawn_daemon(&block)
	pid_pipe = IO.pipe

	fork do
		Process.setsid
		exit if fork

		# Send the pid back to the parent
		pid_pipe[RD].close
		pid_pipe[WR].write(Process.pid.to_s)
		pid_pipe[WR].close

		yield

		exit(0)
	end

	pid_pipe[WR].close
	pid = pid_pipe[RD].read
	pid_pipe[RD].close

	return pid.to_i
end
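
The double-fork technique used above can be exercised on its own. The sketch below is a variation under assumptions, not the library's method: `spawn_detached` is a hypothetical name, it uses exit! throughout, and it additionally reaps the short-lived intermediate process. The "hold" pipe exists only to keep the daemon alive long enough to inspect it.

```ruby
# Hypothetical helper: run the block in a process detached from the caller's
# session and return that process's pid.
def spawn_detached
  pid_rd, pid_wr = IO.pipe

  middle = fork do
    pid_rd.close
    Process.setsid              # become leader of a new session
    exit!(0) if fork            # intermediate process exits; grandchild continues

    pid_wr.write(Process.pid.to_s)
    pid_wr.close

    yield
    exit!(0)
  end

  pid_wr.close
  Process.wait(middle)          # reap the intermediate process
  daemon_pid = pid_rd.read.to_i
  pid_rd.close
  daemon_pid
end

hold_rd, hold_wr = IO.pipe
daemon_pid = spawn_detached do
  hold_wr.close
  hold_rd.read                  # block until the parent closes its write end
end
hold_rd.close

daemon_session = Process.getsid(daemon_pid)
own_session = Process.getsid(0)
hold_wr.close                   # let the daemon finish

puts "daemon #{daemon_pid} runs in session #{daemon_session}, caller in #{own_session}"
```

Because the daemon is a grandchild, the caller cannot wait on it; only its pid is available for later signalling.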

Instance Method Details

#close ⇒ Object

Close all connections to the child process



# File 'lib/rexec/task.rb', line 383

def close
	begin
		self.class.dump_pipes(@output, @error)
	ensure
		self.class.close_pipes(@input, @output, @error)
	end
end

#close_input ⇒ Object

Close input pipe to child process (if applicable)



# File 'lib/rexec/task.rb', line 392

def close_input
	@input.close if @input and !@input.closed?
end

#kill(signal = "INT") ⇒ Object

Send a signal to the child process



# File 'lib/rexec/task.rb', line 397

def kill(signal = "INT")
	if running?
		Process.kill(signal, @pid)
	else
		raise Errno::ECHILD
	end
end
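
What the underlying Process.kill call does can be observed with a throwaway child. This sketch uses the standard library only and sends TERM rather than the default INT; the `sleep` command is an assumed stand-in for a long-running task.

```ruby
pid = Process.spawn("sleep", "60")      # stand-in long-running child

Process.kill("TERM", pid)               # same call #kill makes, with another signal
_pid, status = Process.wait2(pid)
terminated_by = status.termsig

# Signalling a pid that has already been reaped fails, which is the
# situation #kill guards against by checking running? first.
gone = begin
  Process.kill("TERM", pid)
  false
rescue Errno::ESRCH
  true
end

puts "terminated by signal #{terminated_by}, second kill failed: #{gone}"
```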

#running? ⇒ Boolean

Returns true if the current task is still running

Returns:

  • (Boolean)


# File 'lib/rexec/task.rb', line 370

def running?
	if self.class.running?(@pid)
		# The pid still seems alive, check that it isn't some other process using the same pid...
		@result_lock.synchronize do
			# If we haven't waited for it yet, it must be either a running process or a zombie...
			return @status != :stopped
		end
	end
	
	return false
end

#stop ⇒ Object

Forcefully stop the child process.



# File 'lib/rexec/task.rb', line 446

def stop
	if running?
		close_input
		kill
	end
end

#wait ⇒ Object

Wait for the child process to finish and return its exit status. This method can be called from multiple threads.



# File 'lib/rexec/task.rb', line 407

def wait
	begin_wait = false
	
	# Check to see if some other caller is already waiting on the result...
	@result_lock.synchronize do
		case @status
		when :waiting
			# If so, wait for the wait to finish...
			@result_available.wait(@result_lock)
		when :running
			# Else, mark that we should begin waiting...
			begin_wait = true
			@status = :waiting
		when :stopped
			return @result
		end
	end
	
	# If we should begin waiting (the first thread to wait)...
	if begin_wait
		begin
			# Wait for the result...
			_pid, @result = Process.wait2(@pid)
		end
		
		# The result is now available...
		@result_lock.synchronize do
			@status = :stopped
		end
		
		# Notify other threads...
		@result_available.broadcast()
	end
	
	# Return the result
	return @result
end
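
The pattern in the listing above (the first caller performs the blocking wait, later callers sleep on a condition variable until the result is published) can be tried in a few lines. The class below is a simplified sketch with a hypothetical name, SharedWait, and is not the library's implementation. It stores the result under the lock and re-checks the state in a loop after waking.

```ruby
class SharedWait
  def initialize(pid)
    @pid = pid
    @status = :running
    @result = nil
    @lock = Mutex.new
    @available = ConditionVariable.new
  end

  def wait
    @lock.synchronize do
      case @status
      when :stopped
        return @result
      when :waiting
        # Another thread is already waiting on the process; sleep until it is done.
        @available.wait(@lock) until @status == :stopped
        return @result
      when :running
        @status = :waiting      # this thread will do the real wait
      end
    end

    _pid, result = Process.wait2(@pid)

    @lock.synchronize do
      @result = result
      @status = :stopped
      @available.broadcast      # wake every thread sleeping in #wait
    end

    result
  end
end

pid = fork { exit!(7) }
waiter = SharedWait.new(pid)
codes = 4.times.map { Thread.new { waiter.wait.exitstatus } }.map(&:value)

puts codes.inspect
```

Only one thread ever calls Process.wait2, which matters because a child can be reaped just once; a second call for the same pid would raise Errno::ECHILD.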