Class: RabbitJobs::Worker
- Inherits: Object
- Includes: MainLoop
- Defined in: lib/rabbit_jobs/worker.rb
Overview
Worker daemon.
Instance Attribute Summary
- #consumer ⇒ Object
  Returns the value of attribute consumer.
- #process_name ⇒ Object
  Returns the value of attribute process_name.
Instance Method Summary
- #initialize(*queues) ⇒ Worker (constructor)
  Workers should be initialized with an array of string queue names.
- #queue_params(routing_key) ⇒ Object
- #queues ⇒ Object
- #work ⇒ Object
  Subscribes to the queues and works on jobs.
Methods included from MainLoop
#log_daemon_error, #main_loop, #shutdown, #shutdown!, #startup
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*” (or no queue names at all), this Worker will operate on all queues in alphabetical order, taken from RabbitJobs.config.routing_keys. With this method, queues can be added or removed dynamically without restarting workers.
    # File 'lib/rabbit_jobs/worker.rb', line 35
    def initialize(*queues)
      @queues = queues.map { |queue| queue.to_s.strip }.flatten.uniq
      if @queues == ['*'] || @queues.empty?
        @queues = RabbitJobs.config.routing_keys
      end
      fail 'Cannot initialize worker without queues.' if @queues.empty?
    end
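The queue-name handling in the constructor can be tried outside the gem with a small standalone sketch. The helper name `normalize_queues` and its `configured_keys` parameter are hypothetical; `configured_keys` stands in for `RabbitJobs.config.routing_keys`, whose real contents depend on your configuration.

```ruby
# Hypothetical helper mirroring the constructor's queue handling.
# `configured_keys` is an assumed stand-in for RabbitJobs.config.routing_keys.
def normalize_queues(queues, configured_keys)
  # Convert every name to a stripped string and drop duplicates.
  names = queues.map { |queue| queue.to_s.strip }.uniq
  # A lone "*" or an empty list falls back to the configured routing keys.
  names = configured_keys if names == ['*'] || names.empty?
  raise 'Cannot initialize worker without queues.' if names.empty?
  names
end

normalize_queues([:mail, ' default ', 'mail'], %w[default mail]) # => ["mail", "default"]
normalize_queues(['*'], %w[default mail])                        # => ["default", "mail"]
```

Symbols and padded strings are accepted because each name goes through `to_s.strip` before duplicates are removed.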
Instance Attribute Details
#consumer ⇒ Object
Returns the value of attribute consumer.
    # File 'lib/rabbit_jobs/worker.rb', line 11
    def consumer
      @consumer
    end
#process_name ⇒ Object
Returns the value of attribute process_name.
    # File 'lib/rabbit_jobs/worker.rb', line 10
    def process_name
      @process_name
    end
Instance Method Details
#queue_params(routing_key) ⇒ Object
    # File 'lib/rabbit_jobs/worker.rb', line 20
    def queue_params(routing_key)
      RJ.config[:queues][routing_key.to_sym]
    end
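As a rough illustration of the lookup above, the following sketch uses a plain Hash in place of `RJ.config`. The configuration contents and the option names (`durable`, `auto_delete`) are assumptions made for the example, not documented rabbit_jobs settings.

```ruby
# Hypothetical configuration; the option names are illustrative only.
config = {
  queues: {
    default: { durable: true },
    mail: { durable: true, auto_delete: false }
  }
}

# Same lookup as #queue_params, with the config passed in explicitly.
def queue_params(config, routing_key)
  config[:queues][routing_key.to_sym]
end

queue_params(config, 'mail')    # => { durable: true, auto_delete: false }
queue_params(config, :default)  # => { durable: true }
queue_params(config, 'missing') # => nil
```

Because the routing key is converted with `to_sym`, string and symbol keys resolve to the same entry, and an unknown key yields `nil` rather than an error.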
#queues ⇒ Object
    # File 'lib/rabbit_jobs/worker.rb', line 44
    def queues
      @queues || []
    end
#work ⇒ Object
Subscribes to the queues and works on jobs.
    # File 'lib/rabbit_jobs/worker.rb', line 49
    def work
      return false unless startup
      @consumer ||= RJ::Consumer::JobConsumer.new

      $0 = process_name || "rj_worker (#{queues.join(', ')})"

      @processed_count = 0

      begin
        consumer_channel.prefetch(1)

        queues.each do |routing_key|
          consume_queue(routing_key)
        end

        RJ.logger.info 'Started.'

        return main_loop do
          RJ.logger.info "Processed jobs: #{@processed_count}."
        end
      rescue
        log_daemon_error($!)
      end

      true
    end
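The return value of `#work` depends on which path is taken: `false` when startup fails, the result of `main_loop` on a normal run, and `true` when an error is rescued and logged. The sketch below reproduces only that control flow with a hypothetical `SketchWorker` class; its constructor arguments and `errors` list are stand-ins for `startup`, `main_loop`, and `log_daemon_error`, and it does not talk to RabbitMQ.

```ruby
# Hypothetical stand-in for the worker; none of these names come from rabbit_jobs.
class SketchWorker
  attr_reader :errors

  def initialize(startup_ok:, loop_body:)
    @startup_ok = startup_ok # stand-in for the result of #startup
    @loop_body = loop_body   # stand-in for #main_loop
    @errors = []             # stand-in for what #log_daemon_error would record
  end

  def work
    return false unless @startup_ok

    begin
      # The loop's result is returned directly, as in the original method.
      return @loop_body.call
    rescue StandardError => e
      # Record the error and fall through to the final `true`.
      @errors << e.message
    end

    true
  end
end

SketchWorker.new(startup_ok: false, loop_body: -> { :finished }).work # => false
SketchWorker.new(startup_ok: true, loop_body: -> { :finished }).work  # => :finished
SketchWorker.new(startup_ok: true, loop_body: -> { raise 'boom' }).work # => true
```

A caller therefore cannot tell a rescued failure apart from success by the return value alone, so the log is the place to look in that case.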