Class: Async::Container::Group
- Inherits: Object
  - Object
  - Async::Container::Group
- Defined in: lib/async/container/group.rb
Overview
Manages a group of running processes.
Instance Attribute Summary
-
#running ⇒ Object
readonly
The running tasks, indexed by IO.
Instance Method Summary
-
#any? ⇒ Boolean
Whether the group contains any running processes.
-
#empty? ⇒ Boolean
Whether the group is empty.
-
#health_check! ⇒ Object
Perform a health check on all running processes.
-
#initialize(health_check_interval: 1.0) ⇒ Group
constructor
Initialize an empty group.
- #inspect ⇒ Object
-
#interrupt ⇒ Object
Interrupt all running processes.
-
#kill ⇒ Object
Kill all running processes.
-
#running? ⇒ Boolean
Whether the group contains any running processes.
- #size ⇒ Object
-
#sleep(duration) ⇒ Object
Sleep for at most the specified duration until some state change occurs.
-
#stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT) ⇒ Object
Stop all child processes with a multi-phase shutdown sequence.
-
#terminate ⇒ Object
Terminate all running processes.
-
#wait ⇒ Object
Begin any outstanding queued processes and wait for them indefinitely.
-
#wait_for(channel) ⇒ Object
Wait for a message in the specified Channel.
Constructor Details
#initialize(health_check_interval: 1.0) ⇒ Group
Initialize an empty group.
    # File 'lib/async/container/group.rb', line 24
    def initialize(health_check_interval: 1.0)
      @health_check_interval = health_check_interval

      # The running fibers, indexed by IO:
      @running = {}

      # This queue allows us to wait for processes to complete, without spawning new processes as a result.
      @queue = nil
    end
Instance Attribute Details
#running ⇒ Object (readonly)
The running tasks, indexed by IO.
    # File 'lib/async/container/group.rb', line 40
    attr :running
Instance Method Details
#any? ⇒ Boolean
Whether the group contains any running processes.
    # File 'lib/async/container/group.rb', line 55
    def any?
      @running.any?
    end
#empty? ⇒ Boolean
Whether the group is empty.
    # File 'lib/async/container/group.rb', line 61
    def empty?
      @running.empty?
    end
#health_check! ⇒ Object
Perform a health check on all running processes.
    # File 'lib/async/container/group.rb', line 109
    def health_check!
      each_running do |fiber|
        fiber.resume(:health_check!)
      end
    end
#inspect ⇒ Object
    # File 'lib/async/container/group.rb', line 35
    def inspect
      "#<#{self.class} running=#{@running.size}>"
    end
#interrupt ⇒ Object
Interrupt all running processes. This resumes each controlling fiber with Interrupt.
    # File 'lib/async/container/group.rb', line 117
    def interrupt
      Console.info(self, "Sending interrupt to #{@running.size} running processes...")
      each_running do |fiber|
        fiber.resume(Interrupt)
      end
    end
#kill ⇒ Object
Kill all running processes. This resumes each controlling fiber with Kill.
    # File 'lib/async/container/group.rb', line 135
    def kill
      Console.info(self, "Sending kill to #{@running.size} running processes...")
      each_running do |fiber|
        fiber.resume(Kill)
      end
    end
#running? ⇒ Boolean
Whether the group contains any running processes.
    # File 'lib/async/container/group.rb', line 49
    def running?
      @running.any?
    end
#size ⇒ Object
    # File 'lib/async/container/group.rb', line 43
    def size
      @running.size
    end
#sleep(duration) ⇒ Object
Sleep for at most the specified duration until some state change occurs.
    # File 'lib/async/container/group.rb', line 66
    def sleep(duration)
      self.resume
      self.suspend

      self.wait_for_children(duration)
    end
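The body of `wait_for_children` is not shown on this page. Since the running set is indexed by IO, one plausible way to "sleep for at most the specified duration until some state change occurs" is a timed `IO.select` over those IO objects. The following is a minimal sketch under that assumption; `wait_for_readable` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper (an assumption, not the library's implementation):
# block for up to `duration` seconds until one of `ios` becomes readable.
def wait_for_readable(ios, duration)
  # IO.select returns nil when the timeout elapses with no activity.
  readable, = IO.select(ios, nil, nil, duration)
  readable || []
end

reader, writer = IO.pipe

# Nothing has been written yet, so this call times out:
idle = wait_for_readable([reader], 0.05)

# Once data is available, the wait returns without using the full timeout:
writer.write("ready\n")
woken = wait_for_readable([reader], 5)
```

The timeout bounds the sleep, while any activity on a child's IO ends it early, which matches the "at most" wording above.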
#stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT) ⇒ Object
Stop all child processes with a multi-phase shutdown sequence.
A graceful shutdown performs the following sequence:
- Send SIGINT and wait up to `interrupt_timeout` seconds
- Send SIGTERM and wait up to `terminate_timeout` seconds
- Send SIGKILL and wait indefinitely for process cleanup
If `graceful` is false, the SIGINT phase is skipped and shutdown goes directly to SIGTERM → SIGKILL. If `graceful` is a Numeric, its value is used as both `interrupt_timeout` and `terminate_timeout`.
    # File 'lib/async/container/group.rb', line 167
    def stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT)
      case graceful
      when true
        # Use defaults.
      when false
        interrupt_timeout = nil
      when Numeric
        interrupt_timeout = graceful
        terminate_timeout = graceful
      end

      Console.debug(self, "Stopping all processes...", interrupt_timeout: interrupt_timeout, terminate_timeout: terminate_timeout)

      # If a timeout is specified, interrupt the children first:
      if interrupt_timeout
        clock = Async::Clock.start

        # Interrupt the children:
        self.interrupt

        # Wait for the children to exit:
        self.wait_for_exit(clock, interrupt_timeout)
      end

      if terminate_timeout and self.any?
        clock = Async::Clock.start

        # If the children are still running, terminate them:
        self.terminate

        # Wait for the children to exit:
        self.wait_for_exit(clock, terminate_timeout)
      end

      if any?
        self.kill
        self.wait
      end
    end
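To make the effect of the `graceful` argument easier to see, the sketch below mirrors only the timeout-resolution `case` from the listing above and returns the worst-case phase sequence. `ShutdownSketch`, `plan`, and the two constant values are hypothetical placeholders chosen for illustration; the library's own `INTERRUPT_TIMEOUT` and `TERMINATE_TIMEOUT` may differ.

```ruby
module ShutdownSketch
  # Placeholder defaults for illustration only (assumed values).
  INTERRUPT_TIMEOUT = 10.0
  TERMINATE_TIMEOUT = 10.0

  # Returns the worst-case phases as [signal, timeout] pairs.
  # A nil timeout means "wait indefinitely".
  def self.plan(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT)
    case graceful
    when false
      # Skip the interrupt phase entirely.
      interrupt_timeout = nil
    when Numeric
      # A number overrides both timeouts.
      interrupt_timeout = graceful
      terminate_timeout = graceful
    end

    phases = []
    phases << [:interrupt, interrupt_timeout] if interrupt_timeout
    phases << [:terminate, terminate_timeout] if terminate_timeout
    phases << [:kill, nil]
    phases
  end
end

graceful_plan = ShutdownSketch.plan(true)
forced_plan   = ShutdownSketch.plan(false)
timed_plan    = ShutdownSketch.plan(2.5)
```

In the real method the later phases only run if children are still alive after the earlier ones, so the returned list is an upper bound on what is sent.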
#terminate ⇒ Object
Terminate all running processes. This resumes each controlling fiber with Terminate.
    # File 'lib/async/container/group.rb', line 126
    def terminate
      Console.info(self, "Sending terminate to #{@running.size} running processes...")
      each_running do |fiber|
        fiber.resume(Terminate)
      end
    end
#wait ⇒ Object
Begin any outstanding queued processes and wait for them indefinitely.
    # File 'lib/async/container/group.rb', line 74
    def wait
      self.resume

      with_health_checks do |duration|
        self.wait_for_children(duration)
      end
    end
#wait_for(channel) ⇒ Object
Wait for a message in the specified Channel.
    # File 'lib/async/container/group.rb', line 208
    def wait_for(channel)
      io = channel.in

      @running[io] = Fiber.current

      while @running.key?(io)
        # Wait for some event on the channel:
        result = Fiber.yield

        if result == Interrupt
          channel.interrupt!
        elsif result == Terminate
          channel.terminate!
        elsif result == Kill
          channel.kill!
        elsif result
          yield result
        elsif message = channel.receive
          yield message
        else
          # Wait for the channel to exit:
          return channel.wait
        end
      end
    ensure
      @running.delete(io)
    end
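The loop above relies on the fact that `Fiber.yield` returns whatever value the fiber is next resumed with, which is how `#interrupt`, `#terminate`, `#kill` and `#health_check!` reach a waiting fiber. The following is a minimal sketch of that protocol in plain Ruby; `FiberSketch`, its `Interrupt` sentinel, and `controller` are hypothetical stand-ins, not the library's classes.

```ruby
module FiberSketch
  # Hypothetical sentinel; the real one is defined by Async::Container.
  Interrupt = Class.new

  # Builds a controlling fiber that logs what it is resumed with.
  def self.controller(log)
    Fiber.new do
      loop do
        # Suspend until the group resumes this fiber with a value:
        result = Fiber.yield

        if result == Interrupt
          log << :interrupted
          break
        elsif result
          # Any other truthy value is passed through, like `yield result`.
          log << result
        end
      end

      :exited
    end
  end
end

log = []
fiber = FiberSketch.controller(log)

fiber.resume                                  # run up to the first Fiber.yield
fiber.resume(:health_check!)                  # delivered as a plain message
final = fiber.resume(FiberSketch::Interrupt)  # sentinel ends the loop
```

Comparing against the sentinel class itself (rather than an instance) is what lets a single `==` check distinguish control signals from ordinary messages.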