Class: RockQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/rock-queue/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*queues) ⇒ Worker

Initialize connection to queue server



7
8
9
10
11
# File 'lib/rock-queue/worker.rb', line 7

def initialize(*queues)
  queues = [:default] if queues.size == 0
  @queues = queues
  RockQueue.logger.info "=> Initializing..."
end

Instance Attribute Details

#verboseObject

Whether the worker should log basic info to STDOUT



4
5
6
# File 'lib/rock-queue/worker.rb', line 4

def verbose
  @verbose
end

Instance Method Details

#queuesObject

Returns a list of queues A single ‘*’ means all queues



51
52
53
# File 'lib/rock-queue/worker.rb', line 51

def queues
  @queues[0] == "*" ? RockQueue.queues : @queues
end

#work(interval = 5) ⇒ Object

Main worker loop where all jobs are beeing pulled of the queue. This is also a place where every job starts and ends it’s lifecycle.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/rock-queue/worker.rb', line 15

def work(interval = 5)
  RockQueue.logger.info "=> Worker ready. Hold your horses!"
  stop = false
  loop do
    sleep(interval)
    
    ActiveRecord::Base.verify_active_connections!
    queues.each do |qname|
      obj, args = RockQueue.pop(qname)
      if obj
        queue = QueueObject.new(obj, args)
        begin
          # code that actually performs the action
          args = queue.args.first
          RockQueue.logger.info "=> Processing class #{queue.object.name} with params: #{args.inspect}"
          args.empty? ? queue.object.perform : queue.object.perform(args)
        rescue Object => e
          # Add failed processing and retry
          if queue.add_fail(e)
            sleep(queue.get_sleep_time)
            RockQueue.logger.error "=> Processing fail! Retrying #{queue.fails.length}"
            RockQueue.logger.error "   Message: #{e.message}"
            retry
          end
        end
        stop = false
      else
        stop = true if interval == 0
      end
    end
    break if stop
  end
end