Class: RockQueue::Worker
- Inherits: Object
- Defined in: lib/rock-queue/worker.rb
Instance Attribute Summary
- #verbose ⇒ Object
  Whether the worker should log basic info to STDOUT.
Instance Method Summary
- #initialize ⇒ Worker (constructor)
  Initialize connection to queue server.
- #work ⇒ Object
  Main worker loop where all jobs are pulled off the queue.
Constructor Details
#initialize ⇒ Worker
Initialize connection to queue server
    # File 'lib/rock-queue/worker.rb', line 7
    def initialize
      config = RockQueue::Config.settings
      @queue = RockQueue::Base.new config.adapter, {
        :server => config.host,
        :port   => config.port,
        :log    => config.log
      }
      RockQueue::Base.logger.info "=> Initializing..."
    end
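How the settings map onto the adapter options can be illustrated without a running queue server. The following is a sketch only: `Settings` and `queue_options` are hypothetical names introduced for this example, and the adapter, host, and port values are illustrative, not defaults taken from the library.

```ruby
# Hypothetical stand-in for the object returned by RockQueue::Config.settings.
Settings = Struct.new(:adapter, :host, :port, :log, keyword_init: true)

# Builds the same shape of options hash that #initialize passes
# as the second argument to RockQueue::Base.new.
def queue_options(config)
  { :server => config.host, :port => config.port, :log => config.log }
end

# Illustrative values only.
config  = Settings.new(adapter: :beanstalkd, host: "localhost", port: 11300, log: nil)
options = queue_options(config)
puts options.inspect
```

Note that the adapter name is passed separately as the first argument, so it does not appear in the options hash.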
Instance Attribute Details
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT
    # File 'lib/rock-queue/worker.rb', line 4
    def verbose
      @verbose
    end
Instance Method Details
#work ⇒ Object
Main worker loop where all jobs are pulled off the queue. This is also where every job starts and ends its lifecycle.
    # File 'lib/rock-queue/worker.rb', line 19
    def work
      RockQueue::Base.logger.info "=> Worker ready. Hold your horses!"
      loop do
        ActiveRecord::Base.verify_active_connections!
        @queue.receive do |queue|
          if queue
            begin
              # code that actually performs the action
              args = queue.args.first
              RockQueue::Base.logger.info "=> Processing class #{queue.object.name} with params: #{args.inspect}"
              args.empty? ? queue.object.perform : queue.object.perform(args)
            rescue Object => e
              # Add failed processing and retry
              if queue.add_fail(e)
                sleep(queue.get_sleep_time)
                RockQueue::Base.logger.error "=> Processing fail! Retrying #{queue.fails.length}"
                RockQueue::Base.logger.error " Message: #{e.message}"
                retry
              end
            end
          end
        end
      end
    end
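The begin/rescue/retry flow of the loop above can be sketched in isolation for a single message. This is a minimal sketch, not the library's implementation: `StubMessage`, `FlakyJob`, and `process` are hypothetical stand-ins, and the behaviour assumed for `add_fail` (record the error and return true while retries remain) and `get_sleep_time` is inferred only from how `#work` calls them.

```ruby
# Hypothetical stand-in for the object yielded by @queue.receive.
# The real object's add_fail/get_sleep_time semantics are assumptions.
class StubMessage
  attr_reader :object, :args, :fails

  def initialize(object, args, max_fails: 3)
    @object    = object
    @args      = args
    @fails     = []
    @max_fails = max_fails
  end

  # Record the failure; return true while another attempt is allowed.
  def add_fail(error)
    @fails << error
    @fails.length < @max_fails
  end

  # No real delay in this sketch.
  def get_sleep_time
    0
  end
end

# Hypothetical job class: fails on the first two attempts, then succeeds.
class FlakyJob
  @attempts = 0

  class << self
    attr_reader :attempts

    def perform(args)
      @attempts += 1
      raise "transient failure" if @attempts < 3
      "done with #{args.inspect}"
    end
  end
end

# Mirrors the begin/rescue/retry shape of #work for one message.
def process(message)
  args = message.args.first
  begin
    args.empty? ? message.object.perform : message.object.perform(args)
  rescue StandardError => e
    if message.add_fail(e)
      sleep(message.get_sleep_time)
      retry # re-runs the begin block from the top
    end
    nil # give up once add_fail reports that no retries remain
  end
end

message = StubMessage.new(FlakyJob, [{ id: 42 }])
result  = process(message)
puts result
puts "failures recorded: #{message.fails.length}"
```

The sketch rescues `StandardError` rather than `Object`, which is a narrower choice than the original; with `rescue Object`, every raised exception is caught, including ones such as `Interrupt`.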