Class: Async::Container::Generic
- Inherits: Object
  - Object
    - Async::Container::Generic
- Defined in: lib/async/container/generic.rb
Overview
A base class for implementing containers.
Constant Summary
- UNNAMED = "Unnamed"
Instance Attribute Summary
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#statistics ⇒ Object
Statistics relating to the behavior of child instances.
Class Method Summary
Instance Method Summary
-
#[](key) ⇒ Object
Look up a child process by key.
-
#async(**options, &block) ⇒ Object
deprecated
Deprecated. Please use #spawn or Generic.run instead.
-
#failed? ⇒ Boolean
Whether any failures have occurred within the container.
-
#initialize(**options) ⇒ Generic
constructor
A new instance of Generic.
-
#key?(key) ⇒ Boolean
Whether a child instance exists for the given key.
-
#mark?(key) ⇒ Boolean
Mark the container’s keyed instance, which ensures that it won’t be discarded. Returns true if an instance exists for the key, false otherwise.
-
#reload ⇒ Object
Reload the container’s keyed instances.
-
#run(count: Container.processor_count, **options, &block) ⇒ Object
Run multiple instances of the same block in the container.
-
#running? ⇒ Boolean
Whether the container has running child instances.
-
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
-
#spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
Spawn a child instance into the container.
-
#status?(flag) ⇒ Boolean
Returns true if all child instances have the specified status flag set.
-
#stop(timeout = true) ⇒ Object
Stop the child instances.
-
#to_s ⇒ Object
A human readable representation of the container.
-
#wait ⇒ Object
Wait until all spawned tasks are completed.
-
#wait_until_ready ⇒ Object
Wait until all child instances have indicated that they are ready.
Constructor Details
#initialize(**options) ⇒ Generic
Returns a new instance of Generic.
# File 'lib/async/container/generic.rb', line 42
def initialize(**options)
  @group = Group.new
  @running = true

  @state = {}

  @statistics = Statistics.new
  @keyed = {}
end
Instance Attribute Details
#group ⇒ Object (readonly)
Returns the value of attribute group.
# File 'lib/async/container/generic.rb', line 52
def group
  @group
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/async/container/generic.rb', line 54
def state
  @state
end
#statistics ⇒ Object
Statistics relating to the behavior of child instances.
# File 'lib/async/container/generic.rb', line 70
def statistics
  @statistics
end
Class Method Details
.run(*arguments, **options, &block) ⇒ Object
# File 'lib/async/container/generic.rb', line 36
def self.run(*arguments, **options, &block)
  self.new.run(*arguments, **options, &block)
end
Instance Method Details
#[](key) ⇒ Object
Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.
# File 'lib/async/container/generic.rb', line 64
def [] key
  @keyed[key]&.value
end
#async(**options, &block) ⇒ Object
# File 'lib/async/container/generic.rb', line 194
def async(**options, &block)
  spawn(**options) do |instance|
    Async::Reactor.run(instance, &block)
  end
end
#failed? ⇒ Boolean
Whether any failures have occurred within the container.
# File 'lib/async/container/generic.rb', line 74
def failed?
  @statistics.failed?
end
#key?(key) ⇒ Boolean
Whether a child instance exists for the given key.
# File 'lib/async/container/generic.rb', line 229
def key?(key)
  if key
    @keyed.key?(key)
  end
end
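Because of the `if key` guard, the listing above returns nil (not false) when the key itself is nil. A minimal sketch of that behavior, using a plain Hash as a stand-in for @keyed; the helper name `keyed_key?` is hypothetical and not part of the library:

```ruby
# Hypothetical stand-in mirroring the guard in Generic#key?.
# `keyed` plays the role of the container's @keyed hash.
def keyed_key?(keyed, key)
  if key
    keyed.key?(key)
  end
end

keyed = {"config.ru" => :child}

p keyed_key?(keyed, "config.ru") # true
p keyed_key?(keyed, "missing")   # false
p keyed_key?(keyed, nil)         # nil: the guard short-circuits
```

Callers that need a strict Boolean can compare the result explicitly or negate it twice.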
#mark?(key) ⇒ Boolean
Mark the container’s keyed instance, which ensures that it won’t be discarded. Returns true if an instance exists for the key, false otherwise.
# File 'lib/async/container/generic.rb', line 216
def mark?(key)
  if key
    if value = @keyed[key]
      value.mark!

      return true
    end
  end

  return false
end
#reload ⇒ Object
Reload the container’s keyed instances.
# File 'lib/async/container/generic.rb', line 201
def reload
  @keyed.each_value(&:clear!)

  yield

  dirty = false

  @keyed.delete_if do |key, value|
    value.stop? && (dirty = true)
  end

  return dirty
end
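Taken together, #mark? and #reload behave like a mark-and-sweep pass over the keyed instances: every mark is cleared, the block re-marks what is still wanted, and anything left unmarked is removed. The following is a sketch under assumptions, not the library's implementation: `KeyedEntry` and `reload_keyed` are hypothetical names, and `stop?` is assumed to report true for unmarked entries.

```ruby
# Hypothetical stand-in for the wrapper stored in @keyed; only the methods
# called by mark? and reload are modelled.
class KeyedEntry
  attr_reader :value

  def initialize(value)
    @value = value
    @marked = true
  end

  def mark!
    @marked = true
  end

  def clear!
    @marked = false
  end

  # Assumption: an unmarked entry is one that should be discarded.
  def stop?
    !@marked
  end
end

# Same shape as Generic#reload: clear all marks, let the caller re-mark
# what it still needs, then sweep whatever was left unmarked.
def reload_keyed(keyed)
  keyed.each_value(&:clear!)

  yield

  dirty = false
  keyed.delete_if { |_key, entry| entry.stop? && (dirty = true) }
  dirty
end

keyed = {"a.rb" => KeyedEntry.new(:child_a), "b.rb" => KeyedEntry.new(:child_b)}

dirty = reload_keyed(keyed) { keyed["a.rb"].mark! }
puts dirty              # true: "b.rb" was swept
puts keyed.keys.inspect # ["a.rb"]
```

The return value tells the caller whether anything was removed, which is useful for deciding whether further work is needed after a reload.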
#run(count: Container.processor_count, **options, &block) ⇒ Object
Run multiple instances of the same block in the container.
# File 'lib/async/container/generic.rb', line 185
def run(count: Container.processor_count, **options, &block)
  count.times do
    spawn(**options, &block)
  end

  return self
end
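As the listing shows, #run forwards its remaining options to #spawn once per instance and returns the container itself, so calls can be chained. A minimal sketch of that shape, assuming a hypothetical `RecordingContainer` that only records spawn requests, and assuming `Etc.nprocessors` as an approximation of `Container.processor_count`:

```ruby
require "etc"

# Hypothetical recorder standing in for a container; it only counts spawns.
class RecordingContainer
  attr_reader :spawned

  def initialize
    @spawned = []
  end

  # Records the requested name instead of starting a real child.
  def spawn(name: nil, &block)
    @spawned << name
    true
  end

  # Assumption: Etc.nprocessors approximates Container.processor_count.
  def run(count: Etc.nprocessors, **options, &block)
    count.times { spawn(**options, &block) }
    self
  end
end

container = RecordingContainer.new.run(count: 3, name: "worker") { }
puts container.spawned.inspect # ["worker", "worker", "worker"]
```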
#running? ⇒ Boolean
Whether the container has running child instances.
# File 'lib/async/container/generic.rb', line 79
def running?
  @group.running?
end
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
# File 'lib/async/container/generic.rb', line 85
def sleep(duration = nil)
  @group.sleep(duration)
end
#spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
Spawn a child instance into the container.
# File 'lib/async/container/generic.rb', line 139
def spawn(name: nil, restart: false, key: nil, &block)
  name ||= UNNAMED

  if mark?(key)
    Console.logger.debug(self) {"Reusing existing child for #{key}: #{name}"}
    return false
  end

  @statistics.spawn!

  fiber do
    while @running
      child = self.start(name, &block)

      state = insert(key, child)

      begin
        status = @group.wait_for(child) do |message|
          state.update(message)
        end
      ensure
        delete(key, child)
      end

      if status.success?
        Console.logger.debug(self) {"#{child} exited with #{status}"}
      else
        @statistics.failure!
        Console.logger.error(self) {status}
      end

      if restart
        @statistics.restart!
      else
        break
      end
    end
  # ensure
  #   Console.logger.error(self) {$!} if $!
  end.resume

  return true
end
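The core of #spawn is a supervision loop: start a child, wait for it to exit, record a failure if the exit was unsuccessful, and start again only when `restart` is true and the container is still running. The sketch below models just that control flow in a single thread. `Supervisor` and its counters are hypothetical names, and the block's return value stands in for `status.success?`.

```ruby
# Hypothetical, single-threaded model of the loop inside Generic#spawn.
class Supervisor
  attr_reader :spawns, :failures, :restarts
  attr_accessor :running

  def initialize
    @running = true
    @spawns = @failures = @restarts = 0
  end

  # The block stands in for starting a child and waiting for it to exit;
  # it returns true for a successful exit status and false otherwise.
  def spawn(restart: false)
    @spawns += 1

    while @running
      success = yield(self)
      @failures += 1 unless success

      if restart
        @restarts += 1
      else
        break
      end
    end

    true
  end
end

attempts = 0
supervisor = Supervisor.new
supervisor.spawn(restart: true) do |s|
  attempts += 1
  s.running = false if attempts == 3 # simulate the container being stopped
  false                               # every attempt "fails"
end

puts attempts            # 3
puts supervisor.failures # 3
puts supervisor.restarts # 3
```

Note that, as in the listing, a restart is counted after every exit when `restart` is true, including the exit that precedes the loop ending because the running flag was cleared.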
#status?(flag) ⇒ Boolean
Returns true if all child instances have the specified status flag set, e.g. `:ready`. This state is updated by the process readiness protocol mechanism. See Notify::Client for more details. This also returns true if all processes have exited or failed.
# File 'lib/async/container/generic.rb', line 98
def status?(flag)
  # This also returns true if all processes have exited/failed:
  @state.all?{|_, state| state[flag]}
end
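The comment in the listing notes that #status? also returns true once every process has exited. That follows from `Enumerable#all?` being vacuously true on an empty collection. A minimal sketch, using plain hashes as hypothetical stand-ins for @state (the helper name `all_flagged?` is not part of the library):

```ruby
# Hypothetical stand-in for Generic#status?; `state` maps child => flags hash.
def all_flagged?(state, flag)
  state.all? { |_, flags| flags[flag] }
end

partially_ready = {"child-1" => {ready: true}, "child-2" => {}}
fully_ready = {"child-1" => {ready: true}, "child-2" => {ready: true}}
no_children = {}

puts all_flagged?(partially_ready, :ready) # false: child-2 has not reported
puts all_flagged?(fully_ready, :ready)     # true
puts all_flagged?(no_children, :ready)     # true: all? is vacuously true
```

Callers that need to distinguish "everything is ready" from "nothing is left" can combine this check with #running? or #failed?.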
#stop(timeout = true) ⇒ Object
Stop the child instances.
# File 'lib/async/container/generic.rb', line 124
def stop(timeout = true)
  @running = false
  @group.stop(timeout)

  if @group.running?
    Console.logger.warn(self) {"Group is still running after stopping it!"}
  end
ensure
  @running = true
end
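The method-level `ensure` in #stop resets @running to true whether stopping succeeds or raises, so the restart loops in #spawn see a cleared flag only while the stop is in progress. A minimal sketch of that pattern, assuming a hypothetical `FlakyContainer` whose stop always fails partway through:

```ruby
# Hypothetical stand-in for a container whose group fails while stopping.
class FlakyContainer
  attr_reader :running

  def initialize
    @running = true
  end

  def stop
    @running = false
    raise IOError, "group did not stop cleanly" # simulate a failure mid-stop
  ensure
    @running = true # runs on both the normal and the error path
  end
end

container = FlakyContainer.new
begin
  container.stop
rescue IOError => error
  puts "stop raised: #{error.message}"
end
puts container.running # true
```

Resetting the flag leaves the same container instance usable for further #spawn calls after a stop.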
#to_s ⇒ Object
A human readable representation of the container.
# File 'lib/async/container/generic.rb', line 58
def to_s
  "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures."
end
#wait ⇒ Object
Wait until all spawned tasks are completed.
# File 'lib/async/container/generic.rb', line 90
def wait
  @group.wait
end
#wait_until_ready ⇒ Object
Wait until all child instances have indicated that they are ready.
# File 'lib/async/container/generic.rb', line 105
def wait_until_ready
  while true
    Console.logger.debug(self) do |buffer|
      buffer.puts "Waiting for ready:"
      @state.each do |child, state|
        buffer.puts "\t#{child.class}: #{state.inspect}"
      end
    end

    self.sleep

    if self.status?(:ready)
      return true
    end
  end
end
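#wait_until_ready is a sleep-then-check loop: it blocks until some state change occurs, then tests whether every child has set `:ready`. The sketch below models that loop without any real processes. The `updates` array is a hypothetical stand-in for readiness messages arriving while the container sleeps, and the guard that raises when updates run out is an addition for the sketch (the method in the listing would simply keep waiting).

```ruby
# Hypothetical model: `state` stands in for @state and `updates` for the
# readiness messages that would arrive while the container sleeps.
def wait_until_ready(state, updates)
  loop do
    child, flags = updates.shift # stands in for self.sleep waking on a change
    state[child].update(flags) if child

    return true if state.all? { |_, current| current[:ready] }

    # Sketch-only guard so the example cannot loop forever.
    raise "no more updates and children are not ready" if updates.empty?
  end
end

state = {"child-1" => {}, "child-2" => {}}
updates = [
  ["child-1", {ready: true}],
  ["child-2", {status: "booting"}],
  ["child-2", {ready: true}],
]

puts wait_until_ready(state, updates) # true
puts updates.length                   # 0
```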