Class: Sidekiq::Scheduled::Enq

Inherits:
Object
  • Object
show all
Includes:
Component
Defined in:
lib/sidekiq/scheduled.rb

Constant Summary collapse

LUA_ZPOPBYSCORE =
<<~LUA
  local key, now = KEYS[1], ARGV[1]
  local jobs = redis.call("zrange", key, "-inf", now, "byscore", "limit", 0, 1)
  if jobs[1] then
    redis.call("zrem", key, jobs[1])
    return jobs[1]
  end
LUA

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary collapse

Methods included from Component

#fire_event, #handle_exception, #hostname, #identity, #logger, #process_nonce, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(container) ⇒ Enq

Returns a new instance of Enq.



22
23
24
25
26
27
# File 'lib/sidekiq/scheduled.rb', line 22

def initialize(container)
  @config = container
  @client = Sidekiq::Client.new(config: container)
  @done = false
  @lua_zpopbyscore_sha = nil
end

Instance Method Details

#enqueue_jobs(sorted_sets = SETS) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/sidekiq/scheduled.rb', line 29

def enqueue_jobs(sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next item in the queue with score (time to execute) <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
        @client.push(Sidekiq.load_json(job))
        logger.debug { "enqueued #{sorted_set}: #{job}" }
      end
    end
  end
end

#terminateObject



46
47
48
# File 'lib/sidekiq/scheduled.rb', line 46

def terminate
  @done = true
end