Class: Process::Group
- Inherits:
-
Object
- Object
- Process::Group
- Defined in:
- lib/process/group.rb,
lib/process/group/version.rb
Overview
A group of tasks which can be run asynchrnously using fibers. Someone must call Group#wait to ensure that all fibers eventually resume.
Defined Under Namespace
Constant Summary collapse
- VERSION =
"1.2.1"
Instance Attribute Summary collapse
-
#limit ⇒ Object
The maximum number of processes to run concurrently, or zero.
-
#running ⇒ Object
readonly
A table of currently running processes.
Class Method Summary collapse
Instance Method Summary collapse
- #async ⇒ Object
-
#available? ⇒ Boolean
Whether or not #spawn, #fork or #run can be scheduled immediately.
-
#blocking? ⇒ Boolean
Whether or not calling #spawn, #fork or #run would block the caller fiber (i.e. call Fiber.yield).
-
#fork(**options, &block) ⇒ Object
Fork a block as a child process.
-
#id ⇒ Object
The id of the process group, only valid if processes are currently running.
-
#initialize(limit: nil, terminal: Terminal::Device.new?) ⇒ Group
constructor
Create a new process group.
-
#kill(signal = :INT) ⇒ Object
Send a signal to all currently running processes.
- #queued? ⇒ Boolean
-
#run(*arguments, **options) ⇒ Object
Run a process in a new fiber, arguments have same meaning as Process#spawn.
-
#running? ⇒ Boolean
Are there processes currently running?.
-
#spawn(*arguments, **options) ⇒ Object
Run a specific command as a child process.
- #to_s ⇒ Object
-
#wait ⇒ Object
Wait for all running and queued processes to finish.
Constructor Details
#initialize(limit: nil, terminal: Terminal::Device.new?) ⇒ Group
Create a new process group. Can specify ‘limit:` which limits the maximum number of concurrent processes.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/process/group.rb', line 102 def initialize(limit: nil, terminal: Terminal::Device.new?) raise ArgumentError.new("Limit must be nil (unlimited) or > 0") unless limit == nil or limit > 0 @pid = Process.pid @terminal = terminal @queue = [] @limit = limit @running = {} @fiber = nil @pgid = nil # Whether we can actively schedule tasks or not: @waiting = false end |
Instance Attribute Details
#limit ⇒ Object
The maximum number of processes to run concurrently, or zero
125 126 127 |
# File 'lib/process/group.rb', line 125 def limit @limit end |
#running ⇒ Object (readonly)
A table of currently running processes.
122 123 124 |
# File 'lib/process/group.rb', line 122 def running @running end |
Class Method Details
Instance Method Details
#async ⇒ Object
152 153 154 155 156 |
# File 'lib/process/group.rb', line 152 def async Fiber.new do yield self end.resume end |
#available? ⇒ Boolean
Whether or not #spawn, #fork or #run can be scheduled immediately.
169 170 171 172 173 174 175 |
# File 'lib/process/group.rb', line 169 def available? if @limit @running.size < @limit else true end end |
#blocking? ⇒ Boolean
Whether or not calling #spawn, #fork or #run would block the caller fiber (i.e. call Fiber.yield).
178 179 180 |
# File 'lib/process/group.rb', line 178 def blocking? not available? end |
#fork(**options, &block) ⇒ Object
Fork a block as a child process.
164 165 166 |
# File 'lib/process/group.rb', line 164 def fork(**, &block) append! Fork.new(block, **) end |
#id ⇒ Object
The id of the process group, only valid if processes are currently running.
128 129 130 131 132 |
# File 'lib/process/group.rb', line 128 def id raise RuntimeError.new("No processes in group, no group id available.") if @running.size == 0 -@pgid end |
#kill(signal = :INT) ⇒ Object
Send a signal to all currently running processes. No-op unless #running?
223 224 225 226 227 |
# File 'lib/process/group.rb', line 223 def kill(signal = :INT) if running? Process.kill(signal, id) end end |
#queued? ⇒ Boolean
134 135 136 |
# File 'lib/process/group.rb', line 134 def queued? @queue.size > 0 end |
#run(*arguments, **options) ⇒ Object
Run a process in a new fiber, arguments have same meaning as Process#spawn.
144 145 146 147 148 149 150 |
# File 'lib/process/group.rb', line 144 def run(*arguments, **) Fiber.new do exit_status = self.spawn(*arguments, **) yield exit_status if block_given? end.resume end |
#running? ⇒ Boolean
Are there processes currently running?
139 140 141 |
# File 'lib/process/group.rb', line 139 def running? @running.size > 0 end |
#spawn(*arguments, **options) ⇒ Object
Run a specific command as a child process.
159 160 161 |
# File 'lib/process/group.rb', line 159 def spawn(*arguments, **) append! Spawn.new(arguments, **) end |
#to_s ⇒ Object
229 230 231 |
# File 'lib/process/group.rb', line 229 def to_s "#<#{self.class} running=#{@running.size} queued=#{@queue.size} limit=#{@limit} pgid=#{@pgid}>" end |
#wait ⇒ Object
Wait for all running and queued processes to finish. If you provide a block, it will be invoked before waiting, but within canonical signal handling machinery.
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/process/group.rb', line 183 def wait raise ArgumentError.new("Cannot call Process::Group#wait from child process!") unless @pid == Process.pid waiting do yield(self) if block_given? while running? process, status = wait_one schedule! process.resume(status) end end # No processes, process group is no longer valid: @pgid = nil return self rescue Interrupt # If the user interrupts the wait, interrupt the process group and wait for them to finish: self.kill(:INT) # If user presses Ctrl-C again (or something else goes wrong), we will come out and kill(:TERM) in the ensure below: wait_all raise ensure # You'd only get here with running processes if some unexpected error was thrown in user code: begin self.kill(:TERM) rescue Errno::EPERM # Sometimes, `kill` code can give EPERM, if any signal couldn't be delivered to a child. This might occur if an exception is thrown in the user code (e.g. within the fiber), and there are other zombie processes which haven't been reaped yet. These should be dealt with below, so it shouldn't be an issue to ignore this condition. end # Clean up zombie processes - if user presses Ctrl-C or for some reason something else blows up, exception would propagate back to caller: wait_all end |