Class: Async::Container::Group
Inherits: Object
- Object
- Async::Container::Group
Defined in: lib/async/container/group.rb
Overview
Manages a group of running processes.
Instance Attribute Summary
- #running ⇒ Object (readonly)
  The running tasks, indexed by IO.
Instance Method Summary
- #any? ⇒ Boolean
  Whether the group contains any running processes.
- #empty? ⇒ Boolean
  Whether the group is empty.
- #initialize ⇒ Group (constructor)
  Initialize an empty group.
- #inspect ⇒ Object
- #interrupt ⇒ Object
  Interrupt all running processes.
- #running? ⇒ Boolean
  Whether the group contains any running processes.
- #sleep(duration) ⇒ Object
  Sleep for at most the specified duration until some state change occurs.
- #stop(timeout = 1) ⇒ Object
  Stop all child processes: #interrupt them, wait up to the timeout, then #terminate.
- #terminate ⇒ Object
  Terminate all running processes.
- #wait ⇒ Object
  Begin any outstanding queued processes and wait for them indefinitely.
- #wait_for(channel) ⇒ Object
  Wait for a message in the specified Channel.
Constructor Details
#initialize ⇒ Group
Initialize an empty group.
# File 'lib/async/container/group.rb', line 16

def initialize
	@running = {}
	
	# This queue allows us to wait for processes to complete, without spawning new processes as a result.
	@queue = nil
end
Instance Attribute Details
#running ⇒ Object (readonly)
The running tasks, indexed by IO.

# File 'lib/async/container/group.rb', line 28

attr :running
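The listings on this page do not show how the group waits on these IO objects, so the following is only a sketch of why a Hash keyed by IO is convenient: a select loop can map each readable IO straight back to the fiber waiting on it. Everything here (the local `running` Hash, the pipe, the `resumed` array) is a hypothetical stand-in built from plain Ruby, not the library's implementation.

```ruby
# Hypothetical illustration: a Hash keyed by IO lets a select loop find the waiting fiber.
running = {}
resumed = []

reader, writer = IO.pipe

running[reader] = Fiber.new do
	# Park until the owner decides this IO is readable.
	Fiber.yield
	resumed << reader.read_nonblock(16)
end

# Run the fiber up to its first Fiber.yield.
running[reader].resume

writer.write("ready")
writer.flush

# Wait up to 1 second for any registered IO to become readable.
readable, = IO.select(running.keys, nil, nil, 1)

# Resume the fiber registered under each readable IO.
readable&.each do |io|
	running[io].resume
end

writer.close
reader.close
```

Because the IO is the key, no separate lookup table between file descriptors and fibers is needed.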
Instance Method Details
#any? ⇒ Boolean
Whether the group contains any running processes.
# File 'lib/async/container/group.rb', line 38

def any?
	@running.any?
end
#empty? ⇒ Boolean
Whether the group is empty.
# File 'lib/async/container/group.rb', line 44

def empty?
	@running.empty?
end
#inspect ⇒ Object
# File 'lib/async/container/group.rb', line 23

def inspect
	"#<#{self.class} running=#{@running.size}>"
end
#interrupt ⇒ Object
Interrupt all running processes. This resumes each controlling fiber, passing it Interrupt.
# File 'lib/async/container/group.rb', line 67

def interrupt
	Console.logger.debug(self, "Sending interrupt to #{@running.size} running processes...")
	@running.each_value do |fiber|
		fiber.resume(Interrupt)
	end
end
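The resume-with-a-signal mechanism can be illustrated without the library. The following is a minimal sketch using plain Ruby fibers; `Sketch::Interrupt` is a hypothetical stand-in class defined locally (it is neither the library's class nor Ruby's built-in `Interrupt`), and `make_worker` is an illustrative helper.

```ruby
# Hypothetical stand-in for the library's signal class, defined locally for illustration.
module Sketch
	class Interrupt; end
end

# Build a fiber that parks itself and records whatever value it is resumed with.
def make_worker(log)
	Fiber.new do
		# Fiber.yield returns the argument passed to the next #resume call.
		signal = Fiber.yield
		log << signal
	end
end

log = []
running = {a: make_worker(log), b: make_worker(log)}

# Start each fiber so that it reaches its first Fiber.yield.
running.each_value(&:resume)

# Same shape as the listing above: resume every controlling fiber with the signal class.
running.each_value do |fiber|
	fiber.resume(Sketch::Interrupt)
end
```

Each parked fiber receives the signal as the return value of its pending `Fiber.yield`, so it can decide how to react.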
#running? ⇒ Boolean
Whether the group contains any running processes.
# File 'lib/async/container/group.rb', line 32

def running?
	@running.any?
end
#sleep(duration) ⇒ Object
Sleep for at most the specified duration until some state change occurs.
# File 'lib/async/container/group.rb', line 49

def sleep(duration)
	self.resume
	self.suspend
	
	self.wait_for_children(duration)
end
#stop(timeout = 1) ⇒ Object
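Stop all child processes. If a timeout is given, first #interrupt them and wait up to the timeout for them to exit; then #terminate any that remain and wait for them to exit.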
# File 'lib/async/container/group.rb', line 85

def stop(timeout = 1)
	# Use a default timeout if not specified:
	timeout = 1 if timeout == true
	
	if timeout
		start_time = Async::Clock.now
		
		self.interrupt
		
		while self.any?
			duration = Async::Clock.now - start_time
			remaining = timeout - duration
			
			if remaining >= 0
				self.wait_for_children(duration)
			else
				self.wait_for_children(0)
				break
			end
		end
	end
	
	# Terminate all children:
	self.terminate
	
	# Wait for all children to exit:
	self.wait
end
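The interrupt-then-terminate escalation can be sketched in isolation. This is a simplified illustration under stated assumptions, not the library's code: `FakeChild` and `stop_all` are hypothetical names, children are plain objects rather than processes, and the loop polls with short sleeps bounded by the remaining time budget.

```ruby
# Hypothetical child: exits on interrupt only if it is "graceful".
class FakeChild
	attr_reader :events
	
	def initialize(graceful:)
		@graceful = graceful
		@alive = true
		@events = []
	end
	
	def alive?
		@alive
	end
	
	def interrupt
		@events << :interrupt
		@alive = false if @graceful
	end
	
	def terminate
		@events << :terminate
		@alive = false
	end
end

# Hypothetical helper: ask politely, wait up to `timeout` seconds, then force.
def stop_all(children, timeout: 1, poll: 0.01)
	start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
	children.each(&:interrupt)
	
	while children.any?(&:alive?)
		elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
		remaining = timeout - elapsed
		break if remaining <= 0
		
		# Never sleep past the end of the time budget.
		sleep([poll, remaining].min)
	end
	
	# Anything still alive after the grace period is terminated.
	children.select(&:alive?).each(&:terminate)
end

polite = FakeChild.new(graceful: true)
stubborn = FakeChild.new(graceful: false)
stop_all([polite, stubborn], timeout: 0.05)
```

In this sketch the polite child only ever sees the interrupt, while the stubborn child is terminated once the grace period runs out.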
#terminate ⇒ Object
Terminate all running processes. This resumes each controlling fiber, passing it Terminate.
# File 'lib/async/container/group.rb', line 76

def terminate
	Console.logger.debug(self, "Sending terminate to #{@running.size} running processes...")
	@running.each_value do |fiber|
		fiber.resume(Terminate)
	end
end
#wait ⇒ Object
Begin any outstanding queued processes and wait for them indefinitely.
# File 'lib/async/container/group.rb', line 57

def wait
	self.resume
	
	while self.running?
		self.wait_for_children
	end
end
#wait_for(channel) ⇒ Object
Wait for a message in the specified Channel.
# File 'lib/async/container/group.rb', line 115

def wait_for(channel)
	io = channel.in
	
	@running[io] = Fiber.current
	
	while @running.key?(io)
		result = Fiber.yield
		
		if result == Interrupt
			channel.interrupt!
		elsif result == Terminate
			channel.terminate!
		elsif message = channel.receive
			yield message
		else
			return channel.wait
		end
	end
ensure
	@running.delete(io)
end
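The register, park, and clean-up pattern in this method can be tried out with plain fibers. The sketch below is an assumption-laden simplification: `MiniGroup`, its `Interrupt` and `Terminate` constants, and the Array used as an `inbox` are all hypothetical stand-ins, and a signal simply ends the wait instead of being forwarded to a channel.

```ruby
# All names below are hypothetical stand-ins, defined locally for illustration.
class MiniGroup
	Interrupt = Class.new
	Terminate = Class.new
	
	attr_reader :running
	
	def initialize
		@running = {}
	end
	
	# Register the calling fiber under `key`, then park until resumed.
	def wait_for(key, inbox)
		@running[key] = Fiber.current
		
		while @running.key?(key)
			result = Fiber.yield
			
			if result == Interrupt || result == Terminate
				# Simplification: a signal ends the wait.
				return result
			elsif (message = inbox.shift)
				yield message
			else
				return :finished
			end
		end
	ensure
		# Runs on every exit path, so the group never keeps a stale entry.
		@running.delete(key)
	end
end

group = MiniGroup.new
inbox = [:ready]
seen = []

fiber = Fiber.new do
	group.wait_for(:child, inbox) { |message| seen << message }
end

# First resume: the fiber registers itself and parks at Fiber.yield.
fiber.resume
registered = group.running.key?(:child)

# Second resume: the inbox has a message, so the block receives it and the fiber parks again.
fiber.resume

# Third resume: the inbox is empty, so wait_for returns and the entry is removed.
outcome = fiber.resume
```

The `ensure` clause is what keeps the bookkeeping honest: whichever branch ends the loop, the fiber's entry is removed from the Hash.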