Class: Celluloid::Supervision::Container::Pool
- Inherits:
-
Object
- Object
- Celluloid::Supervision::Container::Pool
show all
- Includes:
- Behavior, Celluloid
- Defined in:
- lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior/pool.rb
Overview
Manages a fixed-size pool of actors. Delegates work (i.e. method calls) to the pooled actors and supervises them. Don’t use this class directly; instead use MyKlass.pool.
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#__busy ⇒ Object
-
#__busy?(actor) ⇒ Boolean
-
#__crash_handler__(actor, reason) ⇒ Object
Spawn a new worker for every crashed one.
-
#__idle ⇒ Object
-
#__idle?(actor) ⇒ Boolean
-
#__provision_actor__ ⇒ Object
Provision a new actor (take it out of idle, move it into busy, and make it available).
-
#__shutdown__ ⇒ Object
-
#__spawn_actor__ ⇒ Object
Instantiate an actor, add it to the actor Set, and return it.
-
#__state(actor) ⇒ Object
-
#_send_(method, *args, &block) ⇒ Object
-
#busy_size ⇒ Object
-
#idle_size ⇒ Object
-
#initialize(options = {}) ⇒ Pool
constructor
-
#inspect ⇒ Object
-
#is_a?(klass) ⇒ Boolean
-
#kind_of?(klass) ⇒ Boolean
-
#method(meth) ⇒ Object
Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class.
-
#method_missing(method, *args, &block) ⇒ Object
-
#methods(include_ancestors = true) ⇒ Object
-
#name ⇒ Object
-
#respond_to?(meth, include_private = false) ⇒ Boolean
-
#to_s ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Pool
Returns a new instance of Pool.
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/celluloid/supervision/container/pool.rb', line 15
# Builds the pool state and eagerly spawns every worker.
#
# options[:actors] - worker class; stored in @klass and used by __spawn_actor__.
# options[:size]   - number of workers. When absent/falsy, the larger of
#                    Celluloid.cores (2 if that is nil) and 2 is used.
# options[:args]   - worker constructor argument(s), normalised with Array();
#                    [] when absent/falsy.
def initialize(options={})
  @klass  = options[:actors]
  @mutex  = Mutex.new
  @actors = Set.new
  @busy   = []
  # Start with an empty idle list so it exists while workers are being spawned.
  @idle   = []

  @size = options[:size]
  @size ||= [Celluloid.cores || 2, 2].max

  requested_args = options[:args]
  @args = requested_args ? Array(requested_args) : []

  # Spawning happens last, after all other state has been set up.
  spawned = []
  @size.times { spawned << __spawn_actor__ }
  @idle = spawned
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &block) ⇒ Object
191
192
193
194
195
196
197
|
# File 'lib/celluloid/supervision/container/pool.rb', line 191
# Forwards calls the pool itself does not define to a pooled worker.
#
# When respond_to? reports the method as available, the call (arguments and
# block included) goes through _send_; otherwise the default method_missing
# behaviour is kept via super.
def method_missing(method, *args, &block)
  return super unless respond_to?(method)

  _send_(method, *args, &block)
end
|
Instance Attribute Details
#actors ⇒ Object
Returns the value of attribute actors.
13
14
15
|
# File 'lib/celluloid/supervision/container/pool.rb', line 13
# Reader for @actors, the collection that __spawn_actor__ adds every
# spawned worker to. Returns the stored object itself, not a copy.
def actors
  @actors
end
|
#size ⇒ Object
Returns the value of attribute size.
13
14
15
|
# File 'lib/celluloid/supervision/container/pool.rb', line 13
# Reader for @size, the worker count chosen in the constructor.
def size
  @size
end
|
Class Method Details
.pooling_options(config = {}, mixins = {}) ⇒ Object
50
51
52
53
54
|
# File 'lib/celluloid/supervision/container/behavior/pool.rb', line 50
# Builds the supervision configuration for a pool.
#
# config and mixins are merged (mixins win) on top of a default :type of
# Celluloid::Supervision::Container::Pool. The pool-specific keys :block,
# :actors, :size and :args are then moved - only when their value is truthy -
# into a single Hash that becomes the sole element of the resulting :args.
#
# Returns the merged Hash.
def pooling_options(config={}, mixins={})
  merged = {type: Celluloid::Supervision::Container::Pool}.merge(config).merge(mixins)

  pool_settings = [:block, :actors, :size, :args].each_with_object({}) do |key, extracted|
    extracted[key] = merged.delete(key) if merged[key]
  end

  merged[:args] = [pool_settings]
  merged
end
|
Instance Method Details
#__busy ⇒ Object
121
122
123
|
# File 'lib/celluloid/supervision/container/pool.rb', line 121
# Returns the busy-worker list, read while holding the pool mutex.
# The list itself is returned, not a copy.
def __busy
  @mutex.synchronize do
    @busy
  end
end
|
#__busy?(actor) ⇒ Boolean
117
118
119
|
# File 'lib/celluloid/supervision/container/pool.rb', line 117
# True when +actor+ is currently in the busy list (checked under the pool mutex).
def __busy?(actor)
  @mutex.synchronize do
    @busy.include?(actor)
  end
end
|
#__crash_handler__(actor, reason) ⇒ Object
Spawn a new worker for every crashed one
160
161
162
163
164
165
166
167
|
# File 'lib/celluloid/supervision/container/pool.rb', line 160
# Removes a terminated worker from the pool and, when it crashed, replaces it.
#
# actor  - the worker that went away; dropped from @busy, @idle and @actors.
# reason - truthy when the worker crashed. In that case a new worker is
#          spawned into the idle list and :respawn_complete is signalled.
#          When falsy nothing is respawned and nil is returned.
def __crash_handler__(actor, reason)
  [@busy, @idle, @actors].each { |collection| collection.delete(actor) }

  if reason
    @idle << __spawn_actor__
    signal(:respawn_complete)
  end
end
|
#__idle ⇒ Object
125
126
127
|
# File 'lib/celluloid/supervision/container/pool.rb', line 125
# Returns the idle-worker list, read while holding the pool mutex.
# The list itself is returned, not a copy.
def __idle
  @mutex.synchronize do
    @idle
  end
end
|
#__idle?(actor) ⇒ Boolean
113
114
115
|
# File 'lib/celluloid/supervision/container/pool.rb', line 113
# True when +actor+ is currently in the idle list (checked under the pool mutex).
def __idle?(actor)
  @mutex.synchronize do
    @idle.include?(actor)
  end
end
|
#__provision_actor__ ⇒ Object
Provision a new actor (take it out of idle, move it into busy, and make it available).
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/celluloid/supervision/container/pool.rb', line 144
# Checks a worker out of the pool: the first idle worker is moved to the
# busy list and returned.
#
# While no worker is idle, the method (still holding the pool mutex) receives
# Internals::Response messages and hands each one to the current actor's
# handle_message, looping until the idle list is non-empty.
def __provision_actor__
  Task.current.guard_warnings = true

  @mutex.synchronize do
    while @idle.empty?
      response = exclusive do
        receive { |msg| msg.is_a?(Internals::Response) }
      end
      Thread.current[:celluloid_actor].handle_message(response)
    end

    # Hand the oldest idle worker over to the busy list and return it.
    @idle.shift.tap { |worker| @busy << worker }
  end
end
|
#__shutdown__ ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/celluloid/supervision/container/pool.rb', line 29
# Terminates every worker of the pool on a best-effort basis.
#
# A :terminate future is requested from each worker; workers that raise
# DeadActorError are skipped. Every obtained future is then waited on, and
# StandardErrors raised while waiting are ignored. Does nothing when @actors
# has not been set up.
def __shutdown__
  return unless defined?(@actors) && @actors

  pending = @actors.map do |worker|
    begin
      worker.future(:terminate)
    rescue DeadActorError
      nil
    end
  end

  pending.compact.each do |termination|
    begin
      termination.value
    rescue StandardError
      nil
    end
  end
end
|
#__spawn_actor__ ⇒ Object
Instantiate an actor, add it to the actor Set, and return it
136
137
138
139
140
141
|
# File 'lib/celluloid/supervision/container/pool.rb', line 136
# Instantiates a linked worker, registers it in the actor set and returns it.
#
# The worker is created with @klass.new_link(*@args). It is added to @actors
# exactly once, while holding the pool mutex; the former second, unsynchronised
# add of the same worker was redundant and has been removed.
def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  actor
end
|
#__state(actor) ⇒ Object
129
130
131
132
133
|
# File 'lib/celluloid/supervision/container/pool.rb', line 129
# Classifies +actor+ as :busy, :idle or :missing, checking the busy list
# first and the idle list second.
def __state(actor)
  if __busy?(actor)
    :busy
  elsif __idle?(actor)
    :idle
  else
    :missing
  end
end
|
#_send_(method, *args, &block) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/celluloid/supervision/container/pool.rb', line 42
# Runs +method+ on a pooled worker and returns whatever the worker returns.
#
# method - name of the method to invoke on the worker.
# args   - arguments forwarded to the worker.
# block  - optional block forwarded to the worker.
#
# A worker is checked out with __provision_actor__. If it turns out to be dead
# (DeadActorError), the pool waits for :respawn_complete, checks out another
# worker and retries the call. Any other exception is passed to +abort+.
# Whatever happens, a worker that is still alive is put back on the idle list
# and :actor_idle is signalled.
def _send_(method, *args, &block)
  actor = __provision_actor__
  begin
    actor._send_ method, *args, &block
  rescue DeadActorError
    # We were handed a dead worker: wait until __crash_handler__ signals that
    # a replacement has been spawned, then try again with a fresh worker.
    wait :respawn_complete
    actor = __provision_actor__
    retry
  rescue ::Exception => ex
    abort ex
  ensure
    if actor.alive?
      @idle << actor
      @busy.delete actor
      # Let waiters know a worker has finished and is idle again.
      signal :actor_idle
    end
  end
end
|
#busy_size ⇒ Object
105
106
107
|
# File 'lib/celluloid/supervision/container/pool.rb', line 105
# Number of workers currently in the busy list (read under the pool mutex).
def busy_size
  @mutex.synchronize do
    @busy.size
  end
end
|
#idle_size ⇒ Object
109
110
111
|
# File 'lib/celluloid/supervision/container/pool.rb', line 109
# Number of workers currently in the idle list (read under the pool mutex).
def idle_size
  @mutex.synchronize do
    @idle.size
  end
end
|
#inspect ⇒ Object
84
85
86
|
# File 'lib/celluloid/supervision/container/pool.rb', line 84
# Delegates #inspect to a pooled worker via _send_, so the result describes
# a worker rather than the pool object.
def inspect
  _send_(:inspect)
end
|
#is_a?(klass) ⇒ Boolean
68
69
70
|
# File 'lib/celluloid/supervision/container/pool.rb', line 68
# Delegates the type check to a pooled worker via _send_, so the answer
# reflects the worker rather than the pool object.
def is_a?(klass)
  _send_(:is_a?, klass)
end
|
#kind_of?(klass) ⇒ Boolean
72
73
74
|
# File 'lib/celluloid/supervision/container/pool.rb', line 72
# Delegates the type check to a pooled worker via _send_, so the answer
# reflects the worker rather than the pool object.
def kind_of?(klass)
  _send_(:kind_of?, klass)
end
|
#method(meth) ⇒ Object
Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class
202
203
204
205
206
|
# File 'lib/celluloid/supervision/container/pool.rb', line 202
# Looks up +meth+ on the pool object first; when that raises NameError the
# lookup falls back to the worker class (@klass) and returns its instance
# method instead. A NameError from the fallback lookup propagates.
def method(meth)
  begin
    super
  rescue NameError
    @klass.instance_method(meth.to_sym)
  end
end
|
#methods(include_ancestors = true) ⇒ Object
76
77
78
|
# File 'lib/celluloid/supervision/container/pool.rb', line 76
# Delegates #methods (with the same include_ancestors flag) to a pooled
# worker via _send_.
def methods(include_ancestors = true)
  _send_(:methods, include_ancestors)
end
|
#name ⇒ Object
64
65
66
|
# File 'lib/celluloid/supervision/container/pool.rb', line 64
# Delegates #name to a pooled worker via _send_.
#
# _send_ takes the method name as its first argument; the earlier form passed
# @mailbox in that position, so :name ended up as a stray argument and the
# call could not reach the worker's #name.
def name
  _send_ :name
end
|
#respond_to?(meth, include_private = false) ⇒ Boolean
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/celluloid/supervision/container/pool.rb', line 169
# Reports whether the pool, or the worker class behind it, responds to +meth+.
#
# meth            - method name as a Symbol or String.
# include_private - when true, any method found via #method counts; otherwise
#                   only public and protected methods do.
#
# Returns false when the lookup raises NameError.
def respond_to?(meth, include_private = false)
  found = method(meth)
  return(found ? true : false) if include_private

  # The visibility lists below contain Symbols, so a String name must be
  # converted first; otherwise respond_to?("foo") is always false here.
  sym = meth.to_sym

  if found.is_a?(UnboundMethod)
    # The lookup fell through to an unbound method: check its owner.
    owner = found.owner
    owner.public_instance_methods.include?(sym) ||
      owner.protected_instance_methods.include?(sym)
  else
    receiver = found.receiver
    receiver.public_methods.include?(sym) ||
      receiver.protected_methods.include?(sym)
  end
rescue NameError
  false
end
|
#to_s ⇒ Object
80
81
82
|
# File 'lib/celluloid/supervision/container/pool.rb', line 80
# Delegates #to_s to a pooled worker via _send_, so the result describes
# a worker rather than the pool object.
def to_s
  _send_(:to_s)
end
|