Class: Async::Container::Generic
- Inherits:
-
Object
- Object
- Async::Container::Generic
- Defined in:
- lib/async/container/generic.rb
Constant Summary collapse
- UNNAMED =
"Unnamed"
Instance Attribute Summary collapse
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#statistics ⇒ Object
readonly
Returns the value of attribute statistics.
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #async(**options, &block) ⇒ Object
- #failed? ⇒ Boolean
-
#initialize(**options) ⇒ Generic
constructor
A new instance of Generic.
- #key?(key) ⇒ Boolean
- #mark?(key) ⇒ Boolean
- #reload ⇒ Object
- #run(count: Container.processor_count, **options, &block) ⇒ Object
-
#running? ⇒ Boolean
Whether there are running tasks.
-
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
- #spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
- #status?(flag) ⇒ Boolean
- #stop(timeout = true) ⇒ Object
- #to_s ⇒ Object
-
#wait ⇒ Object
Wait until all spawned tasks are completed.
- #wait_until_ready ⇒ Object
Constructor Details
#initialize(**options) ⇒ Generic
Returns a new instance of Generic.
47 48 49 50 51 52 53 54 55 |
# File 'lib/async/container/generic.rb', line 47 def initialize(**) @group = Group.new @running = true @state = {} @statistics = Statistics.new @keyed = {} end |
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
57 58 59 |
# File 'lib/async/container/generic.rb', line 57 def state @state end |
#statistics ⇒ Object (readonly)
Returns the value of attribute statistics.
67 68 69 |
# File 'lib/async/container/generic.rb', line 67 def statistics @statistics end |
Class Method Details
.run(*arguments, **options, &block) ⇒ Object
41 42 43 |
# File 'lib/async/container/generic.rb', line 41 def self.run(*arguments, **, &block) self.new.run(*arguments, **, &block) end |
Instance Method Details
#[](key) ⇒ Object
63 64 65 |
# File 'lib/async/container/generic.rb', line 63 def [] key @keyed[key]&.value end |
#async(**options, &block) ⇒ Object
166 167 168 169 170 |
# File 'lib/async/container/generic.rb', line 166 def async(**, &block) spawn(**) do |instance| Async::Reactor.run(instance, &block) end end |
#failed? ⇒ Boolean
69 70 71 |
# File 'lib/async/container/generic.rb', line 69 def failed? @statistics.failed? end |
#key?(key) ⇒ Boolean
206 207 208 209 210 |
# File 'lib/async/container/generic.rb', line 206 def key?(key) if key @keyed.key?(key) end end |
#mark?(key) ⇒ Boolean
194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/async/container/generic.rb', line 194 def mark?(key) if key if value = @keyed[key] value.mark! return true end end return false end |
#reload ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/async/container/generic.rb', line 180 def reload @keyed.each_value(&:clear!) yield dirty = false @keyed.delete_if do |key, value| value.stop? && (dirty = true) end return dirty end |
#run(count: Container.processor_count, **options, &block) ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/async/container/generic.rb', line 172 def run(count: Container.processor_count, **, &block) count.times do spawn(**, &block) end return self end |
#running? ⇒ Boolean
Whether there are running tasks.
74 75 76 |
# File 'lib/async/container/generic.rb', line 74 def running? @group.running? end |
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
80 81 82 |
# File 'lib/async/container/generic.rb', line 80 def sleep(duration = nil) @group.sleep(duration) end |
#spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/async/container/generic.rb', line 122 def spawn(name: nil, restart: false, key: nil, &block) name ||= UNNAMED if mark?(key) Async.logger.debug(self) {"Reusing existing child for #{key}: #{name}"} return false end @statistics.spawn! Fiber.new do while @running child = self.start(name, &block) state = insert(key, child) begin status = @group.wait_for(child) do || state.update() end ensure delete(key, child) end if status.success? Async.logger.info(self) {"#{child} exited with #{status}"} else @statistics.failure! Async.logger.error(self) {status} end if restart @statistics.restart! else break end end # ensure # Async.logger.error(self) {$!} if $! end.resume return true end |
#status?(flag) ⇒ Boolean
89 90 91 92 |
# File 'lib/async/container/generic.rb', line 89 def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end |
#stop(timeout = true) ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/async/container/generic.rb', line 111 def stop(timeout = true) @running = false @group.stop(timeout) if @group.running? Async.logger.warn(self) {"Group is still running after stopping it!"} end ensure @running = true end |
#to_s ⇒ Object
59 60 61 |
# File 'lib/async/container/generic.rb', line 59 def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end |
#wait ⇒ Object
Wait until all spawned tasks are completed.
85 86 87 |
# File 'lib/async/container/generic.rb', line 85 def wait @group.wait end |
#wait_until_ready ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/async/container/generic.rb', line 94 def wait_until_ready while true Async.logger.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| buffer.puts "\t#{child.class}: #{state.inspect}" end end self.sleep if self.status?(:ready) return true end end end |