Class: MicroQ::Manager::Default
- Inherits:
-
Object
- Object
- MicroQ::Manager::Default
- Includes:
- Celluloid
- Defined in:
- lib/micro_q/manager/default.rb
Overview
The default manager implementation. Wrapper for a Queue and a pool of Workers. At each time slice after start! was called, try to dequeue messages from the queue. Perform each message on the worker pool.
The pool of workers (more info):
https://github.com/celluloid/celluloid/wiki/Pools
The pool manages asynchronously assigning messages to available workers, handles exceptions by restarting the dead actors and is generally a beautiful abstraction on top of a group of linked actors/threads.
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Class Method Summary collapse
Instance Method Summary collapse
-
#build_missing_workers ⇒ Object
Don’t shrink the pool if the config changes.
- #kill_all ⇒ Object
- #missing_worker_count ⇒ Object
- #queue_only? ⇒ Boolean
-
#reinitialize ⇒ Object
(also: #initialize)
Handle init/death of the Queue or the Worker pool.
- #start ⇒ Object
- #work_done(worker) ⇒ Object
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
23 24 25 |
# File 'lib/micro_q/manager/default.rb', line 23 def queue @queue end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
23 24 25 |
# File 'lib/micro_q/manager/default.rb', line 23 def workers @workers end |
Class Method Details
.shutdown! ⇒ Object
93 94 95 |
# File 'lib/micro_q/manager/default.rb', line 93 def self.shutdown! @shutdown = true end |
.shutdown? ⇒ Boolean
89 90 91 |
# File 'lib/micro_q/manager/default.rb', line 89 def self.shutdown? !!@shutdown end |
Instance Method Details
#build_missing_workers ⇒ Object
Don’t shrink the pool if the config changes
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/micro_q/manager/default.rb', line 66 def build_missing_workers return if queue_only? workers.select!(&:alive?) @busy.select!(&:alive?) missing_worker_count.times do workers << MicroQ.config.worker.new_link(current_actor) end end |
#kill_all ⇒ Object
81 82 83 |
# File 'lib/micro_q/manager/default.rb', line 81 def kill_all (@workers + @busy).each {|w| w.terminate if w.alive? } end |
#missing_worker_count ⇒ Object
77 78 79 |
# File 'lib/micro_q/manager/default.rb', line 77 def missing_worker_count [MicroQ.config.workers - (workers.size + @busy.size), 0].max end |
#queue_only? ⇒ Boolean
85 86 87 |
# File 'lib/micro_q/manager/default.rb', line 85 def queue_only? @queue_only ||= MicroQ.config.sqs? && !MicroQ.config.worker_mode? end |
#reinitialize ⇒ Object Also known as: initialize
Handle init/death of the Queue or the Worker pool
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/micro_q/manager/default.rb', line 50 def reinitialize(*) kill_all and return if self.class.shutdown? unless @queue && @queue.alive? @queue = MicroQ.config.queue.new_link end @busy ||= [] @workers ||= [] build_missing_workers end |
#start ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/micro_q/manager/default.rb', line 25 def start return if queue_only? count = workers.size if ( = queue.dequeue(count)).any? .each do || worker = workers.pop @busy << worker worker.perform!() end end after(2) { start } end |
#work_done(worker) ⇒ Object
42 43 44 45 |
# File 'lib/micro_q/manager/default.rb', line 42 def work_done(worker) @busy.delete(worker) workers.push(worker) end |