Class: Async::Container::Generic
- Inherits:
-
Object
- Object
- Async::Container::Generic
- Defined in:
- lib/async/container/generic.rb
Constant Summary collapse
- UNNAMED =
"Unnamed"
Instance Attribute Summary collapse
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#statistics ⇒ Object
readonly
Returns the value of attribute statistics.
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #async(**options, &block) ⇒ Object
- #failed? ⇒ Boolean
-
#initialize(**options) ⇒ Generic
constructor
A new instance of Generic.
- #key?(key) ⇒ Boolean
- #mark?(key) ⇒ Boolean
- #reload ⇒ Object
- #run(count: Container.processor_count, **options, &block) ⇒ Object
-
#running? ⇒ Boolean
Whether there are running tasks.
-
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
- #spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
- #status?(flag) ⇒ Boolean
- #stop(timeout = true) ⇒ Object
- #to_s ⇒ Object
-
#wait ⇒ Object
Wait until all spawned tasks are completed.
- #wait_until_ready ⇒ Object
Constructor Details
#initialize(**options) ⇒ Generic
Returns a new instance of Generic.
56 57 58 59 60 61 62 63 64 |
# File 'lib/async/container/generic.rb', line 56 def initialize(**) @group = Group.new @running = true @state = {} @statistics = Statistics.new @keyed = {} end |
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
66 67 68 |
# File 'lib/async/container/generic.rb', line 66 def state @state end |
#statistics ⇒ Object (readonly)
Returns the value of attribute statistics.
76 77 78 |
# File 'lib/async/container/generic.rb', line 76 def statistics @statistics end |
Class Method Details
.run(*arguments, **options, &block) ⇒ Object
50 51 52 |
# File 'lib/async/container/generic.rb', line 50 def self.run(*arguments, **, &block) self.new.run(*arguments, **, &block) end |
Instance Method Details
#[](key) ⇒ Object
72 73 74 |
# File 'lib/async/container/generic.rb', line 72 def [] key @keyed[key]&.value end |
#async(**options, &block) ⇒ Object
175 176 177 178 179 |
# File 'lib/async/container/generic.rb', line 175 def async(**, &block) spawn(**) do |instance| Async::Reactor.run(instance, &block) end end |
#failed? ⇒ Boolean
78 79 80 |
# File 'lib/async/container/generic.rb', line 78 def failed? @statistics.failed? end |
#key?(key) ⇒ Boolean
215 216 217 218 219 |
# File 'lib/async/container/generic.rb', line 215 def key?(key) if key @keyed.key?(key) end end |
#mark?(key) ⇒ Boolean
203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/async/container/generic.rb', line 203 def mark?(key) if key if value = @keyed[key] value.mark! return true end end return false end |
#reload ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/async/container/generic.rb', line 189 def reload @keyed.each_value(&:clear!) yield dirty = false @keyed.delete_if do |key, value| value.stop? && (dirty = true) end return dirty end |
#run(count: Container.processor_count, **options, &block) ⇒ Object
181 182 183 184 185 186 187 |
# File 'lib/async/container/generic.rb', line 181 def run(count: Container.processor_count, **, &block) count.times do spawn(**, &block) end return self end |
#running? ⇒ Boolean
Whether there are running tasks.
83 84 85 |
# File 'lib/async/container/generic.rb', line 83 def running? @group.running? end |
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
89 90 91 |
# File 'lib/async/container/generic.rb', line 89 def sleep(duration = nil) @group.sleep(duration) end |
#spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/async/container/generic.rb', line 131 def spawn(name: nil, restart: false, key: nil, &block) name ||= UNNAMED if mark?(key) Async.logger.debug(self) {"Reusing existing child for #{key}: #{name}"} return false end @statistics.spawn! Fiber.new do while @running child = self.start(name, &block) state = insert(key, child) begin status = @group.wait_for(child) do || state.update() end ensure delete(key, child) end if status.success? Async.logger.info(self) {"#{child} exited with #{status}"} else @statistics.failure! Async.logger.error(self) {status} end if restart @statistics.restart! else break end end # ensure # Async.logger.error(self) {$!} if $! end.resume return true end |
#status?(flag) ⇒ Boolean
98 99 100 101 |
# File 'lib/async/container/generic.rb', line 98 def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end |
#stop(timeout = true) ⇒ Object
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/async/container/generic.rb', line 120 def stop(timeout = true) @running = false @group.stop(timeout) if @group.running? Async.logger.warn(self) {"Group is still running after stopping it!"} end ensure @running = true end |
#to_s ⇒ Object
68 69 70 |
# File 'lib/async/container/generic.rb', line 68 def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end |
#wait ⇒ Object
Wait until all spawned tasks are completed.
94 95 96 |
# File 'lib/async/container/generic.rb', line 94 def wait @group.wait end |
#wait_until_ready ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/async/container/generic.rb', line 103 def wait_until_ready while true Async.logger.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| buffer.puts "\t#{child.class}: #{state.inspect}" end end self.sleep if self.status?(:ready) return true end end end |