Class: Sidekiq::Scheduled::Enq

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/scheduled.rb

Constant Summary collapse

LUA_ZPOPBYSCORE =
<<~LUA
  local key, now = KEYS[1], ARGV[1]
  local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
  if jobs[1] then
    redis.call("zrem", key, jobs[1])
    return jobs[1]
  end
LUA

Instance Method Summary collapse

Constructor Details

#initializeEnq

Returns a new instance of Enq.



21
22
23
24
# File 'lib/sidekiq/scheduled.rb', line 21

def initialize
  @done = false
  @lua_zpopbyscore_sha = nil
end

Instance Method Details

#enqueue_jobs(sorted_sets = SETS) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/sidekiq/scheduled.rb', line 26

def enqueue_jobs(sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next item in the queue with score (time to execute) <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
        Sidekiq::Client.push(Sidekiq.load_json(job))
        Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end

#terminateObject



43
44
45
# File 'lib/sidekiq/scheduled.rb', line 43

def terminate
  @done = true
end