Class: RabbitJobs::Worker

Inherits:
Object
Includes:
MainLoop
Defined in:
lib/rabbit_jobs/worker.rb

Overview

Worker daemon.

Instance Attribute Summary

Instance Method Summary

Methods included from MainLoop

#log_daemon_error, #main_loop, #shutdown, #shutdown!, #startup

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with a list of queue names (strings or symbols; each argument is converted with to_s and stripped). The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, or no queue names at all, this Worker will operate on all configured queues (RabbitJobs.config.routing_keys) in alphabetical order. When a Worker is started this way, queues can be added or removed dynamically without restarting workers.



# File 'lib/rabbit_jobs/worker.rb', line 35

def initialize(*queues)
  @queues = queues.map { |queue| queue.to_s.strip }.flatten.uniq
  if @queues == ['*'] || @queues.empty?
    @queues = RabbitJobs.config.routing_keys
  end

  fail 'Cannot initialize worker without queues.' if @queues.empty?
end
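The normalization performed by the constructor can be tried outside the library. The following is a minimal sketch, not the library's own code path: `resolve_queues` and `CONFIGURED_ROUTING_KEYS` are hypothetical names, and the constant stands in for whatever `RabbitJobs.config.routing_keys` returns in a real setup.

```ruby
# Hypothetical stand-in for RabbitJobs.config.routing_keys (assumed values).
CONFIGURED_ROUTING_KEYS = %w[default mailers].freeze

# Mirrors the normalization in Worker#initialize without loading the gem.
def resolve_queues(*queues, configured: CONFIGURED_ROUTING_KEYS)
  # Convert every argument to a trimmed string and drop duplicates.
  names = queues.map { |queue| queue.to_s.strip }.flatten.uniq

  # A lone "*" or an empty list falls back to every configured routing key.
  names = configured.dup if names == ['*'] || names.empty?

  raise 'Cannot initialize worker without queues.' if names.empty?

  names
end

resolve_queues(:default, ' mailers ', 'default') # => ["default", "mailers"]
resolve_queues('*')                              # => ["default", "mailers"]
resolve_queues                                   # => ["default", "mailers"]
```

Because each argument goes through `to_s` before `flatten` runs, queue names are best passed as separate arguments rather than as one nested array.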

Instance Attribute Details

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/rabbit_jobs/worker.rb', line 11

def consumer
  @consumer
end

#process_name ⇒ Object

Returns the value of attribute process_name.



# File 'lib/rabbit_jobs/worker.rb', line 10

def process_name
  @process_name
end

Instance Method Details

#queue_params(routing_key) ⇒ Object



# File 'lib/rabbit_jobs/worker.rb', line 20

def queue_params(routing_key)
  RJ.config[:queues][routing_key.to_sym]
end
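The lookup above converts the routing key to a symbol before indexing the `:queues` section of the configuration, so string and symbol keys resolve to the same entry. A minimal sketch of that behavior, assuming a plain hash in place of `RJ.config`; `SKETCH_CONFIG` and the option names inside it are illustrative assumptions, not values taken from the library:

```ruby
# Hypothetical configuration hash standing in for RJ.config.
# The per-queue options shown here are assumed for illustration only.
SKETCH_CONFIG = {
  queues: {
    default: { durable: true, auto_delete: false }
  }
}.freeze

# Same lookup shape as Worker#queue_params, against the hash above.
def queue_params(routing_key, config: SKETCH_CONFIG)
  config[:queues][routing_key.to_sym]
end

queue_params('default') # returns the options stored under :default
queue_params(:default)  # same entry, symbol key
queue_params('missing') # => nil
```

An unknown routing key yields `nil` in this sketch, so callers would need to handle a missing entry themselves.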

#queues ⇒ Object



# File 'lib/rabbit_jobs/worker.rb', line 44

def queues
  @queues || []
end

#work ⇒ Object

Subscribes to each configured queue and processes jobs until the main loop exits.



# File 'lib/rabbit_jobs/worker.rb', line 49

def work
  return false unless startup
  @consumer ||= RJ::Consumer::JobConsumer.new

  $0 = process_name || "rj_worker (#{queues.join(', ')})"

  @processed_count = 0

  begin
    consumer_channel.prefetch(1)

    queues.each do |routing_key|
      consume_queue(routing_key)
    end

    RJ.logger.info 'Started.'

    return main_loop do
      RJ.logger.info "Processed jobs: #{@processed_count}."
    end
  rescue
    log_daemon_error($!)
  end

  true
end
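The listing above has three exit paths: `false` when `startup` fails, the result of `main_loop` on a normal run, and `true` after an exception has been rescued and logged. The following sketch reproduces only that control flow with a hypothetical `SketchWorker` class. It does not connect to RabbitMQ, and the stubbed `main_loop` return value (`:stopped`) and the `startup_ok` flag are assumptions made for illustration.

```ruby
# Hypothetical stand-in mirroring the control flow of Worker#work.
# Subscriptions are only recorded; no broker connection is made.
class SketchWorker
  attr_reader :subscribed, :errors

  def initialize(*queues, startup_ok: true)
    @queues = queues
    @startup_ok = startup_ok
    @subscribed = []
    @errors = []
  end

  def work
    # Mirrors `return false unless startup`.
    return false unless @startup_ok

    begin
      @queues.each { |routing_key| consume_queue(routing_key) }
      # The real main_loop blocks until shutdown; this stub returns at once.
      return main_loop
    rescue StandardError => e
      # Mirrors log_daemon_error($!): record the error and fall through.
      @errors << e.message
    end

    true
  end

  private

  # Stub: the real method subscribes a consumer to the queue.
  def consume_queue(routing_key)
    raise ArgumentError, 'blank routing key' if routing_key.to_s.empty?

    @subscribed << routing_key
  end

  # Stub: assumed return value, chosen only to make the path visible.
  def main_loop
    :stopped
  end
end

SketchWorker.new('default', startup_ok: false).work # => false
SketchWorker.new('default', 'mailers').work         # => :stopped
SketchWorker.new('default', '').work                # => true
```

In this sketch, as in the listing, a `true` result means an error was rescued and logged, not that the worker ran to completion.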