Class: WindUp::QueueManager

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/wind_up/queue_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_class, options = {}) ⇒ QueueManager

Don’t use QueueManager.new, use Klass.queue instead

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/wind_up/queue_manager.rb', line 37

# Sets up a manager for a group of +worker_class+ workers.
#
# Don't call QueueManager.new directly; use Klass.queue instead.
#
# @param worker_class [Class] stored as the worker class for this manager
# @param options [Hash] overrides for the defaults below
# @option options [Integer] :size (larger of Celluloid.cores and 2) stored as @size
# @option options [Symbol] :router (:first_available) key looked up in Routers
# @option options [Object] :args wrapped with Array(); a falsy value yields []
# @raise [ArgumentError] if Routers has no entry for the requested router
def initialize(worker_class, options = {})
  settings = {
    :size   => [Celluloid.cores, 2].max,
    :router => :first_available
  }.merge(options)

  @worker_class = worker_class
  extra_args    = settings[:args]
  @args         = extra_args ? Array(extra_args) : []
  @size         = settings[:size]

  # Resolve the router before any group state is built.
  router_class = Routers[settings[:router]]
  unless router_class
    raise ArgumentError, "Router class not recognized"
  end
  @router = router_class.new

  @registry = Celluloid::Registry.root
  @group    = []
  resize_group
end

Instance Attribute Details

#router ⇒ Object (readonly)

Returns the value of attribute router.



32
33
34
# File 'lib/wind_up/queue_manager.rb', line 32

# Reader for the router attribute.
#
# @return [Object] the value currently held in @router
def router
  @router
end

#size ⇒ Object

Returns the value of attribute size.



32
33
34
# File 'lib/wind_up/queue_manager.rb', line 32

# Reader for the size attribute.
#
# @return [Object] the value currently held in @size
def size
  @size
end

#worker_class ⇒ Object (readonly)

Returns the value of attribute worker_class.



32
33
34
# File 'lib/wind_up/queue_manager.rb', line 32

# Reader for the worker_class attribute.
#
# @return [Object] the value currently held in @worker_class
def worker_class
  @worker_class
end

Instance Method Details

#__shutdown__ ⇒ Object



57
58
59
60
# File 'lib/wind_up/queue_manager.rb', line 57

# Shuts the router down first, then terminates every group member,
# walking the group from its last element to its first.
def __shutdown__
  @router.shutdown
  group.reverse_each { |member| member.terminate }
end

#backlog ⇒ Integer

Return the size of the queue backlog

Returns:

  • (Integer)

    the number of messages queueing



83
84
85
# File 'lib/wind_up/queue_manager.rb', line 83

# Return the size of the queue backlog, as reported by the router's +size+.
#
# @return [Integer] the number of messages queueing
def backlog
  @router.size
end

#inspect ⇒ Object



87
88
89
# File 'lib/wind_up/queue_manager.rb', line 87

# Builds a one-line description of the manager that shows its class,
# @size, @worker_class and the current backlog.
#
# @return [String]
def inspect
  attributes = "@size=#{@size} @worker_class=#{@worker_class} @backlog=#{backlog}"
  "<Celluloid::ActorProxy(#{self.class}) #{attributes}>"
end

#queue ⇒ Object

Access the Queue’s proxy



67
68
69
# File 'lib/wind_up/queue_manager.rb', line 67

# Access the Queue's proxy.
#
# @return [WindUp::QueueProxy] a new proxy built around Actor.current
def queue
  current_actor = Actor.current
  WindUp::QueueProxy.new(current_actor)
end

#restart_actor(actor, reason) ⇒ Object

Restart a crashed actor



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/wind_up/queue_manager.rb', line 96

# Restart a crashed actor, or forget it after a clean shutdown.
#
# Finds the group member whose +actor+ equals the given one. A truthy
# +reason+ is handed to that member's +restart+; a falsy +reason+ removes
# every member wrapping +actor+ from the group instead.
#
# @param actor [Object] the actor that exited
# @param reason [Object, nil] why it exited; falsy means a clean shutdown
# @raise [RuntimeError] if no group member wraps +actor+
def restart_actor(actor, reason)
  wraps_actor = lambda { |candidate| candidate.actor == actor }

  member = group.find(&wraps_actor)
  raise "A group member went missing. This shouldn't be!" unless member

  return member.restart(reason) if reason

  # Remove from group on clean shutdown
  group.delete_if(&wraps_actor)
end