Class: Celluloid::Worker::Manager

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/celluloid/worker.rb

Overview

Delegates work (i.e. methods) and supervises workers

Constant Summary

Constants included from Celluloid

SHUTDOWN_TIMEOUT, VERSION

Instance Method Summary collapse

Methods included from Celluloid

#abort, actor?, #after, #alive?, #async, cores, current_actor, #current_actor, #defer, #every, exception_handler, #exclusive, exclusive?, #future, included, #inspect, #link, #linked_to?, #links, #name, #notify_link, #notify_unlink, #receive, receive, shutdown, #signal, #sleep, sleep, #tasks, #terminate, #unlink, uuid, version, #wait, #wrapped_object

Constructor Details

#initialize(worker_class, options = {}) ⇒ Manager

Returns a new instance of Manager.

Raises:

  • (ArgumentError)


26
27
28
29
30
31
32
33
34
35
# File 'lib/celluloid/worker.rb', line 26

def initialize(worker_class, options = {})
  @size = options[:size]
  raise ArgumentError, "minimum pool size is 2" if @size && @size < 2
  
  @size ||= [Celluloid.cores, 2].max
  @args = options[:args] ? Array(options[:args]) : []
  
  @worker_class = worker_class
  @idle = @size.times.map { worker_class.new_link(*@args) }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/celluloid/worker.rb', line 69

def method_missing(method, *args, &block)
  if respond_to?(method)
    execute method, *args, &block
  else
    super
  end
end

Instance Method Details

#crash_handler(actor, reason) ⇒ Object

Spawn a new worker for every crashed one



60
61
62
63
# File 'lib/celluloid/worker.rb', line 60

def crash_handler(actor, reason)
  return unless reason # don't restart workers that exit cleanly
  @idle << @worker_class.new_link(*@args)
end

#execute(method, *args, &block) ⇒ Object

Execute the given method within a worker



38
39
40
41
42
43
44
45
46
# File 'lib/celluloid/worker.rb', line 38

def execute(method, *args, &block)
  worker = provision_worker
  
  begin
    worker._send_ method, *args, &block
  ensure
    @idle << worker if worker.alive?
  end
end

#provision_workerObject

Provision a new worker



49
50
51
52
53
54
55
56
57
# File 'lib/celluloid/worker.rb', line 49

def provision_worker
  while @idle.empty?
    # Using exclusive mode blocks incoming messages, so they don't pile
    # up as waiting Celluloid::Tasks
    response = exclusive { receive { |msg| msg.is_a? Response } }
    Thread.current[:actor].handle_message(response)
  end
  @idle.shift
end

#respond_to?(method) ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/celluloid/worker.rb', line 65

def respond_to?(method)
  super || (@worker_class ? @worker_class.instance_methods.include?(method.to_sym) : false)
end