Class: Process::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/process/group.rb,
lib/process/group/version.rb

Overview

A group of tasks which can be run asynchronously using fibers. Someone must call Group#wait to ensure that all fibers eventually resume.

Defined Under Namespace

Classes: Command, Fork, Spawn

Constant Summary collapse

VERSION =
"1.2.1"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit: nil, terminal: Terminal::Device.new?) ⇒ Group

Create a new process group. You can specify `limit:` to cap the maximum number of concurrent processes; `nil` (the default) means unlimited.

Raises:

  • (ArgumentError)


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/process/group.rb', line 102

# Build an empty process group owned by the current process.
#
# @param limit [Integer, nil] maximum number of concurrently running processes; nil means unlimited.
# @param terminal [Object] stored as-is in @terminal; defaults to the result of Terminal::Device.new?.
# @raise [ArgumentError] if limit is neither nil nor greater than zero.
def initialize(limit: nil, terminal: Terminal::Device.new?)
	unless limit.nil? || limit > 0
		raise ArgumentError, "Limit must be nil (unlimited) or > 0"
	end
	
	# Remember which process created the group; #wait compares against this:
	@pid = Process.pid
	@pgid = nil
	
	@terminal = terminal
	
	# Concurrency cap and the tasks which are not yet running:
	@limit = limit
	@queue = []
	
	# Table of currently running processes:
	@running = {}
	@fiber = nil
	
	# Whether we can actively schedule tasks or not:
	@waiting = false
end

Instance Attribute Details

#limitObject

The maximum number of processes to run concurrently, or nil if there is no limit.



125
126
127
# File 'lib/process/group.rb', line 125

# Reader for @limit, the cap on concurrently running processes.
# The constructor only accepts nil (unlimited) or a value greater than zero.
def limit
  @limit
end

#runningObject (readonly)

A table of currently running processes.



122
123
124
# File 'lib/process/group.rb', line 122

# Reader for @running, the table of currently running processes.
# Its size is what #running? and #available? consult.
def running
  @running
end

Class Method Details

.wait(**options, &block) ⇒ Object



27
28
29
30
31
# File 'lib/process/group.rb', line 27

# Convenience wrapper: create a new group and immediately wait on it.
#
# @param options [Hash] keyword options forwarded to Group.new (e.g. limit:, terminal:).
# @yield forwarded unchanged to Group#wait.
# @return whatever Group#wait returns.
def self.wait(**options, &block)
	# The options must be splatted back into keywords: Group#initialize only accepts keyword arguments, so passing the hash positionally raises ArgumentError on Ruby 3+.
	group = Group.new(**options)
	
	group.wait(&block)
end

Instance Method Details

#asyncObject



152
153
154
155
156
# File 'lib/process/group.rb', line 152

# Run the given block inside a new fiber, passing this object to it, and start the fiber immediately.
#
# @yield [self] the receiver.
# @return the value produced by the first resume of the fiber.
def async
	fiber = Fiber.new { yield self }
	
	fiber.resume
end

#available?Boolean

Whether or not #spawn, #fork or #run can be scheduled immediately.

Returns:

  • (Boolean)


169
170
171
172
173
174
175
# File 'lib/process/group.rb', line 169

# Whether another process can be started right now.
#
# @return [Boolean] true when no limit is set, otherwise whether fewer than @limit processes are running.
def available?
	# No limit configured means there is always capacity:
	return true unless @limit
	
	@running.size < @limit
end

#blocking?Boolean

Whether or not calling #spawn, #fork or #run would block the caller fiber (i.e. call Fiber.yield).

Returns:

  • (Boolean)


178
179
180
# File 'lib/process/group.rb', line 178

# The inverse of #available?.
#
# @return [Boolean] true when #available? is false.
def blocking?
	!available?
end

#fork(**options, &block) ⇒ Object

Fork a block as a child process.



164
165
166
# File 'lib/process/group.rb', line 164

# Wrap the block in a Fork task and hand it to append!.
#
# @param options [Hash] keyword options passed through to Fork.new.
# @return whatever append! returns.
def fork(**options, &block)
	task = Fork.new(block, **options)
	
	append!(task)
end

#idObject

The id of the process group, only valid if processes are currently running.

Raises:

  • (RuntimeError)


128
129
130
131
132
# File 'lib/process/group.rb', line 128

# The identifier used to signal the whole group: the negated @pgid.
#
# @return the negation of @pgid.
# @raise [RuntimeError] if the table of running processes is empty.
def id
	if @running.empty?
		raise RuntimeError, "No processes in group, no group id available."
	end
	
	-@pgid
end

#kill(signal = :INT) ⇒ Object

Send a signal to all currently running processes. No-op unless #running?



223
224
225
226
227
# File 'lib/process/group.rb', line 223

# Deliver a signal to the group identified by #id. Does nothing when #running? is false.
#
# @param signal [Symbol, String, Integer] the signal handed to Process.kill; defaults to :INT.
# @return the result of Process.kill, or nil when nothing is running.
def kill(signal = :INT)
	return unless running?
	
	Process.kill(signal, id)
end

#queued?Boolean

Returns:

  • (Boolean)


134
135
136
# File 'lib/process/group.rb', line 134

# Whether any tasks are sitting in @queue.
#
# @return [Boolean] true when @queue is not empty.
def queued?
	!@queue.empty?
end

#run(*arguments, **options) ⇒ Object

Run a process in a new fiber, arguments have same meaning as Process#spawn.



144
145
146
147
148
149
150
# File 'lib/process/group.rb', line 144

# Start a new fiber which calls #spawn with the given arguments and then passes the result to the optional block.
#
# @param arguments [Array] positional arguments forwarded to #spawn.
# @param options [Hash] keyword options forwarded to #spawn.
# @yield [exit_status] the value returned by #spawn, if a block is given.
# @return the value produced by the first resume of the fiber.
def run(*arguments, **options)
	fiber = Fiber.new do
		status = self.spawn(*arguments, **options)
		
		yield status if block_given?
	end
	
	fiber.resume
end

#running?Boolean

Are there processes currently running?

Returns:

  • (Boolean)


139
140
141
# File 'lib/process/group.rb', line 139

# Whether the table of running processes has any entries.
#
# @return [Boolean] true when @running is not empty.
def running?
	!@running.empty?
end

#spawn(*arguments, **options) ⇒ Object

Run a specific command as a child process.



159
160
161
# File 'lib/process/group.rb', line 159

# Wrap the command in a Spawn task and hand it to append!.
#
# @param arguments [Array] the command arguments, passed to Spawn.new as a single array.
# @param options [Hash] keyword options passed through to Spawn.new.
# @return whatever append! returns.
def spawn(*arguments, **options)
	task = Spawn.new(arguments, **options)
	
	append!(task)
end

#to_sObject



229
230
231
# File 'lib/process/group.rb', line 229

# Human-readable summary: class name, running and queued counts, the limit and the process group id.
#
# @return [String]
def to_s
	format(
		"#<%s running=%d queued=%d limit=%s pgid=%s>",
		self.class, @running.size, @queue.size, @limit, @pgid
	)
end

#waitObject

Wait for all running and queued processes to finish. If you provide a block, it will be invoked before waiting, but within canonical signal handling machinery.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/process/group.rb', line 183

# Drive the group until no processes remain running, then return self.
#
# Inside the `waiting` block, the optional caller block is yielded self first. Then, while #running? is true,
# each iteration takes a (process, status) pair from wait_one, calls schedule!, and resumes that process with its status.
# NOTE(review): `waiting`, `wait_one`, `schedule!` and `wait_all` are helpers defined outside this excerpt; their exact behaviour should be confirmed there.
#
# @yield [self] invoked once before the wait loop, if a block is given.
# @return [self]
# @raise [ArgumentError] if the current process id differs from the one recorded in @pid.
# @raise [Interrupt] re-raised after the group has been sent :INT and wait_all has returned.
def wait
	raise ArgumentError.new("Cannot call Process::Group#wait from child process!") unless @pid == Process.pid
	
	waiting do
		yield(self) if block_given?
		
		while running?
			process, status = wait_one
			
			# Called before resuming the finished process's fiber:
			schedule!
			
			process.resume(status)
		end
	end
	
	# No processes, process group is no longer valid:
	@pgid = nil
	
	return self
rescue Interrupt
	# If the user interrupts the wait, interrupt the process group and wait for them to finish:
	self.kill(:INT)
	
	# If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below:
	wait_all
	
	raise
ensure
	# You'd only get here with running processes if some unexpected error was thrown in user code:
	begin
		self.kill(:TERM)
	rescue Errno::EPERM
		# Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition.
	end
	
	# Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller:
	wait_all
end