Class: Async::Container::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/async/container/group.rb

Overview

Manages a group of running processes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Group

Initialize an empty group.



16
17
18
19
20
21
# File 'lib/async/container/group.rb', line 16

# Set up a group that is not yet tracking any running processes.
def initialize
	# Starts out unset. This queue allows us to wait for processes to complete, without spawning new processes as a result.
	@queue = nil
	
	# Nothing is running yet, so the table of running tasks starts empty.
	@running = {}
end

Instance Attribute Details

#running ⇒ Object (readonly)

Returns the value of attribute running.



28
29
30
# File 'lib/async/container/group.rb', line 28

# The running tasks, indexed by IO.
#
# This is the same hash that `wait_for` writes to: each entry maps a channel's input to the fiber waiting on it.
#
# @return [Hash] the live table itself, not a copy.
def running
  @running
end

#running ⇒ Object (readonly)

The running tasks, indexed by IO.



28
# File 'lib/async/container/group.rb', line 28

# The running tasks, indexed by IO (read-only accessor).
attr_reader :running

Instance Method Details

#any? ⇒ Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


38
39
40
# File 'lib/async/container/group.rb', line 38

# Whether the group is tracking at least one running process.
#
# @return [Boolean] true when the table of running tasks has any entry.
def any?
	!@running.empty?
end

#empty? ⇒ Boolean

Whether the group is empty.

Returns:

  • (Boolean)


44
45
46
# File 'lib/async/container/group.rb', line 44

# Whether the group is tracking no running processes at all.
#
# @return [Boolean] true when the table of running tasks has no entries.
def empty?
	@running.size.zero?
end

#inspect ⇒ Object



23
24
25
# File 'lib/async/container/group.rb', line 23

# A short description of the group: its class name and how many tasks it is currently tracking.
#
# @return [String]
def inspect
	count = @running.size
	
	"#<#{self.class} running=#{count}>"
end

#interrupt ⇒ Object

Interrupt all running processes. This resumes the controlling fiber with an instance of Interrupt.



67
68
69
70
71
72
# File 'lib/async/container/group.rb', line 67

# Interrupt all running processes.
#
# Logs the number of tracked tasks at debug level, then resumes every controlling fiber with the Interrupt class so that it can react to the request.
def interrupt
	logger = Console.logger
	logger.debug(self, "Sending interrupt to #{@running.size} running processes...")
	
	@running.each_value { |fiber| fiber.resume(Interrupt) }
end

#running? ⇒ Boolean

Whether the group contains any running processes.

Returns:

  • (Boolean)


32
33
34
# File 'lib/async/container/group.rb', line 32

# Whether any process in the group is still being tracked as running.
#
# @return [Boolean] true when the table of running tasks has any entry.
def running?
	!@running.empty?
end

#sleep(duration) ⇒ Object

Sleep for at most the specified duration until some state change occurs.



49
50
51
52
53
54
# File 'lib/async/container/group.rb', line 49

# Sleep for at most the specified duration until some state change occurs.
#
# The group is resumed and then suspended again before waiting; see those methods for what each step does.
#
# @param duration [Numeric] upper bound handed to `wait_for_children`.
# @return [Object] whatever `wait_for_children` returns.
def sleep(duration)
	resume
	suspend
	
	wait_for_children(duration)
end

#stop(timeout = 1) ⇒ Object

Stop all child processes using #terminate.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/async/container/group.rb', line 85

# Stop all child processes: interrupt them, allow a grace period, then terminate whatever is left.
#
# @param timeout [Numeric, Boolean, nil] grace period between interrupting and terminating. `true` selects the default of 1; `false` or `nil` skips the interrupt phase and terminates straight away.
# @return [Object] whatever `wait` returns once all children have exited.
def stop(timeout = 1)
	# Use a default timeout if not specified:
	timeout = 1 if timeout == true
	
	if timeout
		start_time = Async::Clock.now
		
		self.interrupt
		
		while self.any?
			elapsed = Async::Clock.now - start_time
			remaining = timeout - elapsed
			
			if remaining >= 0
				# Wait only for what is left of the grace period. Waiting for the elapsed time instead
				# makes the first iterations spin and lets later ones overshoot the deadline.
				self.wait_for_children(remaining)
			else
				# The grace period is over: check once more with a zero duration, then stop waiting.
				self.wait_for_children(0)
				break
			end
		end
	end
	
	# Terminate all children:
	self.terminate
	
	# Wait for all children to exit:
	self.wait
end

#terminate ⇒ Object

Terminate all running processes. This resumes the controlling fiber with an instance of Terminate.



76
77
78
79
80
81
# File 'lib/async/container/group.rb', line 76

# Terminate all running processes.
#
# Logs the number of tracked tasks at debug level, then resumes every controlling fiber with the Terminate class so that it can react to the request.
def terminate
	logger = Console.logger
	logger.debug(self, "Sending terminate to #{@running.size} running processes...")
	
	@running.each_value { |fiber| fiber.resume(Terminate) }
end

#wait ⇒ Object

Begin any outstanding queued processes and wait for them indefinitely.



57
58
59
60
61
62
63
# File 'lib/async/container/group.rb', line 57

# Begin any outstanding queued processes and wait for them indefinitely.
#
# Resumes the group once, then keeps calling `wait_for_children` with no duration until nothing is reported as running.
#
# @return [nil]
def wait
	resume
	
	wait_for_children while running?
end

#wait_for(channel) ⇒ Object

Wait for a message in the specified Channel.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/async/container/group.rb', line 115

# Wait for a message in the specified channel.
#
# Registers the current fiber under the channel's input, then parks it with `Fiber.yield`. Each time the fiber is resumed, the resumed value decides what happens:
# - the Interrupt class: forwards an interrupt to the channel;
# - the Terminate class: forwards a terminate to the channel;
# - anything else: reads one message from the channel. A message is yielded to the block; no message ends the wait.
#
# The registration is always removed on the way out, including when an exception is raised.
#
# @param channel [Object] must respond to `in`, `receive`, `wait`, `interrupt!` and `terminate!`.
# @yield [message] each message received from the channel.
# @return [Object, nil] the result of `channel.wait` once no message is received, or nil if the registration was removed while waiting.
def wait_for(channel)
	input = channel.in
	
	@running[input] = Fiber.current
	
	while @running.key?(input)
		signal = Fiber.yield
		
		if signal == Interrupt
			channel.interrupt!
		elsif signal == Terminate
			channel.terminate!
		else
			message = channel.receive
			
			# Nothing to read means the other end is finished; hand back its final status.
			return channel.wait unless message
			
			yield message
		end
	end
ensure
	@running.delete(input)
end