Class: Resque::Plugins::Fifo::Worker

Inherits:
Worker
  • Object
show all
Defined in:
lib/resque/plugins/fifo/worker.rb

Constant Summary collapse

UPDATE_DELAY =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#main_queue_nameObject

Returns the value of attribute main_queue_name.



8
9
10
# File 'lib/resque/plugins/fifo/worker.rb', line 8

def main_queue_name
  @main_queue_name
end

Instance Method Details

#queues=(queues) ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/resque/plugins/fifo/worker.rb', line 10

def queues=(queues)
  queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues
  @main_queue_name = "#{manager.queue_prefix}-#{SecureRandom.hex(10)}"

  @queues = ([:fifo_refresh, main_queue_name] + queues).map { |queue| queue.to_s.strip }
  unless ['*', '?', '{', '}', '[', ']'].any? { |char| @queues.join.include?(char) }
    @static_queues = @queues.flatten.uniq
  end
  validate_queues
end

#register_workerObject

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.



51
52
53
54
55
56
# File 'lib/resque/plugins/fifo/worker.rb', line 51

def register_worker
  super

  puts "Fifo Startup - Updating worker list"
  manager.request_refresh
end

#reserveObject

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/resque/plugins/fifo/worker.rb', line 23

def reserve
  queues.each do |queue|
    log_with_severity :debug, "Checking #{queue}"
    if job = Resque.reserve(queue)
      log_with_severity :debug, "Found job on #{queue}"

      if job.payload['enqueue_ts']
        delay_ts = Time.now.to_i - job.payload['enqueue_ts'].to_i
        max_delay = Resque.redis.get("fifo-stats-max-delay") || 0
        Resque.redis.incrby("fifo-stats-accumulated-delay", delay_ts)
        Resque.redis.incr("fifo-stats-accumulated-count")
        if (delay_ts > max_delay.to_i)
          Resque.redis.set("fifo-stats-max-delay", max_delay)
        end
      end
      return job
    end
  end

  nil
rescue Exception => e
  log_with_severity :error, "Error reserving job: #{e.inspect}"
  log_with_severity :error, e.backtrace.join("\n")
  raise e
end

#unregister_worker(exception = nil) ⇒ Object



58
59
60
61
62
63
# File 'lib/resque/plugins/fifo/worker.rb', line 58

def unregister_worker(exception = nil)
  super(exception)

  puts "Fifo Shutdown - Updating worker list"
  manager.request_refresh
end