Class: Sidekiq::Scheduled::Poller

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Util
Defined in:
lib/sidekiq/scheduled.rb

Overview

The Poller checks Redis every N seconds for messages in the retry or scheduled set have passed their timestamp and should be enqueued. If so, it just pops the message back onto its original queue so the workers can pick it up like any other message.

Constant Summary collapse

SETS =
%w(retry schedule)

Constants included from Util

Util::EXPIRY

Instance Method Summary collapse

Methods included from Util

#constantize, #logger, #process_id, #redis, #watchdog

Instance Method Details

#pollObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/sidekiq/scheduled.rb', line 21

def poll
  watchdog('scheduling poller thread died!') do
    # A message's "score" in Redis is the time at which it should be processed.
    # Just check Redis for the set of messages with a timestamp before now.
    now = Time.now.to_f.to_s
    Sidekiq.redis do |conn|
      SETS.each do |sorted_set|
        (messages, _) = conn.multi do
          conn.zrangebyscore(sorted_set, '-inf', now)
          conn.zremrangebyscore(sorted_set, '-inf', now)
        end

        messages.each do |message|
          logger.debug { "enqueued #{sorted_set}: #{message}" }
          msg = Sidekiq.load_json(message)
          conn.rpush("queue:#{msg['queue']}", message)
        end
      end
    end

    after(POLL_INTERVAL) { poll }
  end
end