Class: MicroQ::Manager::Default

Inherits:
Object
Includes:
Celluloid
Defined in:
lib/micro_q/manager/default.rb

Overview

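The default manager implementation: a wrapper around a Queue and a pool of Workers. Once start! has been called, the manager polls the queue at a fixed interval (every 2 seconds in the source below), requests up to as many messages as there are idle workers, and hands each dequeued message to a worker from the pool.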

The pool of workers (more info):

https://github.com/celluloid/celluloid/wiki/Pools

The pool asynchronously assigns messages to available workers, handles exceptions by restarting dead actors, and is generally a beautiful abstraction on top of a group of linked actors/threads.

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/micro_q/manager/default.rb', line 23

def queue
  @queue
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



# File 'lib/micro_q/manager/default.rb', line 23

def workers
  @workers
end

Class Method Details

.shutdown! ⇒ Object



# File 'lib/micro_q/manager/default.rb', line 93

def self.shutdown!
  @shutdown = true
end

.shutdown? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/micro_q/manager/default.rb', line 89

def self.shutdown?
  !!@shutdown
end
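
The two class methods above store the flag in a class-level instance variable, so there is one flag per class rather than one per manager instance. A minimal sketch of the same pattern follows; `ToyManager` is a hypothetical stand-in and is not part of MicroQ.

```ruby
# Hypothetical stand-in for illustration; not part of MicroQ.
class ToyManager
  # Reads the class-level instance variable; `!!` coerces nil to false.
  def self.shutdown?
    !!@shutdown
  end

  # Sets the flag once; nothing in this sketch resets it.
  def self.shutdown!
    @shutdown = true
  end
end

ToyManager.shutdown?  # false, because @shutdown is still nil
ToyManager.shutdown!
ToyManager.shutdown?  # true from now on
```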

Instance Method Details

#build_missing_workers ⇒ Object

Prunes dead workers, then tops the pool up to the configured worker count. The pool is never shrunk if the configured count drops.



# File 'lib/micro_q/manager/default.rb', line 66

def build_missing_workers
  return if queue_only?

  workers.select!(&:alive?)
  @busy.select!(&:alive?)

  missing_worker_count.times do
    workers << MicroQ.config.worker.new_link(current_actor)
  end
end

#kill_all ⇒ Object



# File 'lib/micro_q/manager/default.rb', line 81

def kill_all
  (@workers + @busy).each {|w| w.terminate if w.alive? }
end

#missing_worker_count ⇒ Object



# File 'lib/micro_q/manager/default.rb', line 77

def missing_worker_count
  [MicroQ.config.workers - (workers.size + @busy.size), 0].max
end
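
The clamp at zero is what keeps the pool from shrinking: when the configured count falls below the number of live workers, the result is 0 rather than a negative number. The arithmetic can be checked with plain integers; `missing_count` is a hypothetical helper written for this illustration, not a MicroQ method.

```ruby
# Hypothetical helper mirroring the arithmetic above with plain integers.
# configured: target pool size; idle/busy: live workers in each list.
def missing_count(configured, idle, busy)
  [configured - (idle + busy), 0].max
end

missing_count(5, 2, 1)  # 2: the pool is two workers short of the target
missing_count(3, 2, 3)  # 0: the pool is over target, but nothing is removed
```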

#queue_only? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/micro_q/manager/default.rb', line 85

def queue_only?
  @queue_only ||= MicroQ.config.sqs? && !MicroQ.config.worker_mode?
end
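
One property of the `||=` idiom used above is that it caches only truthy results: when the expression evaluates to false, it is evaluated again on every call. The sketch below demonstrates this with a hypothetical `ToyFlag` class, where `expensive_check` stands in for the config lookups and always returns false.

```ruby
# Hypothetical class for illustration; not part of MicroQ.
class ToyFlag
  attr_reader :calls

  def initialize
    @calls = 0
  end

  # Stand-in for the config lookups; counts how often it runs.
  def expensive_check
    @calls += 1
    false
  end

  # Same shape as queue_only?: `||=` re-evaluates while @flag is nil or false.
  def flag?
    @flag ||= expensive_check
  end
end

toy = ToyFlag.new
3.times { toy.flag? }
toy.calls  # 3: the false result was never cached
```

Whether this matters for `queue_only?` depends on how costly the config lookups are; the sketch only illustrates the language behavior.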

#reinitialize(*) ⇒ Object Also known as: initialize

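Handles both initial setup and recovery after the Queue or a worker dies: recreates the queue if it is missing or dead, then builds any missing workers. If shutdown has been requested, it terminates all workers and returns instead.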



# File 'lib/micro_q/manager/default.rb', line 50

def reinitialize(*)
  kill_all and return if self.class.shutdown?

  unless @queue && @queue.alive?
    @queue = MicroQ.config.queue.new_link
  end

  @busy ||= []
  @workers ||= []

  build_missing_workers
end
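
The first line of the method relies on the `x and return` idiom, which returns only if `x` is truthy. It works here because `Array#each` returns its receiver, and an array is truthy in Ruby even when empty. A small sketch of that behavior, using the hypothetical names `cleanup` and `guarded`:

```ruby
# Hypothetical helpers for illustration; not part of MicroQ.
# Array#each returns the array it was called on, which is always truthy.
def cleanup(list)
  list.each { |item| item }
end

# Mirrors the `kill_all and return if ...` shape with an explicit return value.
def guarded(list, stop)
  cleanup(list) and return :stopped if stop
  :continued
end

guarded([], true)       # :stopped, since an empty array is still truthy
guarded([1, 2], false)  # :continued, the guard is skipped entirely
```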

#start ⇒ Object



# File 'lib/micro_q/manager/default.rb', line 25

def start
  return if queue_only?

  count = workers.size

  if (messages = queue.dequeue(count)).any?
    messages.each do |message|
      worker = workers.pop
      @busy << worker

      worker.perform!(message)
    end
  end

  after(2) { start }
end
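
A single dispatch pass of the method above can be sketched without Celluloid. Everything here is a hypothetical stand-in: `ToyQueue` replaces the queue actor, symbols replace worker actors, and `dispatch_once` omits the `after(2)` rescheduling. The sketch assumes that `dequeue(count)` returns at most `count` messages, which the loop above depends on so that `workers.pop` never runs out of idle workers.

```ruby
# Hypothetical stand-in for the queue actor; not part of MicroQ.
class ToyQueue
  def initialize(items)
    @items = items
  end

  # Assumption: returns at most `count` messages, oldest first.
  def dequeue(count)
    @items.shift(count)
  end
end

# One dispatch pass: pair each dequeued message with an idle worker
# and move that worker to the busy list.
def dispatch_once(queue, idle, busy)
  queue.dequeue(idle.size).map do |message|
    worker = idle.pop
    busy << worker
    [worker, message]
  end
end

idle = [:w1, :w2]
busy = []
dispatch_once(ToyQueue.new(%w[a b c]), idle, busy)
# Two messages are assigned; "c" stays queued until a later pass.
```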

#work_done(worker) ⇒ Object



# File 'lib/micro_q/manager/default.rb', line 42

def work_done(worker)
  @busy.delete(worker)
  workers.push(worker)
end
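
Together with the dispatch step in `start`, this method keeps every live worker in exactly one of two lists, idle or busy. A minimal sketch of that bookkeeping, using a hypothetical `ToyPool` class with symbols in place of worker actors:

```ruby
# Hypothetical bookkeeping class for illustration; not part of MicroQ.
class ToyPool
  attr_reader :idle, :busy

  def initialize(workers)
    @idle = workers.dup
    @busy = []
  end

  # Moves one worker from idle to busy; returns nil if none is idle.
  def checkout
    worker = @idle.pop
    @busy << worker if worker
    worker
  end

  # Same two steps as work_done above: leave busy, rejoin idle.
  def work_done(worker)
    @busy.delete(worker)
    @idle.push(worker)
  end
end

pool = ToyPool.new([:w1, :w2])
worker = pool.checkout   # one worker is now busy, one idle
pool.work_done(worker)   # the worker is idle again and busy is empty
```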