Class: Leveret::Worker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/leveret/worker.rb

Overview

Subscribes to one or more queues and forks workers to perform jobs as they arrive

Call #do_work to subscribe to all queues and block the main thread.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*queue_names) ⇒ Worker

Create a new worker to process jobs from the list of queue names passed

Parameters:

  • queue_names (Array<String>)

    ([Leveret.configuration.default_queue_name]) A list of queue names for this worker to subscribe to and process



20
21
22
23
24
25
26
# File 'lib/leveret/worker.rb', line 20

def initialize(*queue_names)
  queue_names << configuration.default_queue_name if queue_names.empty?

  self.queues = queue_names.map { |name| Leveret::Queue.new(name) }
  self.consumers = []
  @time_to_die = false
end

Instance Attribute Details

#consumersArray<Bunny::Consumer>

Returns All of the actively subscribed queues.

Returns:

  • (Array<Bunny::Consumer>)

    All of the actively subscribed queues



12
# File 'lib/leveret/worker.rb', line 12

attr_accessor :queues, :consumers

#queuesArray<Queue>

Returns All of the queues this worker is going to subscribe to.

Returns:

  • (Array<Queue>)

    All of the queues this worker is going to subscribe to



12
13
14
# File 'lib/leveret/worker.rb', line 12

def queues
  @queues
end

Instance Method Details

#do_workObject

Subscribe to all of the #queues and begin processing jobs from them. This will block the main thread until an interrupt is received.



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/leveret/worker.rb', line 30

def do_work
  log.info "Starting master process for #{queues.map(&:name).join(', ')}"
  prepare_for_work

  loop do
    if @time_to_die
      cancel_subscriptions
      break
    end
    sleep 1
  end
  log.info "Exiting master process"
end