Class: Async::Container::Threaded

Inherits:
Controller show all
Defined in:
lib/async/container/threaded.rb

Overview

Manages a reactor within one or more threads.

Defined Under Namespace

Classes: Instance

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Controller

#async, #attach, #run

Constructor Details

#initializeThreaded

Returns a new instance of Threaded.



57
58
59
60
61
62
63
# File 'lib/async/container/threaded.rb', line 57

def initialize
	super
	
	@threads = []
	@running = true
	@statistics = Statistics.new
end

Instance Attribute Details

#statisticsObject (readonly)

Returns the value of attribute statistics.



65
66
67
# File 'lib/async/container/threaded.rb', line 65

def statistics
  @statistics
end

Class Method Details

.multiprocess?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/async/container/threaded.rb', line 53

def self.multiprocess?
	false
end

.run(*args, &block) ⇒ Object



49
50
51
# File 'lib/async/container/threaded.rb', line 49

def self.run(*args, &block)
	self.new.run(*args, &block)
end

Instance Method Details

#spawn(name: nil, restart: false, &block) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/async/container/threaded.rb', line 67

def spawn(name: nil, restart: false, &block)
	@statistics.spawn!
	
	thread = ::Thread.new do
		thread = ::Thread.current
		
		thread.name = name if name
		
		instance = Instance.new(thread)
		
		while @running
			begin
				yield instance
			rescue Exception => exception
				Async.logger.error(self) {exception}
				
				@statistics.failure!
			end
			
			if restart
				@statistics.restart!
			else
				break
			end
		end
	# rescue Interrupt
	# 	# Graceful exit.
	end
	
	@threads << thread
	
	return self
end

#stop(graceful = true) ⇒ Object

Gracefully shut down all reactors.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/async/container/threaded.rb', line 111

def stop(graceful = true)
	@running = false
	super
	
	if graceful
		@threads.each{|thread| thread.raise(Interrupt)}
	else
		@threads.each(&:kill)
	end
	
	self.wait
ensure
	@running = true
end

#wait(forever = false) ⇒ Object



101
102
103
104
105
106
107
108
# File 'lib/async/container/threaded.rb', line 101

def wait(forever = false)
	@threads.each(&:join)
	@threads.clear
	
	sleep if forever
rescue Interrupt
	# Graceful exit.
end