Class: Async::Container::Generic
- Inherits:
-
Object
- Object
- Async::Container::Generic
- Defined in:
- lib/async/container/generic.rb
Constant Summary collapse
- UNNAMED =
"Unnamed"
Instance Attribute Summary collapse
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#statistics ⇒ Object
readonly
Returns the value of attribute statistics.
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #async(**options, &block) ⇒ Object
- #failed? ⇒ Boolean
-
#initialize(**options) ⇒ Generic
constructor
A new instance of Generic.
- #key?(key) ⇒ Boolean
- #mark?(key) ⇒ Boolean
- #reload ⇒ Object
- #run(count: Container.processor_count, **options, &block) ⇒ Object
-
#running? ⇒ Boolean
Whether there are running tasks.
-
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
- #spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
- #status?(flag) ⇒ Boolean
- #stop(timeout = true) ⇒ Object
- #to_s ⇒ Object
-
#wait ⇒ Object
Wait until all spawned tasks are completed.
- #wait_until_ready ⇒ Object
Constructor Details
#initialize(**options) ⇒ Generic
Returns a new instance of Generic.
45 46 47 48 49 50 51 52 53 |
# File 'lib/async/container/generic.rb', line 45 def initialize(**) @group = Group.new @running = true @state = {} @statistics = Statistics.new @keyed = {} end |
Instance Attribute Details
#state ⇒ Object (readonly)
Returns the value of attribute state.
55 56 57 |
# File 'lib/async/container/generic.rb', line 55 def state @state end |
#statistics ⇒ Object (readonly)
Returns the value of attribute statistics.
65 66 67 |
# File 'lib/async/container/generic.rb', line 65 def statistics @statistics end |
Class Method Details
.run(*arguments, **options, &block) ⇒ Object
39 40 41 |
# File 'lib/async/container/generic.rb', line 39 def self.run(*arguments, **, &block) self.new.run(*arguments, **, &block) end |
Instance Method Details
#[](key) ⇒ Object
61 62 63 |
# File 'lib/async/container/generic.rb', line 61 def [] key @keyed[key]&.value end |
#async(**options, &block) ⇒ Object
164 165 166 167 168 |
# File 'lib/async/container/generic.rb', line 164 def async(**, &block) spawn(**) do |instance| Async::Reactor.run(instance, &block) end end |
#failed? ⇒ Boolean
67 68 69 |
# File 'lib/async/container/generic.rb', line 67 def failed? @statistics.failed? end |
#key?(key) ⇒ Boolean
204 205 206 207 208 |
# File 'lib/async/container/generic.rb', line 204 def key?(key) if key @keyed.key?(key) end end |
#mark?(key) ⇒ Boolean
192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/async/container/generic.rb', line 192 def mark?(key) if key if value = @keyed[key] value.mark! return true end end return false end |
#reload ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/async/container/generic.rb', line 178 def reload @keyed.each_value(&:clear!) yield dirty = false @keyed.delete_if do |key, value| value.stop? && (dirty = true) end return dirty end |
#run(count: Container.processor_count, **options, &block) ⇒ Object
170 171 172 173 174 175 176 |
# File 'lib/async/container/generic.rb', line 170 def run(count: Container.processor_count, **, &block) count.times do spawn(**, &block) end return self end |
#running? ⇒ Boolean
Whether there are running tasks.
72 73 74 |
# File 'lib/async/container/generic.rb', line 72 def running? @group.running? end |
#sleep(duration = nil) ⇒ Object
Sleep until some state change occurs.
78 79 80 |
# File 'lib/async/container/generic.rb', line 78 def sleep(duration = nil) @group.sleep(duration) end |
#spawn(name: nil, restart: false, key: nil, &block) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/async/container/generic.rb', line 120 def spawn(name: nil, restart: false, key: nil, &block) name ||= UNNAMED if mark?(key) Async.logger.debug(self) {"Reusing existing child for #{key}: #{name}"} return false end @statistics.spawn! Fiber.new do while @running child = self.start(name, &block) state = insert(key, child) begin status = @group.wait_for(child) do || state.update() end ensure delete(key, child) end if status.success? Async.logger.info(self) {"#{child} #{status}"} else @statistics.failure! Async.logger.error(self) {status} end if restart @statistics.restart! else break end end # ensure # Async.logger.error(self) {$!} if $! end.resume return true end |
#status?(flag) ⇒ Boolean
87 88 89 90 |
# File 'lib/async/container/generic.rb', line 87 def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end |
#stop(timeout = true) ⇒ Object
109 110 111 112 113 114 115 116 117 118 |
# File 'lib/async/container/generic.rb', line 109 def stop(timeout = true) @running = false @group.stop(timeout) if @group.running? Async.logger.warn(self) {"Group is still running after stopping it!"} end ensure @running = true end |
#to_s ⇒ Object
57 58 59 |
# File 'lib/async/container/generic.rb', line 57 def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end |
#wait ⇒ Object
Wait until all spawned tasks are completed.
83 84 85 |
# File 'lib/async/container/generic.rb', line 83 def wait @group.wait end |
#wait_until_ready ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/async/container/generic.rb', line 92 def wait_until_ready while true Async.logger.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| buffer.puts "\t#{child.class}: #{state.inspect}" end end self.sleep if self.status?(:ready) return true end end end |