Class: WindUp::QueueManager
- Inherits: Object
  - Object
  - WindUp::QueueManager
- Includes: Celluloid
- Defined in: lib/wind_up/queue_manager.rb
Instance Attribute Summary
- #router ⇒ Object (readonly)
  Returns the value of attribute router.
- #size ⇒ Object
  Returns the value of attribute size.
- #worker_class ⇒ Object (readonly)
  Returns the value of attribute worker_class.
Instance Method Summary
- #__shutdown__ ⇒ Object
- #backlog ⇒ Integer
  Return the size of the queue backlog.
- #initialize(worker_class, options = {}) ⇒ QueueManager (constructor)
  Don’t use QueueManager.new; use Klass.queue instead.
- #inspect ⇒ Object
- #queue ⇒ Object
  Access the Queue’s proxy.
- #restart_actor(actor, reason) ⇒ Object
  Restart a crashed actor.
Constructor Details
#initialize(worker_class, options = {}) ⇒ QueueManager
Don’t use QueueManager.new; use Klass.queue instead.

    # File 'lib/wind_up/queue_manager.rb', line 37

    def initialize(worker_class, options = {})
      defaults = { :size => [Celluloid.cores, 2].max,
                   :router => :first_available }
      options = defaults.merge options

      @worker_class = worker_class
      @args = options[:args] ? Array(options[:args]) : []
      @size = options[:size]

      router_class = Routers[options[:router]]
      raise ArgumentError, "Router class not recognized" unless router_class
      @router = router_class.new

      @registry = Celluloid::Registry.root
      @group = []
      resize_group
    end
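The option handling in the constructor can be tried out in isolation. The following is a minimal sketch in plain Ruby, not the library's code: `resolve_options`, `ROUTERS`, and `FirstAvailable` are hypothetical stand-ins for the constructor body, the `Routers` lookup, and a router class, and `Etc.nprocessors` is assumed here as a substitute for `Celluloid.cores`.

```ruby
require "etc"

# Hypothetical stand-ins for WindUp's Routers table and a router class;
# the real definitions are not shown in this section.
FirstAvailable = Class.new
ROUTERS = { :first_available => FirstAvailable }

# Merge caller options over the defaults, then resolve the router class.
def resolve_options(options = {})
  defaults = {
    :size   => [Etc.nprocessors, 2].max, # assumption: stands in for Celluloid.cores
    :router => :first_available
  }
  options = defaults.merge(options)

  router_class = ROUTERS[options[:router]]
  raise ArgumentError, "Router class not recognized" unless router_class

  {
    :size   => options[:size],
    # A single value is wrapped in an Array; a missing :args becomes [].
    :args   => options[:args] ? Array(options[:args]) : [],
    :router => router_class.new
  }
end

resolved = resolve_options(:size => 4, :args => "config.yml")
p resolved[:size] # 4
p resolved[:args] # ["config.yml"]
```

With no options, the size falls back to at least 2 and the router to `:first_available`; an unknown `:router` key raises `ArgumentError` before any worker would be started.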
Instance Attribute Details
#router ⇒ Object (readonly)
Returns the value of attribute router.

    # File 'lib/wind_up/queue_manager.rb', line 32

    def router
      @router
    end
#size ⇒ Object
Returns the value of attribute size.

    # File 'lib/wind_up/queue_manager.rb', line 32

    def size
      @size
    end
#worker_class ⇒ Object (readonly)
Returns the value of attribute worker_class.

    # File 'lib/wind_up/queue_manager.rb', line 32

    def worker_class
      @worker_class
    end
Instance Method Details
#__shutdown__ ⇒ Object

    # File 'lib/wind_up/queue_manager.rb', line 57

    def __shutdown__
      @router.shutdown
      group.reverse_each(&:terminate)
    end
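Read literally, the method shuts the router down first and then terminates group members in reverse order, last added first. The sketch below only illustrates that ordering; `FakeRouter` and `FakeMember` are hypothetical stand-ins that append to a shared log and are not part of WindUp or Celluloid.

```ruby
# Hypothetical stand-ins that record the order in which they are stopped.
class FakeRouter
  def initialize(log)
    @log = log
  end

  def shutdown
    @log << :router
  end
end

class FakeMember
  def initialize(name, log)
    @name = name
    @log = log
  end

  def terminate
    @log << @name
  end
end

log = []
router = FakeRouter.new(log)
group = [:worker_1, :worker_2, :worker_3].map { |name| FakeMember.new(name, log) }

# Same two steps as __shutdown__: router first, then members newest-first.
router.shutdown
group.reverse_each(&:terminate)

p log # [:router, :worker_3, :worker_2, :worker_1]
```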
#backlog ⇒ Integer
Return the size of the queue backlog.

    # File 'lib/wind_up/queue_manager.rb', line 83

    def backlog
      @router.size
    end
#inspect ⇒ Object

    # File 'lib/wind_up/queue_manager.rb', line 87

    def inspect
      "<Celluloid::ActorProxy(#{self.class}) @size=#{@size} @worker_class=#{@worker_class} @backlog=#{backlog}>"
    end
#queue ⇒ Object
Access the Queue’s proxy.

    # File 'lib/wind_up/queue_manager.rb', line 67

    def queue
      WindUp::QueueProxy.new Actor.current
    end
#restart_actor(actor, reason) ⇒ Object
Restart a crashed actor.

    # File 'lib/wind_up/queue_manager.rb', line 96

    def restart_actor(actor, reason)
      member = group.find do |_member|
        _member.actor == actor
      end
      raise "A group member went missing. This shouldn't be!" unless member

      if reason
        member.restart(reason)
      else
        # Remove from group on clean shutdown
        group.delete_if do |_member|
          _member.actor == actor
        end
      end
    end
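The two branches of this method (restart on a crash, drop on a clean exit) can be exercised without Celluloid. This is a minimal sketch under stated assumptions: `Member` is a hypothetical struct standing in for a group member, its `restart` only counts calls, and the group is passed in explicitly rather than read from the manager.

```ruby
# Hypothetical stand-in for a group member; restart only counts invocations.
Member = Struct.new(:actor, :restarts) do
  def restart(_reason)
    self.restarts += 1
  end
end

# Same branching as the method above, with the group passed as an argument.
def restart_actor(group, actor, reason)
  member = group.find { |m| m.actor == actor }
  raise "A group member went missing. This shouldn't be!" unless member

  if reason
    # A non-nil reason means the actor crashed: restart it in place.
    member.restart(reason)
  else
    # A nil reason means a clean shutdown: remove the member from the group.
    group.delete_if { |m| m.actor == actor }
  end
end

group = [Member.new(:a, 0), Member.new(:b, 0)]
restart_actor(group, :a, RuntimeError.new("boom")) # crash
restart_actor(group, :b, nil)                      # clean exit

p group.map(&:actor)   # [:a]
p group.first.restarts # 1
```

Passing an actor that is not in the group raises a `RuntimeError`, matching the guard in the original method.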