Class: Msgr::Pool
- Inherits:
-
Object
- Object
- Msgr::Pool
- Includes:
- Celluloid, Logging
- Defined in:
- lib/msgr/pool.rb
Defined Under Namespace
Classes: Worker
Instance Attribute Summary collapse
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Instance Method Summary collapse
-
#available? ⇒ Boolean
Check if a worker is available.
- #busy ⇒ Object
-
#dispatch(message, *args) ⇒ Object
Dispatch given message to a worker.
-
#executed(worker) ⇒ Object
Called by worker to indicate it has finished processing.
- #idle ⇒ Object
-
#initialize(runner_klass, opts = {}) ⇒ Pool
constructor
A new instance of Pool.
- #log_status ⇒ Object
- #messages ⇒ Object
- #poll(worker) ⇒ Object
- #running? ⇒ Boolean
- #start ⇒ Object
-
#stop ⇒ Object
Request a graceful shutdown of all pool workers.
- #to_s ⇒ Object
Methods included from Logging
Constructor Details
#initialize(runner_klass, opts = {}) ⇒ Pool
Returns a new instance of Pool.
8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/msgr/pool.rb', line 8 def initialize(runner_klass, opts = {}) @runner_klass = runner_klass @runner_args = opts[:args] ? Array(opts[:args]) : [] @size = (opts[:size] || Celluloid.cores).to_i @running = false log(:debug) { "Inialize size => #{@size}" } start if opts[:autostart].nil? || opts[:autostart] every(30) { log_status } if opts[:nostats].nil? || opts[:nostats] end |
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size.
6 7 8 |
# File 'lib/msgr/pool.rb', line 6 def size @size end |
Instance Method Details
#available? ⇒ Boolean
Check if a worker is available.
71 72 73 |
# File 'lib/msgr/pool.rb', line 71 def available? idle.any? end |
#busy ⇒ Object
25 |
# File 'lib/msgr/pool.rb', line 25 def busy; @busy ||= [] end |
#dispatch(message, *args) ⇒ Object
Dispatch given message to a worker.
81 82 83 84 |
# File 'lib/msgr/pool.rb', line 81 def dispatch(message, *args) messages.push [message, args] after(0) { signal :dispatch } end |
#executed(worker) ⇒ Object
Called by worker to indicate it has finished processing.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/msgr/pool.rb', line 90 def executed(worker) busy.delete worker if running? idle << worker poll worker else log(:debug) { "Terminate worker. Still #{busy.size} to go..." } worker.terminate if worker.alive? if busy.empty? log(:debug) { 'All worker down. Signal :shutdown.' } after(0) { signal :shutdown } end end end |
#idle ⇒ Object
24 |
# File 'lib/msgr/pool.rb', line 24 def idle; @idle ||= [] end |
#log_status ⇒ Object
45 46 47 |
# File 'lib/msgr/pool.rb', line 45 def log_status log(:info) { "[STATUS] Idle: #{idle.size} Busy: #{busy.size}" } end |
#messages ⇒ Object
75 76 77 |
# File 'lib/msgr/pool.rb', line 75 def messages @messages ||= [] end |
#poll(worker) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/msgr/pool.rb', line 107 def poll(worker) return unless worker.alive? if running? if (message = exclusive { messages.shift }) idle.delete worker busy << worker worker.dispatch message[0], message[1] else after(1) { poll worker } end else worker.terminate if worker.alive? after(0) { signal(:shutdown) } if @busy.empty? end end |
#running? ⇒ Boolean
20 21 22 |
# File 'lib/msgr/pool.rb', line 20 def running? @running end |
#start ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/msgr/pool.rb', line 27 def start return if running? log(:debug) { 'Spin up worker pool' } @running = true idle.clear busy.clear @size.times.map do |index| idle << Worker.new_link(Actor.current, index, @runner_klass, @runner_args) end log(:debug) { 'Startup done. Invoke worker polling.' } idle.each { |worker| async.poll worker } end |
#stop ⇒ Object
Request a graceful shutdown of all pool workers.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/msgr/pool.rb', line 51 def stop log(:debug) { 'Graceful shutdown requested.' } @running = false idle.each { |worker| worker.terminate } idle.clear if busy.any? log(:debug) { "Wait for #{busy.size} workers to terminate." } wait :shutdown end log(:debug) { 'Graceful shutdown done.' } end |
#to_s ⇒ Object
125 126 127 |
# File 'lib/msgr/pool.rb', line 125 def to_s "#{self.class.name}[#{@runner_klass}]<#{object_id}>" end |