Class: Celluloid::Supervision::Container::Pool

Inherits:
Object
  • Object
show all
Includes:
Behavior, Celluloid
Defined in:
lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior/pool.rb

Overview

Manages a fixed-size pool of actors. Delegates work (i.e. methods) and supervises actors. Don’t use this class directly; instead use MyKlass.pool

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/celluloid/supervision/container/pool.rb', line 15

# Set up the pool's bookkeeping and eagerly spawn every worker.
#
# @param options [Hash] pool configuration
# @option options [Class] :actors worker class; it must respond to +new_link+
# @option options [Integer] :size number of workers; when absent, the larger
#   of +Celluloid.cores+ and 2 is used
# @option options [Object, Array] :args constructor arguments for each
#   worker (wrapped with +Array()+)
def initialize(options={})
  @busy = []
  @idle = []
  @actors = Set.new
  @mutex = Mutex.new
  @klass = options[:actors]

  @size = options[:size] || [Celluloid.cores || 2, 2].max
  @args = if options[:args]
    Array(options[:args])
  else
    []
  end

  # Spawning is deliberately the final step: it can suspend and/or crash
  @idle = @size.times.map { __spawn_actor__ }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



197
198
199
200
201
202
203
# File 'lib/celluloid/supervision/container/pool.rb', line 197

# Forward any call the pool can answer (per respond_to?) to a pooled worker;
# everything else goes up the normal method_missing chain.
#
# @param method [Symbol] name of the missing method
# @param args [Array] arguments forwarded unchanged
# @param block [Proc] optional block forwarded unchanged
def method_missing(method, *args, &block)
  return super unless respond_to?(method)

  _send_(method, *args, &block)
end

Instance Attribute Details

#actors ⇒ Object (readonly)

Returns the value of attribute actors.



13
14
15
# File 'lib/celluloid/supervision/container/pool.rb', line 13

# Reader for the collection of workers: a Set created in the constructor,
# added to by __spawn_actor__ and pruned by __crash_handler__.
#
# @return [Set]
def actors
  @actors
end

#size ⇒ Object

Returns the value of attribute size.



13
14
15
# File 'lib/celluloid/supervision/container/pool.rb', line 13

# Reader for the pool size chosen in the constructor (options[:size], or the
# larger of Celluloid.cores and 2 when no size was given).
#
# @return [Integer]
def size
  @size
end

Class Method Details

.pooling_options(config = {}, mixins = {}) ⇒ Object



50
51
52
53
54
# File 'lib/celluloid/supervision/container/behavior/pool.rb', line 50

# Build the supervision configuration for a pool.
#
# +config+ and +mixins+ are merged (mixins win) on top of a default :type of
# Pool. Any truthy :block, :actors, :size and :args entries are then moved
# out of the top level into a single Hash, which becomes the sole element
# of the resulting :args array.
#
# @param config [Hash] base options
# @param mixins [Hash] overriding options
# @return [Hash] merged options with the pool settings nested under :args
def pooling_options(config={},mixins={})
  options = { :type => Celluloid::Supervision::Container::Pool }
  options = options.merge(config).merge(mixins)

  pool_settings = {}
  [:block, :actors, :size, :args].each do |key|
    # Falsy values are left where they are, not moved
    pool_settings[key] = options.delete(key) if options[key]
  end

  options[:args] = [pool_settings]
  options
end

Instance Method Details

#__busy ⇒ Object



121
122
123
# File 'lib/celluloid/supervision/container/pool.rb', line 121

# Reader for the list of workers currently checked out of the pool
# (__provision_actor__ appends to it).
#
# @return [Array]
def __busy
  @busy
end

#__busy?(actor) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/celluloid/supervision/container/pool.rb', line 117

# Tell whether +actor+ is currently on the busy list.
#
# @param actor [Object] worker to look up
# @return [Boolean]
def __busy?(actor)
  @busy.include?(actor)
end

#__crash_handler__(actor, reason) ⇒ Object

Spawn a new worker for every crashed one



163
164
165
166
167
168
169
170
171
172
173
# File 'lib/celluloid/supervision/container/pool.rb', line 163

# Exit handler for linked workers: forget the departed worker and, when a
# reason is given, spawn a replacement for it.
#
# @param actor [Object] the worker that exited
# @param reason [Exception, nil] cause of the exit; a falsy value skips the
#   respawn (presumably a clean termination - confirm against Celluloid)
def __crash_handler__(actor, reason)
  @mutex.synchronize do
    [@busy, @idle, @actors].each { |group| group.delete(actor) }
  end

  if reason
    @idle << __spawn_actor__
    # Wakes callers blocked on wait(:respawn_complete)
    signal(:respawn_complete)
  end
end

#__idle ⇒ Object



125
126
127
# File 'lib/celluloid/supervision/container/pool.rb', line 125

# Reader for the list of workers waiting for work (__provision_actor__
# shifts workers off the front of it).
#
# @return [Array]
def __idle
  @idle
end

#__idle?(actor) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/celluloid/supervision/container/pool.rb', line 113

# Tell whether +actor+ is currently on the idle list.
#
# @param actor [Object] worker to look up
# @return [Boolean]
def __idle?(actor)
  @idle.include?(actor)
end

#__idling? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/celluloid/supervision/container/pool.rb', line 129

# Report, under the pool mutex, whether the idle list is empty.
#
# Note the polarity: +true+ means there is NO idle worker to hand out.
#
# @return [Boolean]
def __idling?
  @mutex.synchronize do
    @idle.empty?
  end
end

#__provision_actor__ ⇒ Object

Provision a new actor (take it out of idle, move it into busy, and make it available)



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/celluloid/supervision/container/pool.rb', line 147

# Check a worker out of the pool: take the first idle worker, record it as
# busy and return it. While no worker is idle, Response messages are
# received and handled until one becomes available.
#
# @return [Object] the worker that was moved from idle to busy
def __provision_actor__
  Task.current.guard_warnings = true

  while __idling?
    # Block until one of the busy actors answers, then process that answer
    response = exclusive do
      receive { |msg| msg.is_a?(Internals::Response) }
    end
    Thread.current[:celluloid_actor].handle_message(response)
  end

  @mutex.synchronize do
    @idle.shift.tap { |actor| @busy << actor }
  end
end

#__shutdown__ ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/celluloid/supervision/container/pool.rb', line 29

# Terminate every worker and wait for each termination to finish.
#
# A +terminate+ future is requested from each worker first and the futures
# are awaited afterwards. Workers that are already dead are skipped, and
# errors raised while awaiting a future are ignored.
#
# @return [Array, nil] the awaited futures, or nil when no worker set exists
def __shutdown__
  # TODO: @actors can be nil if the initializer crashes
  return unless @actors

  futures = @actors.map do |actor|
    begin
      actor.future(:terminate)
    rescue DeadActorError
      nil
    end
  end

  futures.compact.each do |future|
    begin
      future.value
    rescue StandardError
      nil
    end
  end
end

#__spawn_actor__ ⇒ Object

Instantiate an actor, add it to the actor Set, and return it



140
141
142
143
144
# File 'lib/celluloid/supervision/container/pool.rb', line 140

# Create one linked worker from the configured class and arguments, record
# it in the actor Set (under the pool mutex) and return it.
#
# @return [Object] the newly created worker
def __spawn_actor__
  @klass.new_link(*@args).tap do |actor|
    @mutex.synchronize { @actors.add(actor) }
  end
end

#__state(actor) ⇒ Object



133
134
135
136
137
# File 'lib/celluloid/supervision/container/pool.rb', line 133

# Classify +actor+ by the list it currently sits on.
#
# @param actor [Object] worker to classify
# @return [Symbol] :busy, :idle, or :missing when it is on neither list
def __state(actor)
  if __busy?(actor)
    :busy
  elsif __idle?(actor)
    :idle
  else
    :missing
  end
end

#_send_(method, *args, &block) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/celluloid/supervision/container/pool.rb', line 42

# Run +method+ on a worker checked out of the pool, then hand the worker back.
#
# @param method [Symbol] name passed on to the worker's own +_send_+
# @param args [Array] arguments forwarded unchanged
# @param block [Proc] optional block forwarded unchanged
# @return [Object] whatever the worker's +_send_+ returns
#
# NOTE(review): the retry below has no upper bound on attempts.
# NOTE(review): @idle and @busy are modified here without @mutex, unlike in
# __provision_actor__ and __crash_handler__ - confirm this is intentional.
def _send_(method, *args, &block)
  actor = __provision_actor__
  begin
    actor._send_ method, *args, &block
  rescue DeadActorError # if we get a dead actor out of the pool
    # Wait for :respawn_complete (signalled by __crash_handler__ once it has
    # spawned a replacement), then start over with a newly provisioned worker
    wait :respawn_complete
    actor = __provision_actor__
    retry
  rescue Exception => ex
    # NOTE(review): presumably Celluloid's abort, which reports the error to
    # the caller instead of crashing the pool itself - confirm
    abort ex
  ensure
    # Only a worker that is still alive goes back onto the idle list
    if actor.alive?
      @idle << actor
      @busy.delete actor

      # Broadcast that actor is done processing and
      # waiting idle
      signal :actor_idle
    end
  end
end

#busy_size ⇒ Object



105
106
107
# File 'lib/celluloid/supervision/container/pool.rb', line 105

# Number of workers currently on the busy list.
#
# @return [Integer]
def busy_size
  @busy.size
end

#idle_size ⇒ Object



109
110
111
# File 'lib/celluloid/supervision/container/pool.rb', line 109

# Number of workers currently on the idle list.
#
# @return [Integer]
def idle_size
  @idle.size
end

#inspect ⇒ Object



84
85
86
# File 'lib/celluloid/supervision/container/pool.rb', line 84

# Delegate +inspect+ to a pooled worker, so the result describes a worker
# rather than the pool object.
#
# @return [Object] the worker's +inspect+ result
def inspect
  _send_(:inspect)
end

#is_a?(klass) ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/celluloid/supervision/container/pool.rb', line 68

# Delegate the type check to a pooled worker, so the answer reflects the
# worker's class rather than the pool's.
#
# @param klass [Module] class or module to test against
# @return [Boolean] the worker's answer
def is_a?(klass)
  _send_(:is_a?, klass)
end

#kind_of?(klass) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/celluloid/supervision/container/pool.rb', line 72

# Delegate the type check to a pooled worker, so the answer reflects the
# worker's class rather than the pool's.
#
# @param klass [Module] class or module to test against
# @return [Boolean] the worker's answer
def kind_of?(klass)
  _send_(:kind_of?, klass)
end

#method(meth) ⇒ Object

Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class



208
209
210
211
212
# File 'lib/celluloid/supervision/container/pool.rb', line 208

# Look +meth+ up on the pool itself and, when the pool has no such method,
# fall back to the worker class.
#
# Workers are only allocated right before a call, so the fallback yields an
# UnboundMethod taken from the worker class instead of a bound Method.
#
# @param meth [Symbol, String] method name
# @return [Method, UnboundMethod]
# @raise [NameError] when neither the pool nor the worker class defines it
def method(meth)
  super(meth)
rescue NameError
  worker_method_name = meth.to_sym
  @klass.instance_method(worker_method_name)
end

#methods(include_ancestors = true) ⇒ Object



76
77
78
# File 'lib/celluloid/supervision/container/pool.rb', line 76

# Delegate +methods+ to a pooled worker, so the list describes the worker's
# methods rather than the pool's.
#
# @param include_ancestors [Boolean] passed through to the worker
# @return [Array<Symbol>] the worker's method list
def methods(include_ancestors = true)
  _send_(:methods, include_ancestors)
end

#name ⇒ Object



64
65
66
# File 'lib/celluloid/supervision/container/pool.rb', line 64

# Delegate +name+ to a pooled worker.
#
# The method name is the first argument of _send_, as in the sibling
# delegators inspect and to_s. Passing @mailbox first (an instance variable
# this class never assigns) would send nil as the method name and :name as
# an argument.
#
# @return [Object] the worker's +name+
def name
  _send_ :name
end

#respond_to?(meth, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/celluloid/supervision/container/pool.rb', line 175

# Tell whether the pool, or the worker class behind it, answers +meth+.
#
# @param meth [Symbol, String] method name
# @param include_private [Boolean] when true, private methods count too
# @return [Boolean] false when method() cannot find +meth+ at all
def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.
  found = method(meth)
  return(found ? true : false) if include_private

  # The *_methods lists hold Symbols, so a String name has to be converted
  # first or it would never match
  method_name = meth.to_sym

  if found.is_a?(UnboundMethod)
    # Came from the worker class: check visibility on the defining module
    owner = found.owner
    owner.public_instance_methods.include?(method_name) ||
      owner.protected_instance_methods.include?(method_name)
  else
    # Defined on the pool itself
    receiver = found.receiver
    receiver.public_methods.include?(method_name) ||
      receiver.protected_methods.include?(method_name)
  end
rescue NameError
  false
end

#to_s ⇒ Object



80
81
82
# File 'lib/celluloid/supervision/container/pool.rb', line 80

# Delegate +to_s+ to a pooled worker, so the string describes a worker
# rather than the pool object.
#
# @return [Object] the worker's +to_s+ result
def to_s
  _send_(:to_s)
end