Class: Async::Container::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/async/container/group.rb

Overview

Manages a group of running processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeGroup

Initialize an empty group.



16
17
18
19
20
21
# File 'lib/async/container/group.rb', line 16

def initialize
  @running = {}
  
  # This queue allows us to wait for processes to complete, without spawning new processes as a result.
  @queue = nil
end

Instance Attribute Details

#runningObject (readonly)

Returns the value of attribute running.



24
25
26
# File 'lib/async/container/group.rb', line 24

def running
  @running
end

#the running tasks, indexed by IO.(runningtasks, indexedbyIO.) ⇒ Object (readonly)



24
# File 'lib/async/container/group.rb', line 24

attr :running

Instance Method Details

#any?Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


34
35
36
# File 'lib/async/container/group.rb', line 34

def any?
  @running.any?
end

#empty?Boolean

Whether the group is empty.

Returns:

  • (Boolean)


40
41
42
# File 'lib/async/container/group.rb', line 40

def empty?
  @running.empty?
end

#interruptObject

Interrupt all running processes. This resumes the controlling fiber with an instance of Interrupt.



63
64
65
66
67
68
# File 'lib/async/container/group.rb', line 63

def interrupt
  Console.logger.debug(self, "Sending interrupt to #{@running.size} running processes...")
  @running.each_value do |fiber|
    fiber.resume(Interrupt)
  end
end

#running?Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


28
29
30
# File 'lib/async/container/group.rb', line 28

def running?
  @running.any?
end

#sleep(duration) ⇒ Object

Sleep for at most the specified duration until some state change occurs.



45
46
47
48
49
50
# File 'lib/async/container/group.rb', line 45

def sleep(duration)
  self.resume
  self.suspend
  
  self.wait_for_children(duration)
end

#stop(timeout = 1) ⇒ Object

Stop all child processes using #terminate.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/async/container/group.rb', line 81

def stop(timeout = 1)
  # Use a default timeout if not specified:
  timeout = 1 if timeout == true
  
  if timeout
    start_time = Async::Clock.now
    
    self.interrupt
    
    while self.any?
      duration = Async::Clock.now - start_time
      remaining = timeout - duration
      
      if remaining >= 0
        self.wait_for_children(duration)
      else
        self.wait_for_children(0)
        break
      end
    end
  end
  
  # Terminate all children:
  self.terminate
  
  # Wait for all children to exit:
  self.wait
end

#terminateObject

Terminate all running processes. This resumes the controlling fiber with an instance of Terminate.



72
73
74
75
76
77
# File 'lib/async/container/group.rb', line 72

def terminate
  Console.logger.debug(self, "Sending terminate to #{@running.size} running processes...")
  @running.each_value do |fiber|
    fiber.resume(Terminate)
  end
end

#waitObject

Begin any outstanding queued processes and wait for them indefinitely.



53
54
55
56
57
58
59
# File 'lib/async/container/group.rb', line 53

def wait
  self.resume
  
  while self.running?
    self.wait_for_children
  end
end

#wait_for(channel) ⇒ Object

Wait for a message in the specified Channel.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/async/container/group.rb', line 111

def wait_for(channel)
  io = channel.in
  
  @running[io] = Fiber.current
  
  while @running.key?(io)
    result = Fiber.yield
    
    if result == Interrupt
      channel.interrupt!
    elsif result == Terminate
      channel.terminate!
    elsif message = channel.receive
      yield message
    else
      return channel.wait
    end
  end
ensure
  @running.delete(io)
end