Class: Process::Group

Inherits:
Object
Defined in:
lib/process/group.rb,
lib/process/group/version.rb

Overview

A group of tasks which can be run asynchronously using fibers. The caller must call Group#wait to ensure that all fibers eventually resume.
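
The fiber hand-off described above can be sketched with the standard library alone. This is an illustrative sketch, not the library's implementation: the `pending` table and the use of `RbConfig.ruby` to launch trivial child processes are assumptions made for the example.

```ruby
require "rbconfig"

pending = {}  # pid => fiber suspended until that process exits (hypothetical table)
results = []

2.times do |code|
  fiber = Fiber.new do
    # Start a child that exits immediately with the given status code.
    pid = Process.spawn(RbConfig.ruby, "-e", "exit #{code}")
    # Suspend here and hand the pid to the caller; resumes with a Process::Status.
    status = Fiber.yield(pid)
    results << status.exitstatus
  end

  pid = fiber.resume
  pending[pid] = fiber
end

# Nothing resumes the suspended fibers until a wait loop like this one runs.
until pending.empty?
  pid, status = Process.wait2(-1)
  fiber = pending.delete(pid)
  fiber.resume(status) if fiber
end
```

If the final loop is omitted, both fibers stay suspended and `results` stays empty, which is why a call to Group#wait is required.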

Defined Under Namespace

Classes: Command, Fork

Constant Summary

VERSION = "0.1.0"

Constructor Details

#initialize(options = {}) ⇒ Group

Create a new process group. `options` may specify :limit, the maximum number of concurrent processes.



# File 'lib/process/group.rb', line 76

def initialize(options = {})
	@queue = []
	@limit = options[:limit]

	@running = {}
	@fiber = nil

	@pgid = nil
end

Instance Attribute Details

#limit ⇒ Object

The maximum number of processes to run concurrently, or nil for no limit.



# File 'lib/process/group.rb', line 90

def limit
  @limit
end

#running ⇒ Object (readonly)

A table of currently running processes.



# File 'lib/process/group.rb', line 87

def running
  @running
end

Instance Method Details

#available? ⇒ Boolean

Whether or not a call to run would be scheduled immediately.

Returns:

  • (Boolean)


# File 'lib/process/group.rb', line 120

def available?
	if @limit
		@running.size < @limit
	else
		true
	end
end
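
The limit check can be tried in isolation. The sketch below uses a hypothetical stand-in class, `TinyLimiter`, which is not part of the library and mirrors only the comparison shown in #available?; the pids are made-up placeholders.

```ruby
# Hypothetical stand-in that mirrors only the limit check shown above.
class TinyLimiter
  def initialize(limit = nil)
    @limit = limit
    @running = {}
  end

  # Record a (pretend) running process.
  def start(pid)
    @running[pid] = true
  end

  def available?
    @limit ? @running.size < @limit : true
  end
end

limited = TinyLimiter.new(2)
limited.start(101)
one_running = limited.available?   # one slot is still free
limited.start(102)
two_running = limited.available?   # the limit has been reached

unlimited = TinyLimiter.new
unlimited.start(201)
no_limit = unlimited.available?    # a nil limit never blocks
```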

#blocking? ⇒ Boolean

Whether or not calling run would block the caller.

Returns:

  • (Boolean)


# File 'lib/process/group.rb', line 129

def blocking?
	not available?
end

#fork(options = {}, &block) ⇒ Object



# File 'lib/process/group.rb', line 115

def fork(options = {}, &block)
	append! Fork.new(block, options)
end

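#id ⇒ Object

The id of the process group, returned as a negative number (-pgid) so that it addresses the whole group when passed to Process.kill. Only valid while processes are running.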

Raises:

  • (RuntimeError)


# File 'lib/process/group.rb', line 93

def id
	raise RuntimeError.new("No processes in group, no group id available.") if @running.size == 0
	
	-@pgid
end
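
The role of the negated group id can be shown with plain `Process` calls. This is a sketch under assumptions, not the library's code: it assumes a POSIX system, and the child commands launched through `RbConfig.ruby` are placeholders.

```ruby
require "rbconfig"

# pgroup: true makes the child the leader of a new process group,
# so its process group id equals its own pid.
leader = Process.spawn(RbConfig.ruby, "-e", "sleep 0.5", pgroup: true)
pgid = Process.getpgid(leader)

# A second child joins the existing group.
member = Process.spawn(RbConfig.ruby, "-e", "exit 3", pgroup: pgid)

# A negative pid tells wait2 to wait for any child in that process group.
reaped = []
2.times do
  pid, _status = Process.wait2(-pgid)
  reaped << pid
end
```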

#kill(signal) ⇒ Object

Send a signal to all processes in the group. Does nothing if no processes are running.



# File 'lib/process/group.rb', line 153

def kill(signal)
	if @running.size > 0
		Process.kill(signal, id)
	end
end
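
Signalling a whole group with a negated id can be sketched as follows. The example assumes a POSIX system with a `sleep` command on the PATH; it uses the standard `Process` API directly rather than the Group class.

```ruby
# Start a long-running child as the leader of its own process group.
pid = Process.spawn("sleep", "30", pgroup: true)
pgid = Process.getpgid(pid)

# A negative pid targets every process in the group, as #kill does via #id.
Process.kill("TERM", -pgid)

# Reap the child and inspect how it ended.
_, status = Process.wait2(pid)
```

After the call, `status.signaled?` reports whether the child was terminated by the signal.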

#run(*arguments) ⇒ Object

Run a process; arguments have the same meaning as for Process.spawn. If a block is given, it is called with the exit status once the process has finished.



# File 'lib/process/group.rb', line 100

def run(*arguments)
	Fiber.new do
		exit_status = self.spawn(*arguments)
		
		yield exit_status if block_given?
	end.resume
end

#spawn(*arguments) ⇒ Object



# File 'lib/process/group.rb', line 108

def spawn(*arguments)
	# Could be nice to use ** splat, but excludes ruby < 2.0.
	options = Hash === arguments.last ? arguments.pop : {}
	
	append! Command.new(arguments, options)
end
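
The trailing-hash handling in #spawn can be exercised on its own. `split_options` below is a hypothetical helper written for illustration; it is not part of the library.

```ruby
# Hypothetical helper mirroring the option handling in #spawn.
def split_options(*arguments)
  # Treat a trailing Hash as options; otherwise fall back to an empty Hash.
  options = Hash === arguments.last ? arguments.pop : {}
  [arguments, options]
end

with_options = split_options("ls", "-l", { chdir: "/tmp" })
without_options = split_options("ls", "-l")
```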

#wait ⇒ Object

Wait for all processes to finish. As each process exits, queued tasks are scheduled and the fiber waiting on that process is resumed with its status.



# File 'lib/process/group.rb', line 134

def wait
	while @running.size > 0
		# Wait for processes in this group:
		pid, status = Process.wait2(-@pgid)
	
		process = @running.delete(pid)
	
		raise RuntimeError.new("Process id=#{pid} is not part of group!") unless process
	
		schedule!
	
		process.resume(status)
	end
	
	# No processes, process group is no longer valid:
	@pgid = nil
end
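
How a wait loop can keep a concurrency limit while draining a queue is sketched below. This is a simplified approximation without fibers, not the library's `schedule!`: the `start_next` lambda, the `peak` counter, and the queue of exit codes are all assumptions made for the example.

```ruby
require "rbconfig"

limit = 2
queue = [0, 1, 2, 3]  # exit codes for the children still to be started
running = {}          # pid => exit code requested from that child
finished = []
peak = 0              # highest number of children running at once

# Start queued children until the limit is reached or the queue is empty.
start_next = lambda do
  while running.size < limit && !queue.empty?
    code = queue.shift
    pid = Process.spawn(RbConfig.ruby, "-e", "exit #{code}")
    running[pid] = code
  end
  peak = [peak, running.size].max
end

start_next.call

until running.empty?
  pid, status = Process.wait2(-1)
  next unless running.key?(pid)

  running.delete(pid)
  finished << status.exitstatus
  # A slot has been freed, so start the next queued child if there is one.
  start_next.call
end
```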