Class: Process::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/process/group.rb,
lib/process/group/version.rb

Overview

A group of tasks which can be run asynchronously using fibers. Someone must call Group#wait to ensure that all fibers eventually resume.

Defined Under Namespace

Classes: Command, Fork, Spawn

Constant Summary collapse

VERSION =
"1.2.3"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit: nil, terminal: Terminal::Device.new?) ⇒ Group

Create a new process group. Can specify `limit:` which limits the maximum number of concurrent processes.

Raises:

  • (ArgumentError)


106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/process/group.rb', line 106

# Set up an empty group owned by the current process.
#
# limit:: maximum number of concurrently running processes; nil means unlimited.
# terminal:: stored as-is in @terminal; defaults to the result of Terminal::Device.new?.
#
# Raises ArgumentError when limit is neither nil nor greater than zero.
def initialize(limit: nil, terminal: Terminal::Device.new?)
	unless limit.nil? || limit > 0
		raise ArgumentError, "Limit must be nil (unlimited) or > 0"
	end
	
	# Remember which process created the group; #wait compares against this pid.
	@pid = Process.pid
	
	# No process group id until one is assigned elsewhere:
	@pgid = nil
	
	@terminal = terminal
	@limit = limit
	
	# Bookkeeping containers start out empty:
	@queue = []
	@running = {}
	@fiber = nil
	
	# Whether we can actively schedule tasks or not:
	@waiting = false
end

Instance Attribute Details

#limitObject

The maximum number of processes to run concurrently, or nil for no limit.



129
130
131
# File 'lib/process/group.rb', line 129

# Reader for @limit: the maximum number of processes to run concurrently.
# A nil value means no limit is enforced (the constructor only accepts nil or a value > 0).
def limit
  @limit
end

#runningObject (readonly)

A table of currently running processes.



126
127
128
# File 'lib/process/group.rb', line 126

# Reader for @running, the table of currently running processes.
# The constructor initialises it to an empty Hash; the key/value layout is
# decided by code outside this excerpt.
def running
  @running
end

Class Method Details

.wait(**options, &block) ⇒ Object



27
28
29
30
31
# File 'lib/process/group.rb', line 27

# Convenience entry point: build a Group from the given options and immediately
# wait on it, forwarding the optional block to Group#wait.
#
# Returns whatever Group#wait returns.
def self.wait(**options, &block)
	Group.new(**options).wait(&block)
end

Instance Method Details

#asyncObject



156
157
158
159
160
# File 'lib/process/group.rb', line 156

# Run the given block inside a new fiber, passing this group to it.
#
# The fiber is resumed immediately; the return value is whatever that first
# resume produces (the block's result, or a value handed to Fiber.yield).
def async
	task = Fiber.new { yield self }
	
	task.resume
end

#available?Boolean

Whether or not #spawn, #fork or #run can be scheduled immediately.

Returns:

  • (Boolean)


173
174
175
176
177
178
179
# File 'lib/process/group.rb', line 173

# Whether another process could be started right now.
#
# Always true when no limit is set; otherwise true only while the number of
# running processes is below the limit.
def available?
	return true unless @limit
	
	@running.size < @limit
end

#blocking?Boolean

Whether or not calling #spawn, #fork or #run would block the caller fiber (i.e. call Fiber.yield).

Returns:

  • (Boolean)


182
183
184
# File 'lib/process/group.rb', line 182

# Logical inverse of #available?: true when no slot is free right now.
def blocking?
	!available?
end

#fork(**options, &block) ⇒ Object

Fork a block as a child process.



168
169
170
# File 'lib/process/group.rb', line 168

# Wrap the given block in a Fork command and hand it to append!.
#
# options are forwarded unchanged to Fork.new. Returns whatever append! returns.
def fork(**options, &block)
	child = Fork.new(block, **options)
	
	append!(child)
end

#idObject

The id of the process group, only valid if processes are currently running.

Raises:

  • (RuntimeError)


132
133
134
135
136
# File 'lib/process/group.rb', line 132

# The identifier used to signal the whole group: the negated @pgid.
#
# Raises RuntimeError when nothing is running.
def id
	raise RuntimeError, "No processes in group, no group id available." if @running.empty?
	
	-@pgid
end

#kill(signal = :INT) ⇒ Object

Send a signal to all currently running processes. No-op unless #running?



227
228
229
230
231
# File 'lib/process/group.rb', line 227

# Deliver a signal (default :INT) to the id reported by #id.
#
# Does nothing and returns nil when no processes are running; otherwise
# returns the result of Process.kill.
def kill(signal = :INT)
	Process.kill(signal, id) if running?
end

#queued?Boolean

Returns:

  • (Boolean)


138
139
140
# File 'lib/process/group.rb', line 138

# Are there tasks sitting in the queue?
def queued?
	!@queue.empty?
end

#run(*arguments, **options) ⇒ Object

Run a process in a new fiber; arguments have the same meaning as for Process.spawn.



148
149
150
151
152
153
154
# File 'lib/process/group.rb', line 148

# Start a command via #spawn inside a new fiber.
#
# When a block is given it is called with the value #spawn returned.
# The fiber is resumed immediately and the result of that resume is returned.
def run(*arguments, **options)
	task = Fiber.new do
		status = self.spawn(*arguments, **options)
		
		yield(status) if block_given?
	end
	
	task.resume
end

#running?Boolean

Are there processes currently running?

Returns:

  • (Boolean)


143
144
145
# File 'lib/process/group.rb', line 143

# Is at least one process recorded in the running table?
def running?
	!@running.empty?
end

#spawn(*arguments, **options) ⇒ Object

Run a specific command as a child process.



163
164
165
# File 'lib/process/group.rb', line 163

# Wrap the given command line in a Spawn command and hand it to append!.
#
# The positional arguments are passed to Spawn.new as a single array and the
# options are forwarded as keywords. Returns whatever append! returns.
def spawn(*arguments, **options)
	command = Spawn.new(arguments, **options)
	
	append!(command)
end

#to_sObject



233
234
235
# File 'lib/process/group.rb', line 233

# One-line summary of the group: class name, number of running and queued
# entries, the configured limit and the current process group id.
def to_s
	running = @running.size
	queued = @queue.size
	
	"#<#{self.class} running=#{running} queued=#{queued} limit=#{@limit} pgid=#{@pgid}>"
end

#waitObject

Wait for all running and queued processes to finish. If you provide a block, it will be invoked before waiting, but within canonical signal handling machinery.



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/process/group.rb', line 187

# Wait for all running and queued processes to finish.
#
# If a block is given, it is yielded this group inside the `waiting` wrapper,
# before the reaping loop starts.
#
# Returns self on normal completion.
#
# Raises ArgumentError when the current pid differs from the pid stored in @pid.
# Re-raises Interrupt after sending :INT to the group and calling wait_all.
#
# NOTE(review): `waiting`, `wait_one`, `schedule!` and `wait_all` are defined outside this excerpt; the comments below describe only how they are used here.
def wait
	raise ArgumentError.new("Cannot call Process::Group#wait from child process!") unless @pid == Process.pid
	
	# Everything up to the end of the loop runs inside the block given to `waiting`:
	waiting do
		yield(self) if block_given?
		
		while running?
			# wait_one hands back a pair; the first element is resumed with the second, so it is presumably the fiber that started the finished child — confirm against wait_one.
			process, status = wait_one
			
			# schedule! is called before resuming; presumably it starts queued work in the freed slot — confirm.
			schedule!
			
			process.resume(status)
		end
	end
	
	# No processes, process group is no longer valid:
	@pgid = nil
	
	return self
rescue Interrupt
	# If the user interrupts the wait, interrupt the process group and wait for them to finish:
	self.kill(:INT)
	
	# If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below:
	wait_all
	
	raise
ensure
	# You'd only get here with running processes if some unexpected error was thrown in user code:
	begin
		self.kill(:TERM)
	rescue Errno::EPERM
		# Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition.
	end
	
	# Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller:
	wait_all
end