Class: Sidekiq::Scheduled::Poller

Inherits:
Object
Includes:
Actor, Util
Defined in:
lib/sidekiq/scheduled.rb

Overview

The Poller checks Redis every N seconds for messages in the retry or scheduled set that have passed their timestamp and should be enqueued. When it finds one, it pushes the message back onto its original queue so the workers can pick it up like any other message.
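The behaviour described above can be sketched without a Redis server. This is only an illustration, not Sidekiq's implementation: `FakeSortedSet` and `enqueue_due` are hypothetical names, and the in-memory hash merely stands in for a Redis sorted set whose scores are due timestamps.

```ruby
# Hypothetical in-memory stand-in for a Redis sorted set (member => score).
# It is not part of Sidekiq; it only mimics the two operations the poller needs.
class FakeSortedSet
  def initialize
    @scores = {}
  end

  def add(score, member)
    @scores[member] = score
  end

  # Lowest-scored member whose score is <= max, or nil if none is due
  # (roughly what ZRANGEBYSCORE with LIMIT 0 1 would return).
  def first_due(max)
    member, _score = @scores.select { |_, s| s <= max }.min_by { |_, s| s }
    member
  end

  # True only if the member was still present, so a second caller
  # that lost the race gets false and skips the push.
  def remove(member)
    !@scores.delete(member).nil?
  end

  def size
    @scores.size
  end
end

# Move every due message onto the queue, one at a time.
def enqueue_due(set, now, queue)
  while (message = set.first_due(now))
    queue << message if set.remove(message)
  end
  queue
end

set = FakeSortedSet.new
set.add(100.0, "job-a")
set.add(200.0, "job-b")
set.add(300.0, "job-c")

queue = enqueue_due(set, 250.0, [])
```

With `now` set to 250.0, only the two messages scored at or below that value are moved; the third stays in the set until a later poll.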

Constant Summary

SETS =
%w(retry schedule)

Constants included from Util

Util::EXPIRY

Instance Method Summary

Methods included from Actor

included

Methods included from Util

#fire_event, #hostname, #identity, #logger, #redis, #watchdog

Methods included from ExceptionHandler

#handle_exception

Instance Method Details

#poll(first_time = false) ⇒ Object



# File 'lib/sidekiq/scheduled.rb', line 21

def poll(first_time=false)
  watchdog('scheduling poller thread died!') do
    initial_wait if first_time

    begin
      # A message's "score" in Redis is the time at which it should be processed.
      # Just check Redis for the set of messages with a timestamp before now.
      now = Time.now.to_f.to_s
      Sidekiq.redis do |conn|
        SETS.each do |sorted_set|
          # Get the next item in the queue if its score (time to execute) is <= now.
          # We go through the list one item at a time to reduce the risk of losing jobs
          # if something goes wrong between popping them from the scheduled queue and
          # pushing them onto a work queue.
          while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

            # Pop item off the queue and add it to the work queue. If the job can't be popped from
            # the queue, it's because another process already popped it so we can move on to the
            # next one.
            if conn.zrem(sorted_set, message)
              Sidekiq::Client.push(Sidekiq.load_json(message))
              logger.debug { "enqueued #{sorted_set}: #{message}" }
            end
          end
        end
      end
    rescue => ex
      # Most likely a problem with Redis networking.
      # Punt and try again at the next interval.
      logger.error ex.message
      logger.error ex.backtrace.first
    end

    # Randomizing scales the interval by half since
    # on average calling `rand` returns 0.5.
    # We make up for this by doubling the interval
    after(poll_interval * 2 * rand) { poll }
  end
end
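
The randomized delay at the end of #poll can be checked numerically. The sketch below is an assumption-laden illustration: `jittered_delay` is a hypothetical helper that mirrors the expression `poll_interval * 2 * rand`, and the 15-second interval is an assumed value, not one taken from this page.

```ruby
# Hypothetical helper mirroring `poll_interval * 2 * rand` from #poll.
# The result is uniformly distributed over [0, 2 * poll_interval).
def jittered_delay(poll_interval, rng)
  poll_interval * 2 * rng.rand
end

rng = Random.new(42)   # seeded only so the sketch is repeatable
poll_interval = 15     # assumed value, in seconds

delays = Array.new(10_000) { jittered_delay(poll_interval, rng) }
mean = delays.sum / delays.size

puts format("min=%.2f max=%.2f mean=%.2f", delays.min, delays.max, mean)
```

Individual delays range from zero up to twice the interval, which spreads polls from multiple processes apart, while the mean stays close to the configured interval.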