Class: Async::Container::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/async/container/group.rb

Overview

Manages a group of running processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(health_check_interval: 1.0) ⇒ Group

Initialize an empty group.



24
25
26
27
28
29
30
31
32
# File 'lib/async/container/group.rb', line 24

# Initialize an empty group.
#
# @param health_check_interval [Numeric] stored in `@health_check_interval`;
#   presumably the number of seconds between health checks (confirm against the
#   code that reads it).
def initialize(health_check_interval: 1.0)
  # Fibers that are currently running, keyed by the IO they are waiting on.
  @running = {}
  @health_check_interval = health_check_interval
  
  # Starts out unset. When present, this queue lets us wait for processes to
  # complete without spawning new processes as a result.
  @queue = nil
end

Instance Attribute Details

#running ⇒ Object (readonly)

Returns the value of attribute running.



40
41
42
# File 'lib/async/container/group.rb', line 40

# The running fibers, indexed by IO.
#
# @return [Hash] the object held in `@running` (the same instance, not a copy).
def running
  @running
end

#running ⇒ Object (readonly)

The running tasks, indexed by IO.



40
# File 'lib/async/container/group.rb', line 40

# The running fibers, indexed by IO (read-only accessor for `@running`).
attr_reader :running

Instance Method Details

#any? ⇒ Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


55
56
57
# File 'lib/async/container/group.rb', line 55

# Whether the group is tracking at least one running process.
#
# @return [Boolean] true when `@running` has one or more entries.
def any?
  !@running.empty?
end

#empty? ⇒ Boolean

Whether the group is empty.

Returns:

  • (Boolean)


61
62
63
# File 'lib/async/container/group.rb', line 61

# Whether the group is tracking no running processes.
#
# @return [Boolean] true when `@running` has no entries.
def empty?
  @running.size.zero?
end

#health_check! ⇒ Object

Perform a health check on all running processes.



109
110
111
112
113
# File 'lib/async/container/group.rb', line 109

# Perform a health check on all running processes.
#
# Every fiber yielded by `each_running` is resumed with the symbol
# `:health_check!`.
#
# @return [Object] whatever `each_running` returns.
def health_check!
  each_running { |fiber| fiber.resume(:health_check!) }
end

#inspect ⇒ Object



35
36
37
# File 'lib/async/container/group.rb', line 35

# A short human-readable summary: the class name and the number of running entries.
#
# @return [String] for example `#<Async::Container::Group running=2>`.
def inspect
  format("#<%s running=%d>", self.class, @running.size)
end

#interrupt ⇒ Object

Interrupt all running processes. This resumes each controlling fiber with the Interrupt class itself (not an instance of it).



117
118
119
120
121
122
# File 'lib/async/container/group.rb', line 117

# Interrupt all running processes.
#
# Logs the number of running entries, then resumes every fiber yielded by
# `each_running` with the `Interrupt` constant itself (not an instance).
#
# @return [Object] whatever `each_running` returns.
def interrupt
  Console.info(self, "Sending interrupt to #{@running.size} running processes...")
  
  each_running { |fiber| fiber.resume(Interrupt) }
end

#kill ⇒ Object

Kill all running processes. This resumes each controlling fiber with the Kill class itself (not an instance of it).



135
136
137
138
139
140
# File 'lib/async/container/group.rb', line 135

# Kill all running processes.
#
# Logs the number of running entries, then resumes every fiber yielded by
# `each_running` with the `Kill` constant itself (not an instance).
#
# @return [Object] whatever `each_running` returns.
def kill
  Console.info(self, "Sending kill to #{@running.size} running processes...")
  
  each_running { |fiber| fiber.resume(Kill) }
end

#running? ⇒ Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


49
50
51
# File 'lib/async/container/group.rb', line 49

# Whether the group contains any running processes.
#
# @return [Boolean] true when `@running` has one or more entries.
def running?
  !@running.empty?
end

#size ⇒ Object



43
44
45
# File 'lib/async/container/group.rb', line 43

# The number of entries currently tracked in `@running`.
#
# @return [Integer]
def size
  @running.length
end

#sleep(duration) ⇒ Object

Sleep for at most the specified duration until some state change occurs.



66
67
68
69
70
71
# File 'lib/async/container/group.rb', line 66

# Sleep for at most the specified duration until some state change occurs.
#
# Calls `resume`, then `suspend`, and finally delegates to `wait_for_children`.
# NOTE(review): `resume`/`suspend` are defined elsewhere; presumably they start
# queued processes and then re-establish the queue so that nothing new is
# spawned while waiting. Confirm against those methods.
#
# @param duration [Numeric] passed unchanged to `wait_for_children`.
# @return [Object] whatever `wait_for_children` returns.
def sleep(duration)
  resume
  suspend
  wait_for_children(duration)
end

#stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT) ⇒ Object

Stop all child processes with a multi-phase shutdown sequence.

A graceful shutdown performs the following sequence:

  1. Send SIGINT and wait up to `interrupt_timeout` seconds

  2. Send SIGTERM and wait up to `terminate_timeout` seconds

  3. Send SIGKILL and wait indefinitely for process cleanup

If `graceful` is false, skips the SIGINT phase and goes directly to SIGTERM → SIGKILL.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/async/container/group.rb', line 167

# Stop all child processes with a multi-phase shutdown sequence.
#
# Phases, in order:
# 1. If `interrupt_timeout` is set: `interrupt`, then `wait_for_exit` for up to that long.
# 2. If `terminate_timeout` is set and anything is still running: `terminate`,
#    then `wait_for_exit` for up to that long.
# 3. If anything is still running: `kill`, then `wait` with no timeout.
#
# @param graceful [Boolean, Numeric] `true` keeps both timeouts as given;
#   `false` clears the interrupt timeout so phase 1 is skipped; a Numeric
#   replaces both timeouts with that value. Any other value behaves like `true`.
# @param interrupt_timeout [Numeric, nil] time allowed for the interrupt phase; nil skips it.
# @param terminate_timeout [Numeric, nil] time allowed for the terminate phase; nil skips it.
# @return [Object, nil] the result of `wait` when the kill phase ran, otherwise nil.
def stop(graceful = true, interrupt_timeout: INTERRUPT_TIMEOUT, terminate_timeout: TERMINATE_TIMEOUT)
  # `true` (and anything unrecognised) leaves the timeouts untouched.
  case graceful
  when false then interrupt_timeout = nil
  when Numeric then interrupt_timeout = terminate_timeout = graceful
  end
  
  Console.debug(self, "Stopping all processes...", interrupt_timeout: interrupt_timeout, terminate_timeout: terminate_timeout)
  
  # Phase 1: ask the children to stop, and give them time to do so.
  if interrupt_timeout
    started = Async::Clock.start
    interrupt
    wait_for_exit(started, interrupt_timeout)
  end
  
  # Phase 2: anything that ignored the interrupt (or was never sent one) gets terminated.
  if terminate_timeout && any?
    started = Async::Clock.start
    terminate
    wait_for_exit(started, terminate_timeout)
  end
  
  # Phase 3: whatever is left is killed and waited on with no timeout.
  return unless any?
  
  kill
  wait
end

#terminate ⇒ Object

Terminate all running processes. This resumes each controlling fiber with the Terminate class itself (not an instance of it).



126
127
128
129
130
131
# File 'lib/async/container/group.rb', line 126

# Terminate all running processes.
#
# Logs the number of running entries, then resumes every fiber yielded by
# `each_running` with the `Terminate` constant itself (not an instance).
#
# @return [Object] whatever `each_running` returns.
def terminate
  Console.info(self, "Sending terminate to #{@running.size} running processes...")
  
  each_running { |fiber| fiber.resume(Terminate) }
end

#wait ⇒ Object

Begin any outstanding queued processes and wait for them indefinitely.



74
75
76
77
78
79
80
# File 'lib/async/container/group.rb', line 74

# Begin any outstanding queued processes and wait for them indefinitely.
#
# Calls `resume` first, then runs `wait_for_children` inside
# `with_health_checks`, using the duration that helper yields.
#
# @return [Object] whatever `with_health_checks` returns.
def wait
  resume
  
  with_health_checks { |duration| wait_for_children(duration) }
end

#wait_for(channel) ⇒ Object

Wait for a message in the specified Channel.



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/async/container/group.rb', line 208

# Wait for messages on the specified channel.
#
# The current fiber is registered in `@running` under `channel.in`, then parks
# in `Fiber.yield` until it is resumed. Each resume value is handled as follows:
#
# - `Interrupt`, `Terminate` or `Kill` (the constants themselves): forwarded to
#   the channel as `interrupt!`, `terminate!` or `kill!`.
# - any other truthy value: yielded to the caller's block as-is.
# - a falsy value: `channel.receive` is consulted; a truthy message is yielded,
#   otherwise the method returns `channel.wait`.
#
# The loop also ends once the registration has been removed from `@running`.
# The registration is always removed on the way out.
#
# @param channel [Object] must respond to `in`, `interrupt!`, `terminate!`,
#   `kill!`, `receive` and `wait`.
# @yield [message] a value passed to `resume`, or a message read from the channel.
# @return [Object, nil] the result of `channel.wait`, or nil if the loop ended
#   because the registration was removed.
def wait_for(channel)
  io = channel.in
  @running[io] = Fiber.current
  
  while @running.key?(io)
    # Park here until somebody resumes this fiber:
    event = Fiber.yield
    
    if event == Interrupt
      channel.interrupt!
    elsif event == Terminate
      channel.terminate!
    elsif event == Kill
      channel.kill!
    else
      # Not a control signal: use the resume value if there was one, otherwise
      # read the next message from the channel.
      payload = event || channel.receive
      
      # Nothing left to read, so wait for the channel to exit:
      return channel.wait unless payload
      
      yield payload
    end
  end
ensure
  @running.delete(io)
end