Class: Celluloid::Supervision::Container::Pool

Inherits:
Object
  • Object
show all
Includes:
Behavior, Celluloid
Defined in:
lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior/pool.rb

Overview

Manages a fixed-size pool of actors Delegates work (i.e. methods) and supervises actors Don’t use this class directly. Instead use MyKlass.pool

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/celluloid/supervision/container/pool.rb', line 15

def initialize(options={})
  @idle = []
  @busy = []
  @klass = options[:actors]
  @actors = Set.new
  @mutex = Mutex.new

  @size = options[:size] || [Celluloid.cores || 2, 2].max
  @args = options[:args] ? Array(options[:args]) : []

  # Do this last since it can suspend and/or crash
  @idle = @size.times.map { __spawn_actor__ }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



191
192
193
194
195
196
197
# File 'lib/celluloid/supervision/container/pool.rb', line 191

def method_missing(method, *args, &block)
  if respond_to?(method)
    _send_ method, *args, &block
  else
    super
  end
end

Instance Attribute Details

#actorsObject (readonly)

Returns the value of attribute actors.



13
14
15
# File 'lib/celluloid/supervision/container/pool.rb', line 13

def actors
  @actors
end

#sizeObject

Returns the value of attribute size.



13
14
15
# File 'lib/celluloid/supervision/container/pool.rb', line 13

def size
  @size
end

Class Method Details

.pooling_options(config = {}, mixins = {}) ⇒ Object



50
51
52
53
54
# File 'lib/celluloid/supervision/container/behavior/pool.rb', line 50

def pooling_options(config={}, mixins={})
  combined = {type: Celluloid::Supervision::Container::Pool}.merge(config).merge(mixins)
  combined[:args] = [[:block, :actors, :size, :args].inject({}) { |e, p| e[p] = combined.delete(p) if combined[p]; e }]
  combined
end

Instance Method Details

#__busyObject



121
122
123
# File 'lib/celluloid/supervision/container/pool.rb', line 121

def __busy
  @mutex.synchronize { @busy }
end

#__busy?(actor) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/celluloid/supervision/container/pool.rb', line 117

def __busy?(actor)
  @mutex.synchronize { @busy.include? actor }
end

#__crash_handler__(actor, reason) ⇒ Object

Spawn a new worker for every crashed one



160
161
162
163
164
165
166
167
# File 'lib/celluloid/supervision/container/pool.rb', line 160

def __crash_handler__(actor, reason)
  @busy.delete actor
  @idle.delete actor
  @actors.delete actor
  return unless reason
  @idle << __spawn_actor__
  signal :respawn_complete
end

#__idleObject



125
126
127
# File 'lib/celluloid/supervision/container/pool.rb', line 125

def __idle
  @mutex.synchronize { @idle }
end

#__idle?(actor) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/celluloid/supervision/container/pool.rb', line 113

def __idle?(actor)
  @mutex.synchronize { @idle.include? actor }
end

#__provision_actor__Object

Provision a new actor ( take it out of idle, move it into busy, and avail it )



144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/celluloid/supervision/container/pool.rb', line 144

def __provision_actor__
  Task.current.guard_warnings = true
  @mutex.synchronize do
    while @idle.empty?
      # Wait for responses from one of the busy actors
      response = exclusive { receive { |msg| msg.is_a?(Internals::Response) } }
      Thread.current[:celluloid_actor].handle_message(response)
    end

    actor = @idle.shift
    @busy << actor
    actor
  end
end

#__shutdown__Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/celluloid/supervision/container/pool.rb', line 29

def __shutdown__
  return unless defined?(@actors) && @actors
  # TODO: these can be nil if initializer crashes
  terminators = @actors.map do |actor|
    begin
      actor.future(:terminate)
    rescue DeadActorError
    end
  end

  terminators.compact.each { |terminator| terminator.value rescue nil }
end

#__spawn_actor__Object

Instantiate an actor, add it to the actor Set, and return it



136
137
138
139
140
141
# File 'lib/celluloid/supervision/container/pool.rb', line 136

def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  @actors.add(actor)
  actor
end

#__state(actor) ⇒ Object



129
130
131
132
133
# File 'lib/celluloid/supervision/container/pool.rb', line 129

def __state(actor)
  return :busy if __busy?(actor)
  return :idle if __idle?(actor)
  :missing
end

#_send_(method, *args, &block) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/celluloid/supervision/container/pool.rb', line 42

def _send_(method, *args, &block)
  actor = __provision_actor__
  begin
    actor._send_ method, *args, &block
  rescue DeadActorError # if we get a dead actor out of the pool
    wait :respawn_complete
    actor = __provision_actor__
    retry
  rescue ::Exception => ex
    abort ex
  ensure
    if actor.alive?
      @idle << actor
      @busy.delete actor

      # Broadcast that actor is done processing and
      # waiting idle
      signal :actor_idle
    end
  end
end

#busy_sizeObject



105
106
107
# File 'lib/celluloid/supervision/container/pool.rb', line 105

def busy_size
  @mutex.synchronize { @busy.length }
end

#idle_sizeObject



109
110
111
# File 'lib/celluloid/supervision/container/pool.rb', line 109

def idle_size
  @mutex.synchronize { @idle.length }
end

#inspectObject



84
85
86
# File 'lib/celluloid/supervision/container/pool.rb', line 84

def inspect
  _send_ :inspect
end

#is_a?(klass) ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/celluloid/supervision/container/pool.rb', line 68

def is_a?(klass)
  _send_ :is_a?, klass
end

#kind_of?(klass) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/celluloid/supervision/container/pool.rb', line 72

def kind_of?(klass)
  _send_ :kind_of?, klass
end

#method(meth) ⇒ Object

Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class



202
203
204
205
206
# File 'lib/celluloid/supervision/container/pool.rb', line 202

def method(meth)
  super
rescue NameError
  @klass.instance_method(meth.to_sym)
end

#methods(include_ancestors = true) ⇒ Object



76
77
78
# File 'lib/celluloid/supervision/container/pool.rb', line 76

def methods(include_ancestors = true)
  _send_ :methods, include_ancestors
end

#nameObject



64
65
66
# File 'lib/celluloid/supervision/container/pool.rb', line 64

def name
  _send_ @mailbox, :name
end

#respond_to?(meth, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/celluloid/supervision/container/pool.rb', line 169

def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.

  found = method(meth)
  if include_private
    found ? true : false
  else
    if found.is_a?(UnboundMethod)
      found.owner.public_instance_methods.include?(meth) ||
        found.owner.protected_instance_methods.include?(meth)
    else
      found.receiver.public_methods.include?(meth) ||
        found.receiver.protected_methods.include?(meth)
    end
  end
rescue NameError
  false
end

#to_sObject



80
81
82
# File 'lib/celluloid/supervision/container/pool.rb', line 80

def to_s
  _send_ :to_s
end