Class: Msgr::Pool
- Inherits:
-
Object
- Object
- Msgr::Pool
- Includes:
- Celluloid, Logging
- Defined in:
- lib/msgr/pool.rb
Defined Under Namespace
Classes: Worker
Instance Attribute Summary collapse
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Instance Method Summary collapse
-
#available? ⇒ Boolean
Check if a worker is available.
- #busy ⇒ Object
-
#dispatch(*args) ⇒ Object
Dispatch given message to a worker.
-
#executed(worker) ⇒ Object
Called by a worker to indicate it has finished processing.
-
#fetch_idle_worker ⇒ Object
Return an idle worker.
- #idle ⇒ Object
-
#initialize(runner_klass, opts = {}) ⇒ Pool
constructor
A new instance of Pool.
- #log_status ⇒ Object
- #messages ⇒ Object
- #running? ⇒ Boolean
- #start ⇒ Object
-
#stop ⇒ Object
(also: #shutdown)
Request a graceful shutdown of all pool workers.
- #to_s ⇒ Object
Methods included from Logging
Constructor Details
#initialize(runner_klass, opts = {}) ⇒ Pool
Returns a new instance of Pool.
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/msgr/pool.rb', line 8
# Set up the pool configuration. This method only records settings and
# (optionally) schedules periodic status logging.
#
# @param runner_klass [Object] stored in @runner_klass.
# @param opts [Hash] optional settings.
# @option opts [Object] :args runner arguments, normalised with Array().
# @option opts [#to_i] :size pool size; falls back to Celluloid.cores.
# @option opts [#to_i] :stats_interval interval handed to `every` for status
#   logging (default 30, clamped to a minimum of 1).
# @option opts [Boolean] :nostats when truthy, no status logging is scheduled.
# @raise [ArgumentError] if the resulting size is zero or negative.
def initialize(runner_klass, opts = {})
  @runner_klass = runner_klass
  @runner_args  = opts[:args] ? Array(opts[:args]) : []
  @size         = (opts[:size] || Celluloid.cores).to_i
  @running      = false

  raise ArgumentError, 'Pool size must be greater zero.' if @size <= 0

  log(:debug) { "Inialize size => #{@size}" }

  # Status logging is on by default; a truthy :nostats switches it off.
  return if opts[:nostats]

  interval = [opts.fetch(:stats_interval, 30).to_i, 1].max
  every(interval) { log_status }
end
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size.
6 7 8 |
# File 'lib/msgr/pool.rb', line 6
# Reader for the pool size.
#
# @return [Object] the current value of @size
def size
  @size
end
Instance Method Details
#available? ⇒ Boolean
Check if a worker is available.
71 72 73 |
# File 'lib/msgr/pool.rb', line 71
# Check if a worker is available.
#
# @return [Boolean] result of `any?` on the idle list
def available?
  idle.any?
end
#busy ⇒ Object
26 |
# File 'lib/msgr/pool.rb', line 26
# Lazily created list held in @busy; the same array is returned on every call.
#
# @return [Array]
def busy
  @busy ||= []
end
#dispatch(*args) ⇒ Object
Dispatch given message to a worker.
81 82 83 84 85 |
# File 'lib/msgr/pool.rb', line 81
# Dispatch given message to a worker.
#
# Obtains a worker via #fetch_idle_worker and calls `future(:dispatch, args)`
# on it, passing the collected arguments as a single array.
#
# @param args [Array] arguments forwarded to the worker
# @return [Object] whatever the worker's `future` call returns
def dispatch(*args)
  log(:debug) { 'Dispatch message to worker.' }

  worker = fetch_idle_worker
  worker.future(:dispatch, args)
end
#executed(worker) ⇒ Object
Called by a worker to indicate it has finished processing.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/msgr/pool.rb', line 103
# Callback for a worker that has finished processing.
#
# The worker is removed from the busy list. While the pool is running it is
# put back on the idle list and :worker_done is signalled. Otherwise the
# worker is terminated (if still alive) and :shutdown is signalled once the
# busy list is empty.
#
# @param worker [Object] the worker reporting completion
def executed(worker)
  busy.delete(worker)

  if running?
    idle << worker
    # Signal from a zero-delay timer rather than inline.
    return after(0) { signal :worker_done }
  end

  log(:debug) { "Terminate worker. Still #{busy.size} to go..." }
  worker.terminate if worker.alive?
  return unless busy.empty?

  log(:debug) { 'All worker down. Signal :shutdown.' }
  after(0) { signal :shutdown }
end
#fetch_idle_worker ⇒ Object
Return an idle worker.
89 90 91 92 93 94 95 96 97 |
# File 'lib/msgr/pool.rb', line 89
# Return an idle worker.
#
# Takes the first entry off the idle list, waiting on :worker_done and
# retrying for as long as the list is empty. The worker is appended to the
# busy list before it is returned.
#
# @return [Object] the worker moved from idle to busy
def fetch_idle_worker
  worker = idle.shift

  until worker
    wait :worker_done
    worker = idle.shift
  end

  busy << worker
  worker
end
#idle ⇒ Object
25 |
# File 'lib/msgr/pool.rb', line 25
# Lazily created list held in @idle; the same array is returned on every call.
#
# @return [Array]
def idle
  @idle ||= []
end
#log_status ⇒ Object
44 45 46 |
# File 'lib/msgr/pool.rb', line 44
# Log the current number of idle and busy workers at :info level.
def log_status
  log(:info) do
    "[STATUS] Idle: #{idle.size} Busy: #{busy.size}"
  end
end
#messages ⇒ Object
75 76 77 |
# File 'lib/msgr/pool.rb', line 75
# Lazily created list; the same array is returned on every call.
#
# NOTE(review): this listing was garbled ("def ||= [] end"). The method name
# comes from the page's method summary; the instance variable name is
# reconstructed and should be confirmed against lib/msgr/pool.rb.
#
# @return [Array]
def messages
  @messages ||= []
end
#running? ⇒ Boolean
21 22 23 |
# File 'lib/msgr/pool.rb', line 21
# Report the pool's running flag.
#
# @return [Boolean, nil] the current value of @running
def running?
  @running
end
#start ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/msgr/pool.rb', line 28
# Spin up the worker pool. Does nothing if the pool is already running.
#
# Marks the pool as running, empties the idle and busy lists and fills the
# idle list with @size workers created through Worker.new_link.
def start
  return if running?

  log(:debug) { 'Spin up worker pool' }
  @running = true

  idle.clear
  busy.clear

  # Each worker receives the current actor, its index and the runner settings.
  @size.times do |index|
    idle << Worker.new_link(Actor.current, index, @runner_klass, @runner_args)
  end

  log(:debug) { 'Pool ready.' }
end
#stop ⇒ Object Also known as: shutdown
Request a graceful shutdown of all pool workers.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/msgr/pool.rb', line 50
# Request a graceful shutdown of all pool workers.
#
# Clears the running flag, terminates every idle worker and empties the idle
# list. If workers are still busy, waits for the :shutdown signal before
# logging completion.
def stop
  log(:debug) { 'Graceful shutdown requested.' }

  @running = false

  idle.each(&:terminate)
  idle.clear

  if busy.any?
    log(:debug) { "Wait for #{busy.size} workers to terminate." }
    wait :shutdown
  end

  log(:debug) { 'Graceful shutdown done.' }
end
#to_s ⇒ Object
120 121 122 |
# File 'lib/msgr/pool.rb', line 120 def to_s "#{self.class.name}[#{@runner_klass}]<#{object_id}>" end |