Class: Sidekiq::Retry::Poller

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Util
Defined in:
lib/sidekiq/retry.rb

Overview

The Poller checks Redis every N seconds for messages in the retry set have passed their retry timestamp and should be retried. If so, it just pops the message back onto its original queue so the workers can pick it up like any other message.

Instance Method Summary collapse

Methods included from Util

#constantize, logger, #logger, logger=, #process_id, #redis, #watchdog

Instance Method Details

#pollObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/sidekiq/retry.rb', line 34

def poll
  watchdog('retry poller thread died!') do

    Sidekiq.redis do |conn|
      # A message's "score" in Redis is the time at which it should be retried.
      # Just check Redis for the set of messages with a timestamp before now.
      messages = nil
      now = Time.now.to_f.to_s
      (messages, _) = conn.multi do
        conn.zrangebyscore('retry', '-inf', now)
        conn.zremrangebyscore('retry', '-inf', now)
      end

      messages.each do |message|
        logger.debug { "Retrying #{message}" }
        msg = MultiJson.decode(message)
        conn.rpush("queue:#{msg['queue']}", message)
      end
    end

    after(POLL_INTERVAL) { poll }
  end
end