Class: Async::Container::Generic
- Inherits:
-
Object
- Object
- Async::Container::Generic
- Defined in:
- lib/async/container/generic.rb
Overview
A base class for implementing containers.
Constant Summary collapse
- UNNAMED =
"Unnamed"
Instance Attribute Summary collapse
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#statistics ⇒ Object
Statistics relating to the behavior of children instances.
- #The group of running children instances.(groupofrunningchildreninstances.) ⇒ Object readonly
Class Method Summary collapse
-
.run ⇒ Object
Run a new container.
Instance Method Summary collapse
-
#[](key) ⇒ Object
Look up a child process by key.
-
#async(**options, &block) ⇒ Object
deprecated
Deprecated.
Please use #spawn or Generic.run instead.
-
#failed? ⇒ Boolean
Whether any failures have occurred within the container.
-
#initialize(**options) ⇒ Generic
constructor
Initialize the container.
-
#key?(key) ⇒ Boolean
Whether a child instance exists for the given key.
-
#mark?(key) ⇒ Boolean
Mark the container’s keyed instance which ensures that it won’t be discarded.
-
#reload ⇒ Object
Reload the container’s keyed instances.
-
#run(count: Container.processor_count, **options, &block) ⇒ Object
Run multiple instances of the same block in the container.
-
#running? ⇒ Boolean
Whether the container has running children instances.
- #size ⇒ Object
-
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
-
#spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block) ⇒ Object
Spawn a child instance into the container.
-
#status?(flag) ⇒ Boolean
Returns true if all children instances have the specified status flag set.
-
#stop(timeout = true) ⇒ Object
Stop the children instances.
- #The state of each child instance.=(stateofeachchildinstance. = (value)) ⇒ Object
-
#to_s ⇒ Object
A human readable representation of the container.
-
#wait ⇒ Object
Wait until all spawned tasks are completed.
-
#wait_until_ready ⇒ Object
Wait until all the children instances have indicated that they are ready.
Constructor Details
#initialize(**options) ⇒ Generic
Initialize the container.
46 47 48 49 50 51 52 53 54 |
# File 'lib/async/container/generic.rb', line 46 def initialize(**) @group = Group.new(**) @running = true @state = {} @statistics = Statistics.new @keyed = {} end |
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
57 58 59 |
# File 'lib/async/container/generic.rb', line 57 def group @group end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
65 66 67 |
# File 'lib/async/container/generic.rb', line 65 def state @state end |
#statistics ⇒ Object
Statistics relating to the behavior of children instances.
81 82 83 |
# File 'lib/async/container/generic.rb', line 81 def statistics @statistics end |
#The group of running children instances.(groupofrunningchildreninstances.) ⇒ Object (readonly)
57 |
# File 'lib/async/container/generic.rb', line 57 attr :group |
Class Method Details
.run ⇒ Object
Run a new container.
37 38 39 |
# File 'lib/async/container/generic.rb', line 37 def self.run(...) self.new.run(...) end |
Instance Method Details
#[](key) ⇒ Object
Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.
75 76 77 |
# File 'lib/async/container/generic.rb', line 75 def [] key @keyed[key]&.value end |
#async(**options, &block) ⇒ Object
244 245 246 247 248 249 250 251 252 |
# File 'lib/async/container/generic.rb', line 244 def async(**, &block) # warn "#{self.class}##{__method__} is deprecated, please use `spawn` or `run` instead.", uplevel: 1 require "async" spawn(**) do |instance| Async(instance, &block) end end |
#failed? ⇒ Boolean
Whether any failures have occurred within the container.
85 86 87 |
# File 'lib/async/container/generic.rb', line 85 def failed? @statistics.failed? end |
#key?(key) ⇒ Boolean
Whether a child instance exists for the given key.
283 284 285 286 287 |
# File 'lib/async/container/generic.rb', line 283 def key?(key) if key @keyed.key?(key) end end |
#mark?(key) ⇒ Boolean
Mark the container’s keyed instance which ensures that it won’t be discarded.
270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/async/container/generic.rb', line 270 def mark?(key) if key if value = @keyed[key] value.mark! return true end end return false end |
#reload ⇒ Object
Reload the container’s keyed instances.
255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/async/container/generic.rb', line 255 def reload @keyed.each_value(&:clear!) yield dirty = false @keyed.delete_if do |key, value| value.stop? && (dirty = true) end return dirty end |
#run(count: Container.processor_count, **options, &block) ⇒ Object
Run multiple instances of the same block in the container.
235 236 237 238 239 240 241 |
# File 'lib/async/container/generic.rb', line 235 def run(count: Container.processor_count, **, &block) count.times do spawn(**, &block) end return self end |
#running? ⇒ Boolean
Whether the container has running children instances.
90 91 92 |
# File 'lib/async/container/generic.rb', line 90 def running? @group.running? end |
#size ⇒ Object
60 61 62 |
# File 'lib/async/container/generic.rb', line 60 def size @group.size end |
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
96 97 98 |
# File 'lib/async/container/generic.rb', line 96 def sleep(duration = nil) @group.sleep(duration) end |
#spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block) ⇒ Object
Spawn a child instance into the container.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/async/container/generic.rb', line 171 def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, &block) name ||= UNNAMED if mark?(key) Console.debug(self, "Reusing existing child.", child: {key: key, name: name}) return false end @statistics.spawn! fiber do while @running Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) child = self.start(name, &block) state = insert(key, child) Console.debug(self, "Started child.", child: child, spawn: {key: key, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) # If a health check is specified, we will monitor the child process and terminate it if it does not update its state within the specified time. if health_check_timeout age_clock = state[:age] = Clock.start end status = nil begin status = @group.wait_for(child) do || case when :health_check! if health_check_timeout&.<(age_clock.total) health_check_failed!(child, age_clock, health_check_timeout) end else state.update() age_clock&.reset! end end rescue => error Console.error(self, "Error during child process management!", exception: error, running: @running) ensure delete(key, child) end if status&.success? Console.debug(self, "Child exited successfully.", status: status, running: @running) else @statistics.failure! Console.error(self, "Child exited with error!", status: status, running: @running) end if restart @statistics.restart! else break end end end.resume return true end |
#status?(flag) ⇒ Boolean
Returns true if all children instances have the specified status flag set. e.g. ‘:ready`. This state is updated by the process readiness protocol mechanism. See Notify::Client for more details.
109 110 111 112 |
# File 'lib/async/container/generic.rb', line 109 def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end |
#stop(timeout = true) ⇒ Object
Stop the children instances.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/async/container/generic.rb', line 142 def stop(timeout = true) Console.info(self, "Stopping container...", timeout: timeout, caller: caller_locations) @running = false @group.stop(timeout) if @group.running? Console.warn(self, "Group is still running after stopping it!") else Console.info(self, "Group has stopped.") end rescue => error Console.error(self, "Error while stopping container!", exception: error) raise ensure @running = true end |
#The state of each child instance.=(stateofeachchildinstance. = (value)) ⇒ Object
65 |
# File 'lib/async/container/generic.rb', line 65 attr :state |
#to_s ⇒ Object
A human readable representation of the container.
69 70 71 |
# File 'lib/async/container/generic.rb', line 69 def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end |
#wait ⇒ Object
Wait until all spawned tasks are completed.
101 102 103 |
# File 'lib/async/container/generic.rb', line 101 def wait @group.wait end |
#wait_until_ready ⇒ Object
Wait until all the children instances have indicated that they are ready.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/async/container/generic.rb', line 116 def wait_until_ready while true Console.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| buffer.puts "\t#{child.inspect}: #{state}" end end self.sleep if self.status?(:ready) Console.logger.debug(self) do |buffer| buffer.puts "All ready:" @state.each do |child, state| buffer.puts "\t#{child.inspect}: #{state}" end end return true end end end |