Class: Async::Container::Group
- Inherits:
-
Object
- Object
- Async::Container::Group
- Defined in:
- lib/async/container/group.rb
Overview
Manages a group of running processes.
Instance Attribute Summary collapse
-
#running ⇒ Object
readonly
Returns the value of attribute running.
- #running ⇒ Object readonly — The running tasks, indexed by IO.
Instance Method Summary collapse
- #any? ⇒ Boolean
- #empty? ⇒ Boolean
-
#initialize ⇒ Group
constructor
A new instance of Group.
- #interrupt ⇒ Object
- #running? ⇒ Boolean
-
#sleep(duration) ⇒ Object
This method sleeps for at most the specified duration.
- #stop(timeout = 1) ⇒ Object
- #terminate ⇒ Object
- #wait ⇒ Object
- #wait_for(channel) ⇒ Object
Constructor Details
#initialize ⇒ Group
Returns a new instance of Group.
33 34 35 36 37 38 |
# File 'lib/async/container/group.rb', line 33

# Create an empty group that is not tracking any running tasks yet.
#
# @return [Group] a new instance of Group.
def initialize
  # The running tasks, indexed by IO.
  @running = {}

  # This queue allows us to wait for processes to complete, without spawning new processes as a result.
  @queue = nil
end
Instance Attribute Details
#running ⇒ Object (readonly)
Returns the value of attribute running.
41 42 43 |
# File 'lib/async/container/group.rb', line 41

# Returns the value of attribute running.
#
# @return [Hash] the running tasks, indexed by IO.
def running
  @running
end
#running ⇒ Object (readonly) — The running tasks, indexed by IO.
41 |
# File 'lib/async/container/group.rb', line 41

# The running tasks, indexed by IO (read-only).
attr_reader :running
Instance Method Details
#any? ⇒ Boolean
47 48 49 |
# File 'lib/async/container/group.rb', line 47

# Whether the group is tracking at least one running task.
#
# @return [Boolean] true when `@running` has any entries.
def any?
  @running.any?
end
#empty? ⇒ Boolean
51 52 53 |
# File 'lib/async/container/group.rb', line 51

# Whether the group is tracking no running tasks at all.
#
# @return [Boolean] true when `@running` has no entries.
def empty?
  @running.empty?
end
#interrupt ⇒ Object
71 72 73 74 75 |
# File 'lib/async/container/group.rb', line 71

# Resume every running fiber with `Interrupt`.
#
# Each fiber is expected to be parked in {#wait_for}, which reacts to this
# value by calling `interrupt!` on its channel.
def interrupt
  @running.each_value do |fiber|
    fiber.resume(Interrupt)
  end
end
#running? ⇒ Boolean
43 44 45 |
# File 'lib/async/container/group.rb', line 43

# Whether any task in the group is still running.
#
# @return [Boolean] true when `@running` has any entries.
def running?
  @running.any?
end
#sleep(duration) ⇒ Object
This method sleeps for at most the specified duration.
56 57 58 59 60 61 |
# File 'lib/async/container/group.rb', line 56

# This method sleeps for at most the specified duration.
#
# Calls `resume` and then `suspend` before waiting on the children, passing
# `duration` through to `wait_for_children`.
#
# @param duration [Numeric] the upper bound on how long to wait.
# @return [Object] whatever `wait_for_children` returns.
def sleep(duration)
  self.resume
  self.suspend

  self.wait_for_children(duration)
end
#stop(timeout = 1) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/async/container/group.rb', line 83

# Stop every task in the group.
#
# With a timeout, the tasks are interrupted first and given until the
# timeout elapses (measured with `Async::Clock.now`) to finish. Whatever is
# still running afterwards is terminated, and the method then waits for all
# tasks to exit.
#
# @param timeout [Numeric, Boolean, nil] the grace period, in the units of
#   `Async::Clock.now` (presumably seconds). `true` is the legacy
#   `graceful = true` argument and selects the default of 1. `false` or `nil`
#   (legacy `graceful = false`) skips the interrupt phase entirely.
# @return [Object] whatever `wait` returns.
def stop(timeout = 1)
  # Handle legacy `graceful = true` argument by using the default timeout:
  timeout = 1 if timeout == true

  # Timeout can also be `graceful = false`, in which case we go straight to terminate:
  if timeout
    start_time = Async::Clock.now

    self.interrupt

    while self.any?
      duration = Async::Clock.now - start_time
      remaining = timeout - duration

      if remaining >= 0
        # Wait only for what is left of the grace period, not for the time already spent:
        self.wait_for_children(remaining)
      else
        # Out of time: collect anything that has already exited, then give up waiting.
        self.wait_for_children(0)
        break
      end
    end
  end

  # Terminate all children:
  self.terminate

  # Wait for all children to exit:
  self.wait
end
#terminate ⇒ Object
77 78 79 80 81 |
# File 'lib/async/container/group.rb', line 77

# Resume every running fiber with `Terminate`.
#
# Each fiber is expected to be parked in {#wait_for}, which reacts to this
# value by calling `terminate!` on its channel.
def terminate
  @running.each_value do |fiber|
    fiber.resume(Terminate)
  end
end
#wait ⇒ Object
63 64 65 66 67 68 69 |
# File 'lib/async/container/group.rb', line 63

# Block until no task in the group is running any more.
#
# Calls `resume` once, then keeps calling `wait_for_children` (without a
# timeout argument) for as long as {#running?} is true.
def wait
  self.resume

  while self.running?
    self.wait_for_children
  end
end
#wait_for(channel) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/async/container/group.rb', line 121

# Register the current fiber under the channel's input IO and service the
# channel each time the fiber is resumed.
#
# What happens on each resume depends on the value passed to it:
# - `Interrupt`: the channel is interrupted via `interrupt!`.
# - `Terminate`: the channel is terminated via `terminate!`.
# - anything else: a message is read from the channel and yielded to the
#   block. When `receive` returns nothing, the method returns `channel.wait`.
#
# The registration is always removed from `@running` on the way out.
#
# @param channel [#in, #receive, #interrupt!, #terminate!, #wait] the channel to service.
# @yield [message] each truthy message returned by `channel.receive`.
# @return [Object, nil] the result of `channel.wait`, or nil if the entry
#   was removed from `@running` while the fiber was suspended.
def wait_for(channel)
  io = channel.in

  @running[io] = Fiber.current

  while @running.key?(io)
    # Suspend until the group resumes this fiber, optionally with a signal class.
    result = Fiber.yield

    if result == Interrupt
      channel.interrupt!
    elsif result == Terminate
      channel.terminate!
    elsif (message = channel.receive)
      yield message
    else
      return channel.wait
    end
  end
ensure
  @running.delete(io)
end