Class: Leveret::Worker
- Inherits:
-
Object
- Object
- Leveret::Worker
- Extended by:
- Forwardable
- Defined in:
- lib/leveret/worker.rb
Overview
Subscribes to one or more queues and forks workers to perform jobs as they arrive
Call #do_work to subscribe to all queues and block the main thread.
Instance Attribute Summary collapse
-
#consumers ⇒ Array<Bunny::Consumer>
All of the actively subscribed queues.
-
#queues ⇒ Array<Queue>
All of the queues this worker is going to subscribe to.
Instance Method Summary collapse
-
#do_work ⇒ Object
Subscribe to all of the #queues and begin processing jobs from them.
-
#initialize(*queue_names) ⇒ Worker
constructor
Create a new worker to process jobs from the list of queue names passed.
Constructor Details
#initialize(*queue_names) ⇒ Worker
Create a new worker to process jobs from the list of queue names passed
20 21 22 23 24 25 26 |
# File 'lib/leveret/worker.rb', line 20 def initialize(*queue_names) queue_names << configuration.default_queue_name if queue_names.empty? self.queues = queue_names.map { |name| Leveret::Queue.new(name) } self.consumers = [] @time_to_die = false end |
Instance Attribute Details
#consumers ⇒ Array<Bunny::Consumer>
Returns All of the actively subscribed queues.
12 |
# File 'lib/leveret/worker.rb', line 12 attr_accessor :queues, :consumers |
#queues ⇒ Array<Queue>
Returns All of the queues this worker is going to subscribe to.
12 13 14 |
# File 'lib/leveret/worker.rb', line 12 def queues @queues end |
Instance Method Details
#do_work ⇒ Object
Subscribe to all of the #queues and begin processing jobs from them. This will block the main thread until an interrupt is received.
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/leveret/worker.rb', line 30 def do_work log.info "Starting master process for #{queues.map(&:name).join(', ')}" prepare_for_work loop do if @time_to_die cancel_subscriptions break end sleep 1 end log.info "Exiting master process" end |