Class: Async::Container::Group
- Inherits:
-
Object
- Object
- Async::Container::Group
- Defined in:
- lib/async/container/group.rb
Overview
Manages a group of running processes.
Instance Attribute Summary collapse
-
#running ⇒ Object
readonly
Returns the value of attribute running.
- #running ⇒ Object readonly
The running tasks, indexed by IO.
Instance Method Summary collapse
- #any? ⇒ Boolean
- #empty? ⇒ Boolean
-
#initialize ⇒ Group
constructor
A new instance of Group.
- #interrupt ⇒ Object
- #running? ⇒ Boolean
-
#sleep(duration) ⇒ Object
This method sleeps for at most the specified duration.
- #stop(timeout = 1) ⇒ Object
- #terminate ⇒ Object
- #wait ⇒ Object
- #wait_for(channel) ⇒ Object
Constructor Details
#initialize ⇒ Group
Returns a new instance of Group.
31 32 33 34 35 36 |
# File 'lib/async/container/group.rb', line 31
# Build an empty group: no tasks are registered and no queue exists yet.
#
# @return [Group] a new instance of Group
def initialize
  # This queue allows us to wait for processes to complete, without spawning new processes as a result.
  @queue = nil

  # Starts out empty; `wait_for` registers entries keyed by a channel's IO.
  @running = Hash.new
end
Instance Attribute Details
#running ⇒ Object (readonly)
Returns the value of attribute running.
39 40 41 |
# File 'lib/async/container/group.rb', line 39
# Read-only access to the collection of running tasks.
#
# @return [Object] the value held in `@running` (the same object, not a copy)
def running
  @running
end
#running ⇒ Object (readonly)
The running tasks, indexed by IO.
39 |
# File 'lib/async/container/group.rb', line 39
# Declares the read-only `running` accessor (no writer is generated).
attr_reader :running
Instance Method Details
#any? ⇒ Boolean
45 46 47 |
# File 'lib/async/container/group.rb', line 45
# Report whether at least one task is currently registered in `@running`.
#
# @return [Boolean] true when `@running` has at least one entry
def any?
  !@running.empty?
end
#empty? ⇒ Boolean
49 50 51 |
# File 'lib/async/container/group.rb', line 49
# Report whether no tasks are registered in `@running`.
#
# @return [Boolean] true when `@running` has no entries
def empty?
  @running.size.zero?
end
#interrupt ⇒ Object
69 70 71 72 73 |
# File 'lib/async/container/group.rb', line 69
# Resume every registered task, handing it `Interrupt` as the resumed value.
#
# @return [Object] the `@running` collection that was iterated
def interrupt
  @running.each { |_io, task| task.resume(Interrupt) }
end
#running? ⇒ Boolean
41 42 43 |
# File 'lib/async/container/group.rb', line 41
# Report whether the group still has registered tasks.
#
# @return [Boolean] true when `@running` has at least one entry
def running?
  !@running.empty?
end
#sleep(duration) ⇒ Object
This method sleeps for at most the specified duration.
54 55 56 57 58 59 |
# File 'lib/async/container/group.rb', line 54
# This method sleeps for at most the specified duration.
#
# Calls `resume`, then `suspend`, then waits on the children with `duration`
# as the upper bound.
#
# @param duration [Object] the maximum wait, forwarded to `wait_for_children`
# @return [Object] whatever `wait_for_children` returns
def sleep(duration)
  resume
  suspend

  wait_for_children(duration)
end
#stop(timeout = 1) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/async/container/group.rb', line 81
# Stop every child in the group.
#
# When `timeout` is truthy, the children are first interrupted and given up to
# `timeout` (measured with `Async::Clock.now`, presumably seconds) to exit.
# Whatever is still registered afterwards is terminated, and the method then
# waits for all children to exit.
#
# @param timeout [Numeric, Boolean, nil] the grace period. `true` is the legacy
#   `graceful = true` form and means 1. `false`/`nil` skips the interrupt phase
#   and terminates immediately.
# @return [Object] whatever `wait` returns
def stop(timeout = 1)
  # Handle legacy `graceful = true` argument:
  if timeout
    start_time = Async::Clock.now

    # Use a default timeout if not specified:
    timeout = 1 if timeout == true

    self.interrupt

    while self.any?
      duration = Async::Clock.now - start_time
      remaining = timeout - duration

      if remaining >= 0
        # Wait only for the time that is left in the grace period, not for
        # the time that has already elapsed.
        self.wait_for_children(remaining)
      else
        # Grace period exhausted: poll once without blocking, then give up.
        self.wait_for_children(0)
        break
      end
    end
  end

  # Terminate all children:
  self.terminate

  # Wait for all children to exit:
  self.wait
end
#terminate ⇒ Object
75 76 77 78 79 |
# File 'lib/async/container/group.rb', line 75
# Resume every registered task, handing it `Terminate` as the resumed value.
#
# @return [Object] the `@running` collection that was iterated
def terminate
  @running.each { |_io, task| task.resume(Terminate) }
end
#wait ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/async/container/group.rb', line 61
# Block until no tasks remain registered in the group.
#
# Calls `resume` once, then keeps waiting on the children for as long as
# `running?` reports true.
#
# @return [nil]
def wait
  resume

  wait_for_children while running?
end
#wait_for(channel) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/async/container/group.rb', line 119
# Register the current fiber against the channel's IO and service it until the
# channel has nothing more to receive.
#
# The fiber parks in `Fiber.yield`. Each time it is resumed, the resumed value
# decides what happens next:
# - `Interrupt` is forwarded as `channel.interrupt!`
# - `Terminate` is forwarded as `channel.terminate!`
# - anything else triggers `channel.receive`; a truthy message is yielded to
#   the caller's block, while a falsy one ends the loop via `channel.wait`
#
# @param channel [Object] must respond to `in`, `receive`, `wait`,
#   `interrupt!` and `terminate!`
# @yield [message] each truthy value returned by `channel.receive`
# @return [Object, nil] the result of `channel.wait`, or nil if the entry was
#   removed from `@running` while the fiber was parked
def wait_for(channel)
  io = channel.in

  @running[io] = Fiber.current

  while @running.key?(io)
    result = Fiber.yield

    if result == Interrupt
      channel.interrupt!
    elsif result == Terminate
      channel.terminate!
    elsif (message = channel.receive)
      yield message
    else
      return channel.wait
    end
  end
ensure
  # Always unregister, including when the block or the channel raises.
  @running.delete(io)
end